Sure, the code is fairly simple; you should be able to follow it from the main
function.

import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class Test1 {

    // Read the local CSV into a row x col matrix that will be broadcast to the executors.
    public static double[][] createBroadcastPoints(String localPointPath, int row, int col)
            throws IOException {
        BufferedReader br = RAWF.reader(localPointPath);
        String line = null;
        int rowIndex = 0;
        double[][] pointFeatures = new double[row][col];
        while ((line = br.readLine()) != null) {
            List<String> point = Arrays.asList(line.split(","));
            int colIndex = 0;
            for (String pointFeature : point) {
                pointFeatures[rowIndex][colIndex] = Double.valueOf(pointFeature);
                colIndex++;
            }
            rowIndex++;
        }
        br.close();
        return pointFeatures;
    }

    public static void main(String[] args) throws IOException {
        /** Parameter settings ***********/
        String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
        String remoteFilePath =
                "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
        // This CSV file is only 468 MB.
        final int row = 133433;
        final int col = 458;
        /*********************************/

        SparkConf conf = new SparkConf()
                .setAppName("distance")
                .setMaster("spark://HadoopV26Master:7077")
                .set("spark.executor.memory", "4g")
                .set("spark.eventLog.enabled", "true")
                .set("spark.eventLog.dir", "/usr/local/spark/logs/spark-events")
                .set("spark.local.dir", "/tmp/spark-temp");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> textFile = sc.textFile(remoteFilePath);

        // Broadcast variable holding all points read from the local CSV.
        final Broadcast<double[][]> broadcastPoints =
                sc.broadcast(createBroadcastPoints(localPointPath, row, col));

        /**
         * Compute the distance between each point and each input line.
         * Distance list index: index = n(i-1) - i*(i-1)/2 + j-i-1
         */
        JavaPairRDD<Pair, Double> distance = textFile.flatMapToPair(
                new PairFlatMapFunction<String, Pair, Double>() {
                    public Iterable<Tuple2<Pair, Double>> call(String v1) throws Exception {
                        List<String> al = Arrays.asList(v1.split(","));
                        // The first column is the point index; the rest are feature values.
                        double[] featureVals = new double[al.size() - 1];
                        for (int j = 0; j < al.size() - 1; j++)
                            featureVals[j] = Double.valueOf(al.get(j + 1));
                        int jIndex = Integer.valueOf(al.get(0));
                        double[][] allPoints = broadcastPoints.getValue();
                        double sum = 0;
                        List<Tuple2<Pair, Double>> list = new ArrayList<Tuple2<Pair, Double>>();
                        for (int i = 0; i < row; i++) {
                            sum = 0;
                            for (int j = 0; j < al.size() - 1; j++) {
                                sum += (allPoints[i][j] - featureVals[j]) * (allPoints[i][j] - featureVals[j]);
                            }
                            // Euclidean distance between broadcast point i and the point on this line.
                            list.add(new Tuple2<Pair, Double>(new Pair(i, jIndex), Math.sqrt(sum)));
                        }
                        return list;
                    }
                });

        distance.saveAsTextFile("hdfs://HadoopV26Master:9000/user/" + args[0]);
    }
}
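
For reference, the snippet above uses two helpers that aren't shown in the post: RAWF.reader(...), which appears to just open the local file as a BufferedReader, and a Pair class used as the output key. A minimal sketch of what those might look like (these are my assumptions, not the original implementations; in particular the key class has to be Serializable for Spark to shuffle and save the tuples):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;

// Hypothetical stand-in for the RAWF helper; assumed to simply open a local file.
class RAWF {
    public static BufferedReader reader(String path) throws IOException {
        return new BufferedReader(new FileReader(path));
    }
}

// Hypothetical Pair key: it must be Serializable so Spark can ship and save the tuples.
class Pair implements Serializable {
    private final int i;
    private final int j;

    public Pair(int i, int j) {
        this.i = i;
        this.j = j;
    }

    @Override
    public String toString() {
        return "(" + i + "," + j + ")";
    }
}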



