Re: java program got Stuck at broadcasting
Sure, the code is very simple. I think you guys can understand it from the main function.

public class Test1 {

    public static double[][] createBroadcastPoints(String localPointPath, int row, int col) throws IOException {
        // RAWF is a helper class from my own code (not shown here)
        BufferedReader br = RAWF.reader(localPointPath);
        String line = null;
        int rowIndex = 0;
        double[][] pointFeatures = new double[row][col];
        while ((line = br.readLine()) != null) {
            List<String> point = Arrays.asList(line.split(","));
            int colIndex = 0;
            for (String pointFeature : point) {
                pointFeatures[rowIndex][colIndex] = Double.valueOf(pointFeature);
                colIndex++;
            }
            rowIndex++;
        }
        br.close();
        return pointFeatures;
    }

    public static void main(String[] args) throws IOException {
        /** Parameter setting */
        String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
        String remoteFilePath = "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
        // this csv file is only 468MB
        final int row = 133433;
        final int col = 458;

        SparkConf conf = new SparkConf()
            .setAppName("distance")
            .setMaster("spark://HadoopV26Master:7077")
            .set("spark.executor.memory", "4g")
            .set("spark.eventLog.enabled", "true")
            .set("spark.eventLog.dir", "/usr/local/spark/logs/spark-events")
            .set("spark.local.dir", "/tmp/spark-temp");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> textFile = sc.textFile(remoteFilePath);

        // Broadcast variable
        final Broadcast<double[][]> broadcastPoints =
            sc.broadcast(createBroadcastPoints(localPointPath, row, col));

        /**
         * Compute the distance in terms of each point on each instance.
         * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
         */
        JavaPairRDD<Pair, Double> distance = textFile.flatMapToPair(
            new PairFlatMapFunction<String, Pair, Double>() {
                public Iterable<Tuple2<Pair, Double>> call(String v1) throws Exception {
                    List<String> al = Arrays.asList(v1.split(","));
                    double[] featureVals = new double[al.size()];
                    for (int j = 0; j < al.size() - 1; j++)
                        featureVals[j] = Double.valueOf(al.get(j + 1));
                    int jIndex = Integer.valueOf(al.get(0));
                    double[][] allPoints = broadcastPoints.getValue();
                    double sum = 0;
                    List<Tuple2<Pair, Double>> list = new ArrayList<Tuple2<Pair, Double>>();
                    for (int i = 0; i < row; i++) {
                        sum = 0;
                        for (int j = 0; j < al.size() - 1; j++) {
                            sum += (allPoints[i][j] - featureVals[j]) * (allPoints[i][j] - featureVals[j]);
                        }
                        list.add(new Tuple2<Pair, Double>(new Pair(i, jIndex), Math.sqrt(sum)));
                    }
                    return list;
                }
            });

        distance.saveAsTextFile("hdfs://HadoopV26Master:9000/user/" + args[0]);
    }
}

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-program-got-Stuck-at-broadcasting-tp22953p22973.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
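As a sanity check on the numbers in the post above, the raw payload of the broadcast array can be estimated from the constants in the code (row = 133433, col = 458, 8 bytes per double). This is a rough sketch that ignores JVM object headers and the per-row overhead of a double[][]:

```java
public class BroadcastSizeEstimate {
    public static void main(String[] args) {
        final long row = 133433;
        final long col = 458;
        // Raw payload of the double[row][col] array: 8 bytes per double.
        long bytes = row * col * 8;
        System.out.println(bytes);                 // 488898512 bytes
        System.out.println(bytes / (1024 * 1024)); // about 466 MiB
    }
}
```

That is in line with the "only 468MB" CSV quoted in the post, so the broadcast payload itself is plausibly sized; the question is what happens downstream of it.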
Re: java program got Stuck at broadcasting
Can you try commenting out the saveAsTextFile and doing a simple count()? If it's a broadcast issue, then it would throw up the same error.

On 21 May 2015 14:21, allanjie allanmcgr...@gmail.com wrote:
> Sure, the code is very simple. I think you guys can understand it from the main function. [...]
java program got Stuck at broadcasting
(beginning of stack trace truncated)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)
	at org.apache.hadoop.ipc.Client.call(Client.java:1468)
	at org.apache.hadoop.ipc.Client.call(Client.java:1399)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
	at com.sun.proxy.$Proxy13.addBlock(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399)
	at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1532)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1349)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588)

Then I checked the disk volume on each slave, and it seems almost all of the storage has been used up. But the variable I broadcast is just 468MB. Originally it is saved in HDFS, and in the Java program I read it from HDFS and then broadcast that variable. Can anyone help? Really thanks.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-program-got-Stuck-at-broadcasting-tp22956.html
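One thing worth checking, separate from the broadcast itself: the flatMapToPair in the code emits one (Pair, distance) tuple per broadcast row for every input line, so the saved output is quadratic in the number of points. A rough back-of-the-envelope, assuming a hypothetical ~25 bytes per record once written as text by saveAsTextFile (that figure is a guess, not something from the post):

```java
public class OutputSizeEstimate {
    public static void main(String[] args) {
        final long points = 133433;
        // Each of the 133433 input lines emits 133433 tuples.
        long records = points * points;
        System.out.println(records);               // 17804365489 records
        // Assumed ~25 bytes per record as text (hypothetical figure).
        long approxBytes = records * 25;
        System.out.println(approxBytes / (1024L * 1024 * 1024)); // roughly 414 GiB
    }
}
```

Hundreds of gigabytes of text output would be consistent with the slaves' disks filling up even though the broadcast variable itself is under 500 MB, which also fits the earlier suggestion to replace saveAsTextFile with a count() and see whether the error persists.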