[Java] spark分区器的使用,只有部分的说明,只做参考 →→→→→进入此内容的聊天室

来自 , 2019-05-16, 写在 Java, 查看 122 次.
URL http://www.code666.cn/view/84c6494d
  1. import org.apache.spark.Partitioner;
  2. import org.apache.spark.SparkConf;  
  3. import org.apache.spark.api.java.JavaPairRDD;  
  4. import org.apache.spark.api.java.JavaRDD;  
  5. import org.apache.spark.api.java.JavaSparkContext;  
  6. import org.apache.spark.api.java.function.FlatMapFunction;  
  7. import org.apache.spark.api.java.function.Function2;  
  8. import org.apache.spark.api.java.function.PairFunction;  
  9.  
  10. import scala.Tuple2;  
  11.  
  12. import java.util.ArrayList;
  13. import java.util.Arrays;  
  14. import java.util.Iterator;
  15. import java.util.List;
  16. import java.util.Scanner;
  17.  
  18. //Driver
  19. public class SparkWordCount1{
  20.     public static void main(String[] args) {
  21.         SparkConf sparkConf = new SparkConf().
  22.                                                           setMaster("local").
  23.                                                           setAppName("wordcount");  
  24.        
  25.         sparkConf.set("spark.default.parallelism", "4");   //设置分区数   默认分区器是Hash分区器
  26.         JavaSparkContext ctx = new JavaSparkContext(sparkConf);
  27.         ctx.setLogLevel("ERROR");
  28.        
  29.        
  30.         //final JavaRDD<String> linesRdd = ctx.textFile(args[0]);
  31.         ArrayList<String> lines = new ArrayList<String>();
  32.         lines.add("A");
  33.         lines.add("D");
  34.         lines.add("C");
  35.         lines.add("F");
  36.         lines.add("E");
  37.        // lines.add("Hello Java Hi Ok");
  38.        // lines.add("Ok No House Hello");
  39.        // lines.add("Yes I Like Java");
  40.        // lines.add("No I Dislike Java");
  41.        
  42.        
  43.         JavaRDD<String> linesRdd = ctx.parallelize(lines,4);  //干预分区数
  44.        
  45.        
  46.         System.out.println("linesRdd part num:"+ linesRdd.getNumPartitions());
  47.         System.out.println("linesRdd partitioner:"+ linesRdd.partitioner());
  48.         System.out.println("linesRdd:" +linesRdd.glom().collect());
  49.        
  50.        
  51.         JavaRDD<String> words = linesRdd.flatMap(
  52.                                                         (s) -> Arrays.asList(s.split(" ")).iterator());
  53.         System.out.println("words part num:" + words.getNumPartitions());
  54.         System.out.println("words partitioner:" + words.partitioner());
  55.         System.out.println("words:" + words.glom().collect());
  56.        
  57.         JavaPairRDD<String, Integer> ones = words.mapToPair(
  58.                                                                                 s->new Tuple2<String, Integer>(s, 1));
  59.        
  60.         //ones.repartition(5);  //第一种人为干预分区数    ,  优先级高于前面的
  61.        
  62.        
  63.        
  64.                                                         //第二种人为干预分区数    ,  优先级高于前面的
  65.         ones  = ones.partitionBy(new Partitioner() {
  66.                        
  67.                         @Override
  68.                         public int numPartitions() {
  69.                                 return 3;
  70.                         }
  71.                        
  72.                         @Override
  73.                         public int getPartition(Object key) {
  74.                                 // TODO Auto-generated method stub
  75.                                 int hc = key.hashCode();
  76.                                 int index = hc % numPartitions();
  77.                                
  78.                                 return index;
  79.                         }
  80.                 });
  81.        
  82.         System.out.println("ones part num:"+ ones.getNumPartitions());
  83.         System.out.println("ones partitioner:"+ ones.partitioner());
  84.         System.out.println("ones:" +ones.glom().collect());
  85.        
  86.        
  87.        
  88.        
  89.        
  90.         JavaPairRDD<String, Integer> counts =  ones.reduceByKey((x,y)->x+y);  
  91.         System.out.println("counts part num:" + counts.getNumPartitions());
  92.         System.out.println("counts partitioner:" + counts.partitioner());
  93.         System.out.println("counts:" + counts.glom().collect());
  94.        
  95.        
  96.         //List<Tuple2<String, Integer>> results = counts.collect();
  97.         //System.out.println(results.toString());
  98.        
  99.         //Scanner sc =new Scanner(System.in);
  100.         //sc.next();
  101.        
  102.        
  103.        
  104.         //counts.foreach(System.out::println);
  105.         //cs.close();
  106.         ctx.close();
  107.     }  
  108. }

回复 "spark分区器的使用,只有部分的说明,只做参考"

这儿你可以回复上面这条便签

captcha