import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
// Driver: a local word-count demo that prints the partition count, partitioner,
// and per-partition contents (via glom) at every stage of the pipeline.
public class SparkWordCount1 {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("wordcount");
        // Set the default parallelism (partition count); the default partitioner is the hash partitioner.
        sparkConf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        ctx.setLogLevel("ERROR");
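        // Note: "spark.default.parallelism" controls how many partitions shuffle
        // transformations such as reduceByKey produce when neither a partitioner
        // nor an explicit numPartitions argument is supplied.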
        // final JavaRDD<String> linesRdd = ctx.textFile(args[0]); // file-based alternative
        ArrayList<String> lines = new ArrayList<>();
        lines.add("A");
        lines.add("D");
        lines.add("C");
        lines.add("F");
        lines.add("E");
        // lines.add("Hello Java Hi Ok");
        // lines.add("Ok No House Hello");
        // lines.add("Yes I Like Java");
        // lines.add("No I Dislike Java");
        JavaRDD<String> linesRdd = ctx.parallelize(lines, 4); // explicitly request 4 partitions
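        // parallelize() slices the in-memory list across 4 partitions, but a
        // parallelized RDD carries no partitioner, so partitioner() below prints
        // Optional.empty().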
        System.out.println("linesRdd part num: " + linesRdd.getNumPartitions());
        System.out.println("linesRdd partitioner: " + linesRdd.partitioner());
        System.out.println("linesRdd: " + linesRdd.glom().collect());
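        // glom() wraps each partition's elements in an array, so collect() shows
        // exactly which elements landed in which partition.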
        JavaRDD<String> words = linesRdd.flatMap(
                s -> Arrays.asList(s.split(" ")).iterator());
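        // flatMap is a narrow transformation: it keeps the parent's 4 partitions
        // and, like other non-shuffle maps, yields an RDD with no partitioner.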
        System.out.println("words part num: " + words.getNumPartitions());
        System.out.println("words partitioner: " + words.partitioner());
        System.out.println("words: " + words.glom().collect());
        JavaPairRDD<String, Integer> ones =
                words.mapToPair(s -> new Tuple2<>(s, 1));
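        // mapToPair also preserves the partition count; the (word, 1) pairs are
        // not yet grouped by key, so this pair RDD still has no partitioner.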
        // First way to manually override the partition count (note: repartition
        // returns a new RDD, it does not modify `ones` in place):
        // ones = ones.repartition(5);
        // Second way to manually override the partition count; this takes
        // precedence over the earlier spark.default.parallelism setting:
        ones = ones.partitionBy(new Partitioner() {
            @Override
            public int numPartitions() {
                return 3;
            }

            @Override
            public int getPartition(Object key) {
                // floorMod guards against negative hashCodes, which would make
                // a plain % expression return a negative (invalid) partition id.
                return Math.floorMod(key.hashCode(), numPartitions());
            }
        });
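        // partitionBy triggers a shuffle: the pairs are redistributed into 3
        // partitions by key hash, essentially what Spark's built-in
        // HashPartitioner(3) would do.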
        System.out.println("ones part num: " + ones.getNumPartitions());
        System.out.println("ones partitioner: " + ones.partitioner());
        System.out.println("ones: " + ones.glom().collect());
        JavaPairRDD<String, Integer> counts =
                ones.reduceByKey((x, y) -> x + y);
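        // Because `ones` already has a partitioner, reduceByKey reuses it: keys
        // are already co-located, so no extra shuffle is needed and `counts`
        // keeps the same 3 partitions and the same partitioner.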
        System.out.println("counts part num: " + counts.getNumPartitions());
        System.out.println("counts partitioner: " + counts.partitioner());
        System.out.println("counts: " + counts.glom().collect());
        // List<Tuple2<String, Integer>> results = counts.collect();
        // System.out.println(results.toString());
        // Scanner sc = new Scanner(System.in);
        // sc.next(); // block so the job can be inspected before the context closes
        // counts.foreach(System.out::println);
        // sc.close();
        ctx.close();
    }
}