Re: spark there is no space on the disk
Yes, we have just modified the configuration, and every thing works fine. Thanks very much for the help. On Thu, Mar 19, 2015 at 5:24 PM, Ted Yu yuzhih...@gmail.com wrote: For YARN, possibly this one ? property nameyarn.nodemanager.local-dirs/name value/hadoop/yarn/local/value /property Cheers On Thu, Mar 19, 2015 at 2:21 PM, Marcelo Vanzin van...@cloudera.com wrote: IIRC you have to set that configuration on the Worker processes (for standalone). The app can't override it (only for a client-mode driver). YARN has a similar configuration, but I don't know the name (shouldn't be hard to find, though). On Thu, Mar 19, 2015 at 11:56 AM, Davies Liu dav...@databricks.com wrote: Is it possible that `spark.local.dir` is overriden by others? The docs say: NOTE: In Spark 1.0 and later this will be overriden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) On Sat, Mar 14, 2015 at 5:29 PM, Peng Xia sparkpeng...@gmail.com wrote: Hi Sean, Thank very much for your reply. I tried to config it from below code: sf = SparkConf().setAppName(test).set(spark.executor.memory, 45g).set(spark.cores.max, 62),set(spark.local.dir, C:\\tmp) But still get the error. Do you know how I can config this? Thanks, Best, Peng On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote: It means pretty much what it says. You ran out of space on an executor (not driver), because the dir used for serialization temp files is full (not all volumes). Set spark.local.dirs to something more appropriate and larger. On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote: Hi I was running a logistic regression algorithm on a 8 nodes spark cluster, each node has 8 cores and 56 GB Ram (each node is running a windows system). And the spark installation driver has 1.9 TB capacity. The dataset I was training on are has around 40 million records with around 6600 features. But I always get this error during the training process: Py4JJavaError: An error occurred while calling o70.trainLogisticRegressionModelWithLBFGS. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3 in stage 3.0 (TID 2766, workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net): java.io.IOException: There is not enough space on the disk at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:345) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300) at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247) at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:243) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace
Re: refer to dictionary
Hi Ted, Thanks very much, yea, using broadcast is much faster. Best, Peng On Tue, Mar 31, 2015 at 8:49 AM, Ted Yu yuzhih...@gmail.com wrote: You can use broadcast variable. See also this thread: http://search-hadoop.com/m/JW1q5GX7U22/Spark+broadcast+variablesubj=How+Broadcast+variable+scale+ On Mar 31, 2015, at 4:43 AM, Peng Xia sparkpeng...@gmail.com wrote: Hi, I have a RDD (rdd1)where each line is split into an array [a, b, c], etc. And I also have a local dictionary p (dict1) stores key value pair {a:1, b: 2, c:3} I want to replace the keys in the rdd with the its corresponding value in the dict: rdd1.map(lambda line: [dict1[item] for item in line]) But this task is not distributed, I believe the reason is the dict1 is a local instance. Can any one provide suggestions on this to parallelize this? Thanks, Best, Peng
Re: spark there is no space on the disk
Hi Sean, Thank very much for your reply. I tried to config it from below code: sf = SparkConf().setAppName(test).set(spark.executor.memory, 45g).set(spark.cores.max, 62),set(spark.local.dir, C:\\tmp) But still get the error. Do you know how I can config this? Thanks, Best, Peng On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote: It means pretty much what it says. You ran out of space on an executor (not driver), because the dir used for serialization temp files is full (not all volumes). Set spark.local.dirs to something more appropriate and larger. On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote: Hi I was running a logistic regression algorithm on a 8 nodes spark cluster, each node has 8 cores and 56 GB Ram (each node is running a windows system). And the spark installation driver has 1.9 TB capacity. The dataset I was training on are has around 40 million records with around 6600 features. But I always get this error during the training process: Py4JJavaError: An error occurred while calling o70.trainLogisticRegressionModelWithLBFGS. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3 in stage 3.0 (TID 2766, workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net): java.io.IOException: There is not enough space on the disk at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:345) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300) at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247) at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:243) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive
Re: spark there is no space on the disk
And I have 2 TB free space on C driver. On Sat, Mar 14, 2015 at 8:29 PM, Peng Xia sparkpeng...@gmail.com wrote: Hi Sean, Thank very much for your reply. I tried to config it from below code: sf = SparkConf().setAppName(test).set(spark.executor.memory, 45g).set(spark.cores.max, 62),set(spark.local.dir, C:\\tmp) But still get the error. Do you know how I can config this? Thanks, Best, Peng On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote: It means pretty much what it says. You ran out of space on an executor (not driver), because the dir used for serialization temp files is full (not all volumes). Set spark.local.dirs to something more appropriate and larger. On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote: Hi I was running a logistic regression algorithm on a 8 nodes spark cluster, each node has 8 cores and 56 GB Ram (each node is running a windows system). And the spark installation driver has 1.9 TB capacity. The dataset I was training on are has around 40 million records with around 6600 features. But I always get this error during the training process: Py4JJavaError: An error occurred while calling o70.trainLogisticRegressionModelWithLBFGS. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3 in stage 3.0 (TID 2766, workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net): java.io.IOException: There is not enough space on the disk at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:345) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300) at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247) at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:243) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696
spark there is no space on the disk
Hi I was running a logistic regression algorithm on a 8 nodes spark cluster, each node has 8 cores and 56 GB Ram (each node is running a windows system). And the spark installation driver has 1.9 TB capacity. The dataset I was training on are has around 40 million records with around 6600 features. But I always get this error during the training process: Py4JJavaError: An error occurred while calling o70.trainLogisticRegressionModelWithLBFGS.: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3 in stage 3.0 (TID 2766, workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net): java.io.IOException: There is not enough space on the disk at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:345) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300) at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247) at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:243) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Re: error on training with logistic regression sgd
Hi, Can anyone give an idea about this? Just did some google search, it seems related to the 2gb limitation on block size, https://issues.apache.org/jira/browse/SPARK-1476. The whole process is that: 1. load the data 2. convert each line of data into labeled points using some feature hashing algorithm in python. 3. train a logistic regression model with the converted labeled points. Can any one give some advice for how to avoid the 2gb, if this is the cause? Thanks very much for the help. Best, Peng On Mon, Mar 9, 2015 at 3:54 PM, Peng Xia sparkpeng...@gmail.com wrote: Hi, I was launching a spark cluster with 4 work nodes, each work nodes contains 8 cores and 56gb ram, and I was testing my logistic regression problem. The training set is around 1.2 million records.When I was using 2**10 (1024) features, the whole program works fine, but when I use 2**14 features, the program has encountered the error: Py4JJavaError: An error occurred while calling o84.trainLogisticRegressionModelWithSGD. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:156) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93
error on training with logistic regression sgd
Hi, I was launching a spark cluster with 4 work nodes, each work nodes contains 8 cores and 56gb ram, and I was testing my logistic regression problem. The training set is around 1.2 million records.When I was using 2**10 (1024) features, the whole program works fine, but when I use 2**14 features, the program has encountered the error: Py4JJavaError: An error occurred while calling o84.trainLogisticRegressionModelWithSGD. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:745) at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:156) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at
issue on applying SVM to 5 million examples.
Hi, Previous we have applied SVM algorithm in MLlib to 5 million records (600 mb), it takes more than 25 minutes to finish. The spark version we are using is 1.0 and we were running this program on a 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM. The 5 million records only have two distinct records (One positive and one negative), others are all duplications. Any one has any idea on why it takes so long on this small data? Thanks, Best, Peng
Re: issue on applying SVM to 5 million examples.
Thanks for all your help. I think I didn't cache the data. My previous cluster was expired and I don't have a chance to check the load balance or app manager. Below is my code. There are 18 features for each record and I am using the Scala API. import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import java.util.Calendar object BenchmarkClassification { def main(args: Array[String]) { // Load and parse the data file val conf = new SparkConf() .setAppName(SVM) .set(spark.executor.memory, 8g) // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g) val sc = new SparkContext(conf) val data = sc.textFile(args(0)) val parsedData = data.map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } val testData = sc.textFile(args(1)) val testParsedData = testData .map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } // Run training algorithm to build the model val numIterations = 20 val model = SVMWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training error // val labelAndPreds = testParsedData.map { point = // val prediction = model.predict(point.features) // (point.label, prediction) // } // val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble / testParsedData.count // println(Training Error = + trainErr) println(Calendar.getInstance().getTime()) } } Thanks, Best, Peng On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote: DId you cache the data and check the load balancing? How many features? Which API are you using, Scala, Java, or Python? -Xiangrui On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote: Watch the app manager it should tell you what's running and taking awhile... My guess it's a distinct function on the data. J Sent from my iPhone On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote: Hi, Previous we have applied SVM algorithm in MLlib to 5 million records (600 mb), it takes more than 25 minutes to finish. The spark version we are using is 1.0 and we were running this program on a 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM. The 5 million records only have two distinct records (One positive and one negative), others are all duplications. Any one has any idea on why it takes so long on this small data? Thanks, Best, Peng
Re: issue on applying SVM to 5 million examples.
Hi Xiangrui, Can you give me some code example about caching, as I am new to Spark. Thanks, Best, Peng On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote: Then caching should solve the problem. Otherwise, it is just loading and parsing data from disk for each iteration. -Xiangrui On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote: Thanks for all your help. I think I didn't cache the data. My previous cluster was expired and I don't have a chance to check the load balance or app manager. Below is my code. There are 18 features for each record and I am using the Scala API. import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import java.util.Calendar object BenchmarkClassification { def main(args: Array[String]) { // Load and parse the data file val conf = new SparkConf() .setAppName(SVM) .set(spark.executor.memory, 8g) // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g) val sc = new SparkContext(conf) val data = sc.textFile(args(0)) val parsedData = data.map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } val testData = sc.textFile(args(1)) val testParsedData = testData .map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } // Run training algorithm to build the model val numIterations = 20 val model = SVMWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training error // val labelAndPreds = testParsedData.map { point = // val prediction = model.predict(point.features) // (point.label, prediction) // } // val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble / testParsedData.count // println(Training Error = + trainErr) println(Calendar.getInstance().getTime()) } } Thanks, Best, Peng On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote: DId you cache the data and check the load balancing? How many features? Which API are you using, Scala, Java, or Python? -Xiangrui On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote: Watch the app manager it should tell you what's running and taking awhile... My guess it's a distinct function on the data. J Sent from my iPhone On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote: Hi, Previous we have applied SVM algorithm in MLlib to 5 million records (600 mb), it takes more than 25 minutes to finish. The spark version we are using is 1.0 and we were running this program on a 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM. The 5 million records only have two distinct records (One positive and one negative), others are all duplications. Any one has any idea on why it takes so long on this small data? Thanks, Best, Peng
Re: issue on applying SVM to 5 million examples.
Thanks Jimmy. I will have a try. Thanks very much for your guys' help. Best, Peng On Thu, Oct 30, 2014 at 8:19 PM, Jimmy ji...@sellpoints.com wrote: sampleRDD. cache() Sent from my iPhone On Oct 30, 2014, at 5:01 PM, peng xia toxiap...@gmail.com wrote: Hi Xiangrui, Can you give me some code example about caching, as I am new to Spark. Thanks, Best, Peng On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote: Then caching should solve the problem. Otherwise, it is just loading and parsing data from disk for each iteration. -Xiangrui On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote: Thanks for all your help. I think I didn't cache the data. My previous cluster was expired and I don't have a chance to check the load balance or app manager. Below is my code. There are 18 features for each record and I am using the Scala API. import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import java.util.Calendar object BenchmarkClassification { def main(args: Array[String]) { // Load and parse the data file val conf = new SparkConf() .setAppName(SVM) .set(spark.executor.memory, 8g) // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g) val sc = new SparkContext(conf) val data = sc.textFile(args(0)) val parsedData = data.map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } val testData = sc.textFile(args(1)) val testParsedData = testData .map { line = val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x = x.toDouble))) } // Run training algorithm to build the model val numIterations = 20 val model = SVMWithSGD.train(parsedData, numIterations) // Evaluate model on training examples and compute training error // val labelAndPreds = testParsedData.map { point = // val prediction = model.predict(point.features) // (point.label, prediction) // } // val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble / testParsedData.count // println(Training Error = + trainErr) println(Calendar.getInstance().getTime()) } } Thanks, Best, Peng On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote: DId you cache the data and check the load balancing? How many features? Which API are you using, Scala, Java, or Python? -Xiangrui On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote: Watch the app manager it should tell you what's running and taking awhile... My guess it's a distinct function on the data. J Sent from my iPhone On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote: Hi, Previous we have applied SVM algorithm in MLlib to 5 million records (600 mb), it takes more than 25 minutes to finish. The spark version we are using is 1.0 and we were running this program on a 4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM. The 5 million records only have two distinct records (One positive and one negative), others are all duplications. Any one has any idea on why it takes so long on this small data? Thanks, Best, Peng