import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;

// Driver
public class SparkWordCount1 {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("wordcount");
        // Set the default number of partitions; the default partitioner is the hash partitioner.
        sparkConf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        ctx.setLogLevel("ERROR");

        //final JavaRDD<String> linesRdd = ctx.textFile(args[0]);
        ArrayList<String> lines = new ArrayList<>();
        lines.add("A");
        lines.add("D");
        lines.add("C");
        lines.add("F");
        lines.add("E");
        // lines.add("Hello Java Hi Ok");
        // lines.add("Ok No House Hello");
        // lines.add("Yes I Like Java");
        // lines.add("No I Dislike Java");

        // Explicitly request 4 partitions, overriding spark.default.parallelism.
        JavaRDD<String> linesRdd = ctx.parallelize(lines, 4);
        System.out.println("linesRdd part num:" + linesRdd.getNumPartitions());
        System.out.println("linesRdd partitioner:" + linesRdd.partitioner());
        System.out.println("linesRdd:" + linesRdd.glom().collect());

        JavaRDD<String> words = linesRdd.flatMap(
                s -> Arrays.asList(s.split(" ")).iterator());
        System.out.println("words part num:" + words.getNumPartitions());
        System.out.println("words partitioner:" + words.partitioner());
        System.out.println("words:" + words.glom().collect());

        JavaPairRDD<String, Integer> ones = words.mapToPair(
                s -> new Tuple2<>(s, 1));

        //ones = ones.repartition(5); // First way to manually override the partition count; takes precedence over the settings above.
        // Second way to manually override the partition count; also takes precedence over the settings above.
        ones = ones.partitionBy(new Partitioner() {
            @Override
            public int numPartitions() {
                return 3;
            }

            @Override
            public int getPartition(Object key) {
                // Math.floorMod keeps the index non-negative even when hashCode() is negative.
                return Math.floorMod(key.hashCode(), numPartitions());
            }
        });
        System.out.println("ones part num:" + ones.getNumPartitions());
        System.out.println("ones partitioner:" + ones.partitioner());
        System.out.println("ones:" + ones.glom().collect());

        JavaPairRDD<String, Integer> counts = ones.reduceByKey((x, y) -> x + y);
        System.out.println("counts part num:" + counts.getNumPartitions());
        System.out.println("counts partitioner:" + counts.partitioner());
        System.out.println("counts:" + counts.glom().collect());

        //List<Tuple2<String, Integer>> results = counts.collect();
        //System.out.println(results.toString());
        //Scanner sc = new Scanner(System.in);
        //sc.next();
        //counts.foreach(System.out::println);
        //sc.close();

        ctx.close();
    }
}
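
The anonymous Partitioner above hand-rolls hash partitioning. Spark already ships an equivalent built-in, org.apache.spark.HashPartitioner, so the same three-partition shuffle could be requested more concisely; the fragment below is a sketch that assumes the same `ones` RDD from the program above.

import org.apache.spark.HashPartitioner;

// Equivalent to the anonymous Partitioner above: 3 partitions, keys placed
// by a non-negative hash modulo. Because the downstream reduceByKey sees a
// known partitioner on its input, it can reuse it instead of reshuffling.
ones = ones.partitionBy(new HashPartitioner(3));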