java program got Stuck at broadcasting

2015-05-19 Thread allanjie
​Hi All,
The variable I need to broadcast is just 468 MB.
 
 
When broadcasting, it just "stops" here:

*
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.tip.id is
deprecated. Instead, use mapreduce.task.id
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.id is
deprecated. Instead, use mapreduce.task.attempt.id
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.is.map is
deprecated. Instead, use mapreduce.task.ismap
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.partition is
deprecated. Instead, use mapreduce.task.partition
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.job.id is
deprecated. Instead, use mapreduce.job.id
15/05/20 11:36:14 INFO mapred.FileInputFormat: Total input paths to process
: 1
15/05/20 11:36:14 INFO spark.SparkContext: Starting job: saveAsTextFile at
Test1.java:90
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at
Test1.java:90) with 4 output partitions (allowLocal=false)
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Final stage: Stage
0(saveAsTextFile at Test1.java:90)
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Parents of final stage:
List()
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Missing parents: List()
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting Stage 0
(MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90), which has no
missing parents
15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(129264) called
with curMem=988453294, maxMem=2061647216
15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2 stored as
values in memory (estimated size 126.2 KB, free 1023.4 MB)
15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(78190) called
with curMem=988582558, maxMem=2061647216
15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2_piece0 stored
as bytes in memory (estimated size 76.4 KB, free 1023.3 MB)
15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Master:44855 (size: 76.4 KB, free: 1492.4 MB)
15/05/20 11:36:15 INFO storage.BlockManagerMaster: Updated info of block
broadcast_2_piece0
15/05/20 11:36:15 INFO spark.SparkContext: Created broadcast 2 from
broadcast at DAGScheduler.scala:839
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting 4 missing tasks
from Stage 0 (MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90)
15/05/20 11:36:15 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with
4 tasks
15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
0.0 (TID 0, HadoopV26Slave5, NODE_LOCAL, 1387 bytes)
15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
0.0 (TID 1, HadoopV26Slave3, NODE_LOCAL, 1387 bytes)
15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
0.0 (TID 2, HadoopV26Slave4, NODE_LOCAL, 1387 bytes)
15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 3.0 in stage
0.0 (TID 3, HadoopV26Slave1, NODE_LOCAL, 1387 bytes)
15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Slave5:45357 (size: 76.4 KB, free: 2.1 GB)
15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Slave3:57821 (size: 76.4 KB, free: 2.1 GB)  
…….
15/05/20 11:36:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece1 in
memory on HadoopV26Slave5:45357 (size: 4.0 MB, free: 1646.3 MB)
*

And it didn't go forward while I kept waiting; it didn't exactly stop, it was
more like stuck.
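
For scale, the MemoryStore lines above already say a lot: curMem=988453294 (about 943 MB) of maxMem=2061647216 (about 1966 MB) is in use before this tiny broadcast_2 is even added, which is consistent with the 468 MB broadcast being held on the driver roughly twice (once as deserialized values, once as the serialized torrent pieces) and with the reported "free 1023.4 MB".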

I have 6 workers/VMs, each with 8 GB memory and 12 GB disk storage.
After a few minutes, the program stopped and showed something like this:


15/05/20 11:42:45 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0
(TID 1, HadoopV26Slave3):
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
/user/output/_temporary/0/_temporary/attempt_201505201136__m_01_1/part-1
could only be replicated to 0 nodes instead of minReplication (=1).  There
are 6 datanode(s) running and no node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1549)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3200)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)


Re: save column values of DataFrame to text file

2015-05-20 Thread allanjie
Sorry, but how does that work?
Could you explain it in more detail?

On 20 May 2015 at 21:32, oubrik [via Apache Spark User List] <
ml-node+s1001560n2295...@n3.nabble.com> wrote:

> hi,
> try like this
>
> DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
> df.select("year", "model").save("newcars.csv",
> "com.databricks.spark.csv");
>
> for more information: https://github.com/databricks/spark-csv
>
> Regards
>
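
For reference, the suggestion above expands to roughly the following (a sketch against the spark-csv 1.x API for Spark 1.3; the input path, the header option and the column names are placeholders, not values from this thread):

import java.util.HashMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class CsvSaveExample {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("csv-save"));
        SQLContext sqlContext = new SQLContext(sc);

        // Options for the com.databricks.spark.csv data source
        // ("cars.csv" and the header flag are placeholder values).
        HashMap<String, String> options = new HashMap<String, String>();
        options.put("path", "cars.csv");
        options.put("header", "true");

        DataFrame df = sqlContext.load("com.databricks.spark.csv", options);

        // Keep only the wanted columns and write them back out as CSV.
        df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv");

        sc.stop();
    }
}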



-- 
PhD student,
Social Media Laboratory,
Department of Electronic & Computer Engineering,
The Hong Kong University of Science and Technology.
Website: http://www.allanjie.net





Re: java program got Stuck at broadcasting

2015-05-21 Thread allanjie
Hi,

I just checked the datanode logs; they look like this:
*
2015-05-20 11:42:14,605 INFO
org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src:
/10.9.0.48:50676, dest: /10.9.0.17:50010, bytes: 134217728, op: HDFS_WRITE,
cliID: DFSClient_NONMAPREDUCE_804680172_54, offset: 0, srvID:
39fb78d5-828a-4319-8303-c704fab526e3, blockid:
BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273, duration:
16994466261
2015-05-20 11:42:14,606 INFO
org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder:
BP-436159032-10.9.0.16-1431330007172:blk_1073742096_1273,
type=LAST_IN_PIPELINE, downstreams=0:[] terminating
2015-05-20 11:42:17,788 INFO
org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src:
/10.9.0.17:49046, dest: /10.9.0.17:50010, bytes: 134217728, op: HDFS_WRITE,
cliID: DFSClient_NONMAPREDUCE_102926009_54, offset: 0, srvID:
39fb78d5-828a-4319-8303-c704fab526e3, blockid:
BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276, duration:
17829554438
2015-05-20 11:42:17,788 INFO
org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder:
BP-436159032-10.9.0.16-1431330007172:blk_1073742099_1276,
type=HAS_DOWNSTREAM_IN_PIPELINE terminating
2015-05-20 11:42:17,904 INFO
org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving
BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 src:
/10.9.0.17:49049 dest: /10.9.0.17:50010
2015-05-20 11:42:17,904 WARN
org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in
BlockReceiver constructor. Cause is 
2015-05-20 11:42:17,904 INFO
org.apache.hadoop.hdfs.server.datanode.DataNode: opWriteBlock
BP-436159032-10.9.0.16-1431330007172:blk_1073742103_1280 received exception
org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space:
The volume with the most available space (=114409472 B) is less than the
block size (=134217728 B).
2015-05-20 11:42:17,905 ERROR
org.apache.hadoop.hdfs.server.datanode.DataNode:
HadoopV26Slave1:50010:DataXceiver error processing WRITE_BLOCK operation 
src: /10.9.0.17:49049 dst: /10.9.0.17:50010
org.apache.hadoop.util.DiskChecker$DiskOutOfSpaceException: Out of space:
The volume with the most available space (=114409472 B) is less than the
block size (=134217728 B).
at org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy.chooseVolume(RoundRobinVolumeChoosingPolicy.java:67)
at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeList.getNextVolume(FsVolumeList.java:69)
at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.createRbw(FsDatasetImpl.java:1084)
at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.createRbw(FsDatasetImpl.java:114)
at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.<init>(BlockReceiver.java:183)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:615)
at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:137)
at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:74)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:235)
at java.lang.Thread.run(Thread.java:745)
2015-05-20 11:43:59,669 INFO
org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification
succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073741999_1176
2015-05-20 11:46:10,214 INFO
org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification
succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073742000_1177
2015-05-20 11:48:35,445 INFO
org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner: Verification
succeeded for BP-436159032-10.9.0.16-1431330007172:blk_1073741990_1167
2015-05-20 11:50:04,043 INFO
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
Scheduling blk_1073742080_1257 file
/tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742080
for deletion
2015-05-20 11:50:04,136 INFO
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
Scheduling blk_1073742081_1258 file
/tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742081
for deletion
2015-05-20 11:50:04,136 INFO
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
Scheduling blk_1073742082_1259 file
/tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742082
for deletion
2015-05-20 11:50:04,136 INFO
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
Scheduling blk_1073742083_1260 file
/tmp/hadoop-hduser/dfs/data/current/BP-436159032-10.9.0.16-1431330007172/current/finalized/subdir0/subdir1/blk_1073742083
for deletion
2015-05-20 11:50:04,136 INFO
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
Scheduling blk_1073742084_1261
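
Reading these lines, the earlier "could only be replicated to 0 nodes instead of minReplication (=1)" failure looks like a plain disk-space problem: the volume with the most available space has 114409472 B (about 109 MB) left, which is less than a single HDFS block of 134217728 B (128 MB), so the datanode has to reject the write. With only 12 GB of disk per VM, shared by the HDFS data directory under /tmp/hadoop-hduser, the Spark local dir (/tmp/spark-temp) and the event logs, the datanodes appear to fill up while the job writes its output.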

Re: java program got Stuck at broadcasting

2015-05-21 Thread allanjie
Sure, the code is very simple. I think you can understand it from the main
function.

public class Test1 {

    public static double[][] createBroadcastPoints(String localPointPath, int row, int col) throws IOException {
        BufferedReader br = RAWF.reader(localPointPath);
        String line = null;
        int rowIndex = 0;
        double[][] pointFeatures = new double[row][col];
        while ((line = br.readLine()) != null) {
            List<String> point = Arrays.asList(line.split(","));
            int colIndex = 0;
            for (String pointFeature : point) {
                pointFeatures[rowIndex][colIndex] = Double.valueOf(pointFeature);
                colIndex++;
            }
            rowIndex++;
        }
        br.close();
        return pointFeatures;
    }

    public static void main(String[] args) throws IOException {
        /***** Parameter Setting *****/
        String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
        String remoteFilePath =
                "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv"; // this csv file is only 468MB
        final int row = 133433;
        final int col = 458;
        /*****************************/

        SparkConf conf = new SparkConf()
                .setAppName("distance")
                .setMaster("spark://HadoopV26Master:7077")
                .set("spark.executor.memory", "4g")
                .set("spark.eventLog.enabled", "true")
                .set("spark.eventLog.dir", "/usr/local/spark/logs/spark-events")
                .set("spark.local.dir", "/tmp/spark-temp");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> textFile = sc.textFile(remoteFilePath);

        // Broadcast variable: the full 133433 x 458 feature matrix, read on the driver.
        final Broadcast<double[][]> broadcastPoints =
                sc.broadcast(createBroadcastPoints(localPointPath, row, col));

        /**
         * Compute the distance in terms of each point on each instance.
         * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
         */
        // (The mailing-list archive stripped the generics and the loop bodies below;
        //  this is a best-effort reconstruction that assumes column 0 of each input
        //  line is the point index and the remaining columns are its features.)
        JavaPairRDD<Pair, Double> distance = textFile.flatMapToPair(
                new PairFlatMapFunction<String, Pair, Double>() {
                    public Iterable<Tuple2<Pair, Double>> call(String v1) throws Exception {
                        List<String> al = Arrays.asList(v1.split(","));
                        int jIndex = Integer.valueOf(al.get(0));
                        double[] featureVals = new double[al.size() - 1];
                        for (int j = 1; j < al.size(); j++) {
                            featureVals[j - 1] = Double.valueOf(al.get(j));
                        }
                        List<Tuple2<Pair, Double>> list = new ArrayList<Tuple2<Pair, Double>>();
                        for (int i = 0; i < row; i++) {
                            double sum = 0;
                            for (int j = 0; j < col; j++) {
                                double diff = featureVals[j] - broadcastPoints.value()[i][j];
                                sum += diff * diff;
                            }
                            list.add(new Tuple2<Pair, Double>(new Pair(i, jIndex), Math.sqrt(sum)));
                        }
                        return list;
                    }
                });

        distance.saveAsTextFile("hdfs://HadoopV26Master:9000/user/" + args[0]);
    }
}
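
As a quick sanity check of the index formula in the comment above (reading i and j as 1-based with i < j, and the index as 0-based): with n = 5, i = 2, j = 4 the formula gives n(i-1) - i(i-1)/2 + j - i - 1 = 5 - 1 + 1 = 5, and enumerating the pairs in order (1,2), (1,3), (1,4), (1,5), (2,3), (2,4), ... indeed places (2,4) at position 5 counting from 0.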







Spark dramatically slow when I add "saveAsTextFile"

2015-05-24 Thread allanjie
*Problem Description*:

The program runs on a stand-alone Spark cluster (1 master, 6 workers with
8 GB RAM and 2 cores each).
Input: a 468 MB file with 133433 records stored in HDFS.
Output: a file of only about 2 MB, also stored in HDFS.
The program has two map operations and one reduceByKey operation.
Finally I save the result to HDFS using "*saveAsTextFile*".
*Problem*: if I don't add "saveAsTextFile", the program runs very fast (a few
seconds); otherwise it is extremely slow and takes about 30 minutes.
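
The difference is almost certainly Spark's lazy evaluation rather than the cost of the write itself: map/flatMapToPair/reduceByKey only record a lineage, and nothing runs until an action such as saveAsTextFile (or count/collect) is called, so dropping the action drops the whole computation. A minimal sketch of the effect, using a toy RDD rather than the job below:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class LazyDemo {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("lazy-demo"));

        JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        // Only recorded, not executed: the program "finishes" immediately here.
        JavaRDD<Integer> squared = nums.map(new Function<Integer, Integer>() {
            public Integer call(Integer x) throws Exception {
                return x * x;
            }
        });

        // Uncommenting either action forces the whole pipeline to run;
        // that upstream work is usually where the time goes.
        // squared.count();
        // squared.saveAsTextFile("hdfs://HadoopV26Master:9000/user/lazy-demo");

        sc.stop();
    }
}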

*My program (it is very simple)*
    public static void main(String[] args) throws IOException {
        /***** Parameter Setting *****/
        String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
        String remoteFilePath =
                "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
        String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
        final int row = 133433;
        final int col = 458;
        final double dc = Double.valueOf(args[0]);

        SparkConf conf = new SparkConf()
                .setAppName("distance")
                .set("spark.executor.memory", "4g")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.eventLog.enabled", "true");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> textFile = sc.textFile(remoteFilePath);

        // Broadcast variable, the dimension of this double array: 133433*458
        final Broadcast<double[][]> broadcastPoints =
                sc.broadcast(createBroadcastPoints(localPointPath, row, col));

        /**
         * Compute the distance in terms of each point on each instance.
         * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
         */
        // (Reconstructed as above: the archive stripped the generics and loop bodies;
        //  column 0 of each input line is assumed to be the point index.)
        JavaPairRDD<Integer, Double> distance = textFile.flatMapToPair(
                new PairFlatMapFunction<String, Integer, Double>() {
                    public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception {
                        List<String> al = Arrays.asList(v1.split(","));
                        int jIndex = Integer.valueOf(al.get(0));
                        double[] featureVals = new double[al.size() - 1];
                        for (int j = 1; j < al.size(); j++) {
                            featureVals[j - 1] = Double.valueOf(al.get(j));
                        }
                        List<Tuple2<Integer, Double>> list = new ArrayList<Tuple2<Integer, Double>>();
                        for (int i = 0; i < row; i++) {
                            double sum = 0;
                            for (int j = 0; j < col; j++) {
                                double diff = featureVals[j] - broadcastPoints.value()[i][j];
                                sum += diff * diff;
                            }
                            list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
                        }
                        return list;
                    }
                });

        // Create zeroOne density: 1 if the distance is below the cutoff dc, else 0
        JavaPairRDD<Integer, Integer> densityZeroOne = distance.mapValues(
                new Function<Double, Integer>() {
                    public Integer call(Double v1) throws Exception {
                        return (v1 < dc) ? 1 : 0;
                    }
                });

        // Sum the 0/1 flags per point index
        JavaPairRDD<Integer, Integer> counts = densityZeroOne.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });
        counts.saveAsTextFile(outputPath + args[1]);
        sc.stop();
    }
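
Since spark.serializer is already set to KryoSerializer here, one optional tweak (a suggestion on my side, not something discussed in the thread) is to also register the classes that actually get serialized, so Kryo can refer to them by ID instead of writing out class names:

        SparkConf conf = new SparkConf()
                .setAppName("distance")
                .set("spark.executor.memory", "4g")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.eventLog.enabled", "true")
                // optional: register the broadcast's array types with Kryo
                .registerKryoClasses(new Class<?>[] { double[][].class, double[].class });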

*If I comment out "saveAsTextFile", the log is:*
Picked up _JAVA_OPTIONS: -Xmx4g
15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/05/24 15:21:30 INFO spark.SecurityManager: Changing view acls to: hduser
15/05/24 15:21:30 INFO spark.SecurityManager: Changing modify acls to:
hduser
15/05/24 15:21:30 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hduser); users with modify permissions: Set(hduser)
15/05/24 15:21:31 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/24 15:21:31 INFO Remoting: Starting remoting
15/05/24 15:21:31 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@HadoopV26Master:57429]
15/05/24 15:21:31 INFO util.Utils: Successfully started service
'sparkDriver' on port 57429.
15/05/24 15:21:31 INFO spark.SparkEnv: Registering MapOutputTracker
15/05/24 15:21:31 INFO spark.SparkEnv: Registering BlockManagerMaster
15/05/24 15:21:31 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-6342bde9-feca-4651-8cca-a67541150420/blockmgr-e92d0ae0-ec95-44cb-986a-266a1899202b
15/05/24 15:21:31 INFO storage.MemoryStore: MemoryStore started with
capacity 1966.1 MB
15/05/24 15:21:31 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-fea59c9e-1264-45e9-ad31-484d7de83d0a/httpd-c6421767-ffaf-4417-905e-34b3d13a7bf4
15/05/24 15:21:31 INFO spark.HttpServer: Starting HTTP Server
15/05/24 15:21:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/05/24 15:21:31 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:36956
15/05/24 15:21:31 INFO util.Utils: Successfully started service 'HTTP file
server' on port 36956.
15/05/24 15:21:31 INFO spark.Sp

The stage slow when I have for loop inside (Java)

2015-05-24 Thread allanjie
Hi all,

I only have one stage, which is "mapToPair", and inside the function I have a
for loop that runs about 133433 times.

With the full loop it becomes slow, but when I replace 133433 with just 133, it works
very fast.
I think this would be a simple operation even in plain Java.

You can look at the code on my GitHub:
https://github.com/allanj/myspark/blob/master/sparktest.java

When I replace the for loop size 133433 with 133, it runs fast.
Otherwise it is very, very slow; I can see it from the web console, where the
input record count increases very slowly.
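
For a rough sense of scale: 133433 input records, each compared against 133433 broadcast points over 458 features, is about 133433 x 133433, roughly 1.8 x 10^10 distance computations, and about 458 times that, roughly 8 x 10^12 multiply-adds, plus the same ~1.8 x 10^10 emitted (index, distance) pairs feeding the shuffle. So the slowdown with the full loop size is expected; this is not a simple operation even in plain Java.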


