Re: spark there is no space on the disk

2015-03-31 Thread Peng Xia
Yes, we have just modified the configuration, and everything works fine.
Thanks very much for the help.

On Thu, Mar 19, 2015 at 5:24 PM, Ted Yu yuzhih...@gmail.com wrote:

 For YARN, possibly this one ?

 <property>
   <name>yarn.nodemanager.local-dirs</name>
   <value>/hadoop/yarn/local</value>
 </property>

 Cheers

 On Thu, Mar 19, 2015 at 2:21 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 IIRC you have to set that configuration on the Worker processes (for
 standalone). The app can't override it (only for a client-mode
 driver). YARN has a similar configuration, but I don't know the name
 (shouldn't be hard to find, though).

 On Thu, Mar 19, 2015 at 11:56 AM, Davies Liu dav...@databricks.com
 wrote:
  Is it possible that `spark.local.dir` is overridden by others? The docs
 say:
 
  NOTE: In Spark 1.0 and later this will be overridden by
  SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN)
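
For reference, a minimal PySpark sketch (not from the original thread; the app
name and path are illustrative) that checks whether an app-level spark.local.dir
is being overridden by those environment variables on the executors:

import os
from pyspark import SparkConf, SparkContext

# Driver-side setting; per the note quoted above, SPARK_LOCAL_DIRS (Standalone,
# Mesos) or LOCAL_DIRS (YARN) takes precedence on the cluster.
conf = SparkConf().setAppName("check-local-dirs").set("spark.local.dir", "/big/volume/tmp")
sc = SparkContext(conf=conf)
print("driver thinks spark.local.dir = " + conf.get("spark.local.dir"))

def env_dirs(_):
    # What each executor actually sees.
    yield (os.environ.get("SPARK_LOCAL_DIRS"), os.environ.get("LOCAL_DIRS"))

print(sc.parallelize(range(4), 4).mapPartitions(env_dirs).distinct().collect())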
 
  On Sat, Mar 14, 2015 at 5:29 PM, Peng Xia sparkpeng...@gmail.com
 wrote:
  Hi Sean,
 
  Thanks very much for your reply.
  I tried to configure it with the code below:
 
  sf = SparkConf().setAppName("test").set("spark.executor.memory",
  "45g").set("spark.cores.max", 62).set("spark.local.dir", "C:\\tmp")
 
  But I still get the error.
  Do you know how I can configure this?
 
 
  Thanks,
  Best,
  Peng
 
 
  On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote:
 
  It means pretty much what it says. You ran out of space on an executor
  (not driver), because the dir used for serialization temp files is
  full (not all volumes). Set spark.local.dir to something more
  appropriate and larger.
 
  On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com
 wrote:
   Hi
  
  
   I was running a logistic regression algorithm on an 8-node Spark cluster;
   each node has 8 cores and 56 GB of RAM (each node runs Windows), and the
   drive Spark is installed on has 1.9 TB capacity. The dataset I was training
   on has around 40 million records with around 6600 features. But I always
   get this error during the training process:
  
   Py4JJavaError: An error occurred while calling
   o70.trainLogisticRegressionModelWithLBFGS.
   : org.apache.spark.SparkException: Job aborted due to stage failure:
   Task
   2709 in stage 3.0 failed 4 times, most recent failure: Lost task
 2709.3
   in
   stage 3.0 (TID 2766,
   workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
   java.io.IOException: There is not enough space on the disk
   at java.io.FileOutputStream.writeBytes(Native Method)
   at java.io.FileOutputStream.write(FileOutputStream.java:345)
   at
   java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
   at
  
  
 org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
   at
  
  
 org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
   at
  
 org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
   at
  
  
 java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
   at
  
  
 java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
   at
  
  
 java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
   at
   java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
   at
  
  
 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
   at
  
  
 org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
   at
  
  
 org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
   at
   org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
   at
   org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
   at
  
  
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
   at
  
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
   at
   org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
   at
   org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
   at
   org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
   at
   org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:56)
   at
  
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
   at
  
  
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at
  
  
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
  
   Driver stacktrace

Re: refer to dictionary

2015-03-31 Thread Peng Xia
Hi Ted,

Thanks very much; yes, using a broadcast variable is much faster.

Best,
Peng

On Tue, Mar 31, 2015 at 8:49 AM, Ted Yu yuzhih...@gmail.com wrote:

 You can use a broadcast variable.

 See also this thread:

 http://search-hadoop.com/m/JW1q5GX7U22/Spark+broadcast+variablesubj=How+Broadcast+variable+scale+
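
For reference, a minimal PySpark sketch of the broadcast approach (the sample
data and dictionary below are illustrative, not from the thread):

from pyspark import SparkContext

sc = SparkContext("local[*]", "broadcast-dict-example")

dict1 = {"a": 1, "b": 2, "c": 3}
bdict1 = sc.broadcast(dict1)  # shipped to each executor once, not with every task closure

rdd1 = sc.parallelize([["a", "b", "c"], ["c", "a"]])
mapped = rdd1.map(lambda line: [bdict1.value[item] for item in line])
print(mapped.collect())  # [[1, 2, 3], [3, 1]]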



  On Mar 31, 2015, at 4:43 AM, Peng Xia sparkpeng...@gmail.com wrote:
 
  Hi,
 
  I have an RDD (rdd1) where each line is split into an array, e.g. ["a", "b", "c"].
  I also have a local dictionary (dict1) that stores key-value pairs such as
  {"a": 1, "b": 2, "c": 3}.
  I want to replace the keys in the RDD with their corresponding values in the dict:
  rdd1.map(lambda line: [dict1[item] for item in line])
 
  But this task is not distributed efficiently; I believe the reason is that
  dict1 is a local instance.
  Can anyone provide suggestions on how to parallelize this?
 
 
  Thanks,
  Best,
  Peng
 



Re: spark there is no space on the disk

2015-03-14 Thread Peng Xia
Hi Sean,

Thanks very much for your reply.
I tried to configure it with the code below:

sf = SparkConf().setAppName("test").set("spark.executor.memory",
"45g").set("spark.cores.max", 62).set("spark.local.dir", "C:\\tmp")

But I still get the error.
Do you know how I can configure this?


Thanks,
Best,
Peng


On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote:

 It means pretty much what it says. You ran out of space on an executor
 (not driver), because the dir used for serialization temp files is
 full (not all volumes). Set spark.local.dir to something more
 appropriate and larger.
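
For reference, a hedged PySpark sketch of that suggestion (the paths are
illustrative; as noted elsewhere in this thread, on a Standalone or YARN
cluster the worker-side setting takes precedence over an app-level one):

from pyspark import SparkConf, SparkContext

# Point spark.local.dir at one or more large volumes (comma-separated).
conf = (SparkConf()
        .setAppName("test")
        .set("spark.executor.memory", "45g")
        .set("spark.cores.max", "62")
        .set("spark.local.dir", "D:\\spark-tmp,E:\\spark-tmp"))
sc = SparkContext(conf=conf)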

 On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote:
  Hi
 
 
  I was running a logistic regression algorithm on an 8-node Spark cluster;
  each node has 8 cores and 56 GB of RAM (each node runs Windows), and the
  drive Spark is installed on has 1.9 TB capacity. The dataset I was training
  on has around 40 million records with around 6600 features. But I always
  get this error during the training process:
 
  Py4JJavaError: An error occurred while calling
  o70.trainLogisticRegressionModelWithLBFGS.
  : org.apache.spark.SparkException: Job aborted due to stage failure: Task
  2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3
 in
  stage 3.0 (TID 2766,
  workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
  java.io.IOException: There is not enough space on the disk
  at java.io.FileOutputStream.writeBytes(Native Method)
  at java.io.FileOutputStream.write(FileOutputStream.java:345)
  at
 java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
  at
 
 org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
  at
 
 org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
  at
  org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
  at
 
 java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
  at
 
 java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
  at
 
 java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
  at
  java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
  at
 
 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
  at
 
 org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
  at
 
 org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
  at
  org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
  at
  org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
  at
  org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
  at
  org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
  at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
  at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
  at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
  at
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
  at
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
 
  Driver stacktrace:
  at
  org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
  at
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
  at scala.Option.foreach(Option.scala:236)
  at
 
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
  at
 
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
  at akka.actor.Actor$class.aroundReceive

Re: spark there is no space on the disk

2015-03-14 Thread Peng Xia
And I have 2 TB of free space on the C drive.

On Sat, Mar 14, 2015 at 8:29 PM, Peng Xia sparkpeng...@gmail.com wrote:

 Hi Sean,

 Thanks very much for your reply.
 I tried to configure it with the code below:

 sf = SparkConf().setAppName("test").set("spark.executor.memory",
 "45g").set("spark.cores.max", 62).set("spark.local.dir", "C:\\tmp")

 But I still get the error.
 Do you know how I can configure this?


 Thanks,
 Best,
 Peng


 On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote:

 It means pretty much what it says. You ran out of space on an executor
 (not driver), because the dir used for serialization temp files is
 full (not all volumes). Set spark.local.dir to something more
 appropriate and larger.

 On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote:
  Hi
 
 
  I was running a logistic regression algorithm on an 8-node Spark cluster;
  each node has 8 cores and 56 GB of RAM (each node runs Windows), and the
  drive Spark is installed on has 1.9 TB capacity. The dataset I was training
  on has around 40 million records with around 6600 features. But I always
  get this error during the training process:
 
  Py4JJavaError: An error occurred while calling
  o70.trainLogisticRegressionModelWithLBFGS.
  : org.apache.spark.SparkException: Job aborted due to stage failure:
 Task
  2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3
 in
  stage 3.0 (TID 2766,
  workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
  java.io.IOException: There is not enough space on the disk
  at java.io.FileOutputStream.writeBytes(Native Method)
  at java.io.FileOutputStream.write(FileOutputStream.java:345)
  at
 java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
  at
 
 org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
  at
 
 org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
  at
  org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
  at
 
 java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
  at
 
 java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
  at
 
 java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
  at
  java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
  at
 
 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
  at
 
 org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
  at
 
 org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
  at
  org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
  at
  org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
  at
 
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
  at
  org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
  at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
  at
 org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
  at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
  at
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
  at
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
 
  Driver stacktrace:
  at
  org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
  at
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
  at scala.Option.foreach(Option.scala:236)
  at
 
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696

spark there is no space on the disk

2015-03-13 Thread Peng Xia
Hi


I was running a logistic regression algorithm on an 8-node Spark cluster;
each node has 8 cores and 56 GB of RAM (each node runs Windows), and the drive
Spark is installed on has 1.9 TB capacity. The dataset I was training on has
around 40 million records with around 6600 features. But I always get this
error during the training process:

Py4JJavaError: An error occurred while calling
o70.trainLogisticRegressionModelWithLBFGS.:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task
2709.3 in stage 3.0 (TID 2766,
workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
java.io.IOException: There is not enough space on the disk
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at 
org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
at 
org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
at 
org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
at 
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
at 
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
at 
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
at 
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Re: error on training with logistic regression sgd

2015-03-10 Thread Peng Xia
Hi,

Can anyone give an idea about this?
I just did some Google searching, and it seems related to the 2 GB limitation
on block size, https://issues.apache.org/jira/browse/SPARK-1476.
The whole process is:
1. Load the data.
2. Convert each line of data into labeled points using a feature hashing
algorithm in Python.
3. Train a logistic regression model with the converted labeled points.
Can anyone give some advice on how to avoid the 2 GB limit, if this is the cause?
Thanks very much for the help.

Best,
Peng
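
For reference, a hedged PySpark sketch of the usual workaround for SPARK-1476:
raise the partition count so that no single cached or shuffled block approaches
2 GB. The path, partition counts, and parsing below are illustrative, not the
original code.

from pyspark import SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

sc = SparkContext(appName="lr-sgd-example")

def to_labeled_point(line):
    parts = line.split(",")
    return LabeledPoint(float(parts[0]), [float(x) for x in parts[1:]])

# Read with more input partitions and repartition before caching/training,
# so each partition (and hence each block) stays well under 2 GB.
raw = sc.textFile("wasb:///data/train.txt", minPartitions=512)
points = raw.map(to_labeled_point).repartition(1024).cache()
model = LogisticRegressionWithSGD.train(points, iterations=20)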

On Mon, Mar 9, 2015 at 3:54 PM, Peng Xia sparkpeng...@gmail.com wrote:

 Hi,

 I was launching a Spark cluster with 4 worker nodes, each containing 8 cores
 and 56 GB of RAM, and I was testing my logistic regression problem.
 The training set is around 1.2 million records. When I was using 2**10
 (1024) features, the whole program worked fine, but when I use 2**14
 features, the program encounters this error:

 Py4JJavaError: An error occurred while calling 
 o84.trainLogisticRegressionModelWithSGD.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 
 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 
 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): 
 java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds 
 Integer.MAX_VALUE
   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
   at 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
   at 
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
   at 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
   at 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
   at 
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
   at 
 org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
   at 
 org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
   at 
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
   at 
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
   at 
 io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
   at 
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
   at 
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
   at 
 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
   at 
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
   at 
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
   at 
 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
   at 
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
   at 
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
   at 
 io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
   at 
 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
   at 
 io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
   at java.lang.Thread.run(Thread.java:745)

   at 
 org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:156)
   at 
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93

error on training with logistic regression sgd

2015-03-09 Thread Peng Xia
Hi,

I was launching a Spark cluster with 4 worker nodes, each containing 8 cores
and 56 GB of RAM, and I was testing my logistic regression problem.
The training set is around 1.2 million records. When I was using 2**10
(1024) features, the whole program worked fine, but when I use 2**14
features, the program encounters this error:

Py4JJavaError: An error occurred while calling
o84.trainLogisticRegressionModelWithSGD.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3
in stage 4.0 (TID 9,
workernode0.sparkexperience4a7.d5.internal.cloudapp.net):
java.lang.RuntimeException: java.lang.IllegalArgumentException: Size
exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at 
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)

at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:156)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at 

issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Hi,



Previously we applied the SVM algorithm in MLlib to 5 million records (600
MB), and it takes more than 25 minutes to finish.
The Spark version we are using is 1.0, and we were running this program on a
4-node cluster. Each node has 4 CPU cores and 11 GB of RAM.

The 5 million records contain only two distinct records (one positive and one
negative); the others are all duplicates.

Does anyone have any idea why it takes so long on such small data?



Thanks,
Best,

Peng


Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Thanks for all your help.
I think I didn't cache the data. My previous cluster has expired, so I don't
have a chance to check the load balancing or the app manager.
Below is my code.
There are 18 features for each record, and I am using the Scala API.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import java.util.Calendar

object BenchmarkClassification {
def main(args: Array[String]) {
// Load and parse the data file
val conf = new SparkConf()
  .setAppName("SVM")
  .set("spark.executor.memory", "8g")
  // .set("spark.executor.extraJavaOptions", "-Xms8g -Xmx8g")
val sc = new SparkContext(conf)
val data = sc.textFile(args(0))
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =>
x.toDouble)))
}
val testData = sc.textFile(args(1))
val testParsedData = testData.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =>
x.toDouble)))
}

// Run training algorithm to build the model
val numIterations = 20
val model = SVMWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
// val labelAndPreds = testParsedData.map { point =>
//   val prediction = model.predict(point.features)
//   (point.label, prediction)
// }
// val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
//   testParsedData.count
// println("Training Error = " + trainErr)
println(Calendar.getInstance().getTime())
}
}




Thanks,
Best,
Peng

On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote:

 Did you cache the data and check the load balancing? How many
 features? Which API are you using, Scala, Java, or Python? -Xiangrui

 On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote:
  Watch the app manager; it should tell you what's running and taking
 a while...
  My guess is it's a distinct function on the data.
  J
 
  Sent from my iPhone
 
  On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:
 
  Hi,
 
 
 
  Previously we applied the SVM algorithm in MLlib to 5 million records (600
  MB), and it takes more than 25 minutes to finish.
  The Spark version we are using is 1.0, and we were running this program on a
  4-node cluster. Each node has 4 CPU cores and 11 GB of RAM.
 
  The 5 million records contain only two distinct records (one positive and
  one negative); the others are all duplicates.
 
  Does anyone have any idea why it takes so long on such small data?
 
 
 
  Thanks,
  Best,
 
  Peng



Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Hi Xiangrui,

Can you give me a code example of caching, as I am new to Spark?

Thanks,
Best,
Peng

On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote:

 Then caching should solve the problem. Otherwise, it is just loading
 and parsing data from disk for each iteration. -Xiangrui
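
For reference, a minimal sketch of that advice in PySpark (the same .cache()
call applies to the Scala code quoted below; the path and parsing here are
illustrative):

from pyspark import SparkContext
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.regression import LabeledPoint

sc = SparkContext(appName="svm-cache-example")

def parse(line):
    parts = line.split(",")
    return LabeledPoint(float(parts[0]), [float(x) for x in parts[1:]])

# Cache the parsed training set once so each SGD iteration reuses it
# instead of re-reading and re-parsing the input file.
parsedData = sc.textFile("hdfs:///data/train.csv").map(parse).cache()
model = SVMWithSGD.train(parsedData, iterations=20)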

 On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote:
  Thanks for all your help.
  I think I didn't cache the data. My previous cluster has expired, so I don't
  have a chance to check the load balancing or the app manager.
  Below is my code.
  There are 18 features for each record, and I am using the Scala API.
 
  import org.apache.spark.SparkConf
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd._
  import org.apache.spark.mllib.classification.SVMWithSGD
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.linalg.Vectors
  import java.util.Calendar
 
  object BenchmarkClassification {
  def main(args: Array[String]) {
  // Load and parse the data file
  val conf = new SparkConf()
   .setAppName("SVM")
   .set("spark.executor.memory", "8g")
   // .set("spark.executor.extraJavaOptions", "-Xms8g -Xmx8g")
  val sc = new SparkContext(conf)
  val data = sc.textFile(args(0))
  val parsedData = data.map { line =>
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =>
  x.toDouble)))
  }
  val testData = sc.textFile(args(1))
  val testParsedData = testData.map { line =>
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =>
  x.toDouble)))
  }
 
  // Run training algorithm to build the model
  val numIterations = 20
  val model = SVMWithSGD.train(parsedData, numIterations)
 
  // Evaluate model on training examples and compute training error
  // val labelAndPreds = testParsedData.map { point =>
  //   val prediction = model.predict(point.features)
  //   (point.label, prediction)
  // }
  // val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
  //   testParsedData.count
  // println("Training Error = " + trainErr)
  println(Calendar.getInstance().getTime())
  }
  }
 
 
 
 
  Thanks,
  Best,
  Peng
 
  On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote:
 
  Did you cache the data and check the load balancing? How many
  features? Which API are you using, Scala, Java, or Python? -Xiangrui
 
  On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote:
   Watch the app manager; it should tell you what's running and taking
   a while...
   My guess is it's a distinct function on the data.
   J
  
   Sent from my iPhone
  
   On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:
  
   Hi,
  
  
  
   Previously we applied the SVM algorithm in MLlib to 5 million records
   (600 MB), and it takes more than 25 minutes to finish.
   The Spark version we are using is 1.0, and we were running this program
   on a 4-node cluster. Each node has 4 CPU cores and 11 GB of RAM.
  
   The 5 million records contain only two distinct records (one positive and
   one negative); the others are all duplicates.
  
   Does anyone have any idea why it takes so long on such small data?
  
  
  
   Thanks,
   Best,
  
   Peng
 
 



Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Thanks, Jimmy.
I will give it a try.
Thanks very much for all your help.

Best,
Peng

On Thu, Oct 30, 2014 at 8:19 PM, Jimmy ji...@sellpoints.com wrote:

 sampleRDD.cache()

 Sent from my iPhone

 On Oct 30, 2014, at 5:01 PM, peng xia toxiap...@gmail.com wrote:

 Hi Xiangrui,

 Can you give me a code example of caching, as I am new to Spark?

 Thanks,
 Best,
 Peng

 On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote:

 Then caching should solve the problem. Otherwise, it is just loading
 and parsing data from disk for each iteration. -Xiangrui

 On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote:
  Thanks for all your help.
  I think I didn't cache the data. My previous cluster has expired, so I don't
  have a chance to check the load balancing or the app manager.
  Below is my code.
  There are 18 features for each record, and I am using the Scala API.
 
  import org.apache.spark.SparkConf
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd._
  import org.apache.spark.mllib.classification.SVMWithSGD
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.linalg.Vectors
  import java.util.Calendar
 
  object BenchmarkClassification {
  def main(args: Array[String]) {
  // Load and parse the data file
  val conf = new SparkConf()
   .setAppName("SVM")
   .set("spark.executor.memory", "8g")
   // .set("spark.executor.extraJavaOptions", "-Xms8g -Xmx8g")
  val sc = new SparkContext(conf)
  val data = sc.textFile(args(0))
  val parsedData = data.map { line =>
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =>
  x.toDouble)))
  }
  val testData = sc.textFile(args(1))
  val testParsedData = testData.map { line =>
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =>
  x.toDouble)))
  }
 
  // Run training algorithm to build the model
  val numIterations = 20
  val model = SVMWithSGD.train(parsedData, numIterations)
 
  // Evaluate model on training examples and compute training error
  // val labelAndPreds = testParsedData.map { point =>
  //   val prediction = model.predict(point.features)
  //   (point.label, prediction)
  // }
  // val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
  //   testParsedData.count
  // println("Training Error = " + trainErr)
  println(Calendar.getInstance().getTime())
  }
  }
 
 
 
 
  Thanks,
  Best,
  Peng
 
  On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com
 wrote:
 
  Did you cache the data and check the load balancing? How many
  features? Which API are you using, Scala, Java, or Python? -Xiangrui
 
  On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote:
   Watch the app manager; it should tell you what's running and taking
   a while...
   My guess is it's a distinct function on the data.
   J
  
   Sent from my iPhone
  
   On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:
  
   Hi,
  
  
  
   Previously we applied the SVM algorithm in MLlib to 5 million records
   (600 MB), and it takes more than 25 minutes to finish.
   The Spark version we are using is 1.0, and we were running this program
   on a 4-node cluster. Each node has 4 CPU cores and 11 GB of RAM.
  
   The 5 million records contain only two distinct records (one positive and
   one negative); the others are all duplicates.
  
   Does anyone have any idea why it takes so long on such small data?
  
  
  
   Thanks,
   Best,
  
   Peng