The code is fairly straightforward; the main method below should make the overall flow clear.
public class Test1 { public static double[][] createBroadcastPoints(String localPointPath, int row, int col) throws IOException{ BufferedReader br = RAWF.reader(localPointPath); String line = null; int rowIndex = 0; double[][] pointFeatures = new double[row][col]; while((line = br.readLine())!=null){ List<String> point = Arrays.asList(line.split(",")); int colIndex = 0; for(String pointFeature: point){ pointFeatures[rowIndex][colIndex] = Double.valueOf(pointFeature); colIndex++; } rowIndex++; } br.close(); return pointFeatures; } public static void main(String[] args) throws IOException{ /**Parameter Setting***********/ String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv"; String remoteFilePath = "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv"; //this csv file is only 468MB final int row = 133433; final int col = 458; /******************/ SparkConf conf = new SparkConf(). setAppName("distance"). setMaster("spark://HadoopV26Master:7077"). set("spark.executor.memory", "4g"). set("spark.eventLog.enabled", "true") .set("spark.eventLog.dir", "/usr/local/spark/logs/spark-events") .set("spark.local.dir", "/tmp/spark-temp"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> textFile = sc.textFile(remoteFilePath); //Broadcast variable //double[][] xx =; final Broadcast<double[][]> broadcastPoints = sc.broadcast(createBroadcastPoints(localPointPath,row,col)); //final Broadcast<double[][]> broadcastPoints = sc.broadcast(xx); /** * Compute the distance in terms of each point on each instance. 
* distance list: index = n(i-1)- i*(i-1)/2 + j-i-1 */ JavaPairRDD<Pair,Double> distance = textFile.flatMapToPair(new PairFlatMapFunction<String, Pair, Double>(){ public Iterable<Tuple2<Pair, Double>> call(String v1) throws Exception{ List<String> al = Arrays.asList(v1.split(",")); double[] featureVals = new double[al.size()]; for(int j=0;j<al.size()-1;j++) featureVals[j] = Double.valueOf(al.get(j+1)); int jIndex = Integer.valueOf(al.get(0)); double[][] allPoints = broadcastPoints.getValue(); double sum = 0; List<Tuple2<Pair, Double>> list = new ArrayList<Tuple2<Pair, Double>>(); for(int i=0;i<row; i++){ sum = 0; for(int j=0;j<al.size()-1;j++){ sum += (allPoints[i][j]-featureVals[j])*(allPoints[i][j]-featureVals[j]); } list.add(new Tuple2<Pair,Double>(new Pair(i,jIndex),Math.sqrt(sum))); } return list; } }); distance.saveAsTextFile("hdfs://HadoopV26Master:9000/user/"+args[0]); } } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-program-got-Stuck-at-broadcasting-tp22953p22973.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org