Can you try commenting out the saveAsTextFile call and doing a simple count() instead? If it's a
broadcast issue, count() will throw the same error.
On 21 May 2015 14:21, "allanjie" <allanmcgr...@gmail.com> wrote:

> Sure, the code is very simple. I think you can understand it from the main
> function.
>
> public class Test1 {
>
>         public static double[][] createBroadcastPoints(String
> localPointPath, int
> row, int col) throws IOException{
>                 BufferedReader br = RAWF.reader(localPointPath);
>                 String line = null;
>                 int rowIndex = 0;
>                 double[][] pointFeatures = new double[row][col];
>                 while((line = br.readLine())!=null){
>                         List<String> point =
> Arrays.asList(line.split(","));
>                         int colIndex = 0;
>                         for(String pointFeature: point){
>                                 pointFeatures[rowIndex][colIndex] =
> Double.valueOf(pointFeature);
>                                 colIndex++;
>                         }
>                         rowIndex++;
>                 }
>                 br.close();
>                 return pointFeatures;
>         }
>
>
>
>         public static void main(String[] args) throws IOException{
>                 /**Parameter Setting***********/
>                  String localPointPath =
> "/home/hduser/skyrock/skyrockImageFeatures.csv";
>                  String remoteFilePath =
> "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
>                 //this csv file is only 468MB
>                  final int row = 133433;
>                  final int col = 458;
>                  /******************/
>
>                 SparkConf conf = new SparkConf().
>                                 setAppName("distance").
>                                 setMaster("spark://HadoopV26Master:7077").
>                                 set("spark.executor.memory", "4g").
>                                 set("spark.eventLog.enabled", "true")
>                                 .set("spark.eventLog.dir",
> "/usr/local/spark/logs/spark-events")
>                                 .set("spark.local.dir", "/tmp/spark-temp");
>                 JavaSparkContext sc = new JavaSparkContext(conf);
>
>                 JavaRDD<String> textFile = sc.textFile(remoteFilePath);
>                 //Broadcast variable
>                 //double[][] xx =;
>
>                 final Broadcast<double[][]> broadcastPoints =
> sc.broadcast(createBroadcastPoints(localPointPath,row,col));
>                 //final Broadcast<double[][]> broadcastPoints =
> sc.broadcast(xx);
>
>                 /**
>                  * Compute the distance in terms of each point on each
> instance.
>                  * distance list: index = n(i-1)- i*(i-1)/2 + j-i-1
>                  */
>                 JavaPairRDD<Pair,Double> distance =
> textFile.flatMapToPair(new
> PairFlatMapFunction<String, Pair, Double>(){
>                         public Iterable<Tuple2&lt;Pair, Double>>
> call(String v1) throws
> Exception{
>                                 List<String> al =
> Arrays.asList(v1.split(","));
>                                 double[] featureVals = new
> double[al.size()];
>                                 for(int j=0;j<al.size()-1;j++)
>                                         featureVals[j] =
> Double.valueOf(al.get(j+1));
>                                 int jIndex = Integer.valueOf(al.get(0));
>                                 double[][] allPoints =
> broadcastPoints.getValue();
>                                 double sum = 0;
>                                 List&lt;Tuple2&lt;Pair, Double>> list =
> new ArrayList<Tuple2&lt;Pair,
> Double>>();
>                                 for(int i=0;i<row; i++){
>                                         sum = 0;
>                                         for(int j=0;j&lt;al.size()-1;j++){
>                                                 sum +=
> (allPoints[i][j]-featureVals[j])*(allPoints[i][j]-featureVals[j]);
>                                         }
>                                         list.add(new
> Tuple2&lt;Pair,Double>(new
> Pair(i,jIndex),Math.sqrt(sum)));
>                                 }
>                                 return list;
>                         }
>                 });
>
>
>
> distance.saveAsTextFile("hdfs://HadoopV26Master:9000/user/"+args[0]);
>         }
> }
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/java-program-got-Stuck-at-broadcasting-tp22953p22973.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to