spark performance non-linear response
Hi All,

I'm using Spark 1.4.1 to analyze a largish data set (several gigabytes of data). The RDD is partitioned into 2048 partitions, which are more or less equal in size and entirely cached in RAM. I evaluated the performance on several cluster sizes and am seeing a non-linear (power-law) performance improvement as the cluster size increases (plot below). Each node has 4 cores and each worker is configured to use 10GB of RAM.

[plot: Spark performance vs. cluster size]

I would expect a more linear response given the number of partitions and the fact that all of the data is cached. Can anyone suggest what I should tweak in order to improve performance, or perhaps offer an explanation for the behavior I'm seeing?

Yadid
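A quick way to sanity-check that the 2048 partitions really are balanced is to count records per partition. This is a minimal sketch assuming the cached pair RDD is bound to a variable named rdd (the name is made up):

  // Hypothetical check: record count per partition, largest first.
  val perPartition = rdd
    .mapPartitionsWithIndex { case (idx, iter) => Iterator((idx, iter.size)) }
    .collect()
    .sortBy(-_._2)

  perPartition.take(10).foreach { case (idx, n) => println(s"partition $idx: $n records") }

If a handful of partitions turn out to be much larger than the rest, the tasks for those partitions dominate each wave of tasks, which could partly explain sub-linear scaling.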
Re: spark performance non-linear response
Additional missing relevant information: I'm running a transformation, there are no shuffles occurring, and at the end I'm performing a lookup of 4 partitions on the driver.

On 10/7/15 11:26 AM, Yadid Ayzenberg wrote:
Hi All, I'm using Spark 1.4.1 to analyze a largish data set (several gigabytes of data). The RDD is partitioned into 2048 partitions, which are more or less equal in size and entirely cached in RAM. I evaluated the performance on several cluster sizes and am seeing a non-linear (power-law) performance improvement as the cluster size increases (plot below). Each node has 4 cores and each worker is configured to use 10GB of RAM. [plot: Spark performance vs. cluster size] I would expect a more linear response given the number of partitions and the fact that all of the data is cached. Can anyone suggest what I should tweak in order to improve performance, or perhaps offer an explanation for the behavior I'm seeing? Yadid
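For reference, a rough sketch of the kind of pipeline described above — a fully cached pair RDD, a map-only transformation, and a driver-side lookup. All names, paths and the key format are placeholders, not the actual job:

  import org.apache.spark.storage.StorageLevel

  // Hypothetical input: build a pair RDD, spread it over 2048 partitions, cache it.
  val data = sc.textFile("hdfs:///path/to/input")                  // placeholder path
    .map { line => val f = line.split('\t'); (f(0), f(1)) }
    .repartition(2048)
    .persist(StorageLevel.MEMORY_ONLY)

  data.count()                                                     // materialize the cache

  val transformed = data.mapValues(_.length)                       // map-only, no shuffle
  val hits = transformed.lookup("some-key")                        // result collected on the driver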
Re: spark 1.4.1 - LZFException
Hi Akhil,

No, it seems I have plenty of disk space available on that node. I looked at the logs, and one minute before that exception I see the following:

15/09/03 12:51:39 ERROR TransportChannelHandler: Connection to /x.y.z.w:44892 has been quiet for 12 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
15/09/03 12:51:39 ERROR TransportResponseHandler: Still have 8 requests outstanding when connection from /18.85.28.197:44892 is closed
15/09/03 12:51:39 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from /x.y.z.w:44892 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
    at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:738)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:606)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)

Do you think that is related to the problem?

Yadid

On 8/28/15 1:31 AM, Akhil Das wrote:
Is it filling up your disk space? Can you look a bit more in the executor logs to see what's going on?
Thanks
Best Regards

On Sun, Aug 23, 2015 at 1:27 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
Hi All, We have a Spark standalone cluster running 1.4.1 and we are setting spark.io.compression.codec to lzf. I have a long-running interactive application which behaves normally, but after a few days I get the following exception in multiple jobs. Any ideas on what could be causing this?
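If the idle-connection timeout is implicated, the property named in that log line can be raised. A minimal sketch, assuming the setting is applied where the application's SparkContext is created (the value is only an example, and the same key can go into spark-defaults.conf instead):

  import org.apache.spark.{SparkConf, SparkContext}

  // Illustrative only: raise the network timeout named in the error above.
  val conf = new SparkConf()
    .setAppName("my-app")                      // placeholder app name
    .set("spark.network.timeout", "300s")      // Spark 1.4 default is 120s
  val sc = new SparkContext(conf)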
Yadid

Job aborted due to stage failure: Task 27 in stage 286.0 failed 4 times, most recent failure: Lost task 27.3 in stage 286.0 (TID 516817, xx.yy.zz.ww): com.esotericsoftware.kryo.KryoException: com.ning.compress.lzf.LZFException: Corrupt input data, block did not start with 2 byte signature ('ZV') followed by type byte, 2-byte length)
    at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
    at com.esotericsoftware.kryo.io.Input.require(Input.java:155)
    at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
    at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:200)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:197)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionItera
spark 1.4.1 - LZFException
Hi All,

We have a Spark standalone cluster running 1.4.1 and we are setting spark.io.compression.codec to lzf. I have a long-running interactive application which behaves normally, but after a few days I get the following exception in multiple jobs. Any ideas on what could be causing this?

Yadid

Job aborted due to stage failure: Task 27 in stage 286.0 failed 4 times, most recent failure: Lost task 27.3 in stage 286.0 (TID 516817, xx.yy.zz.ww): com.esotericsoftware.kryo.KryoException: com.ning.compress.lzf.LZFException: Corrupt input data, block did not start with 2 byte signature ('ZV') followed by type byte, 2-byte length)
    at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
    at com.esotericsoftware.kryo.io.Input.require(Input.java:155)
    at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
    at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:200)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:197)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.ning.compress.lzf.LZFException: Corrupt input data, block did not start with 2 byte signature ('ZV') followed by type byte, 2-byte length)
    at com.ning.compress.lzf.ChunkDecoder._reportCorruptHeader(ChunkDecoder.java:267)
    at com.ning.compress.lzf.impl.UnsafeChunkDecoder.decodeChunk(UnsafeChunkDecoder.java:55)
    at com.ning.compress.lzf.LZFInputStream.readyBuffer(LZFInputStream.java:363)
    at com.ning.compress.lzf.LZFInputStream.read(LZFInputStream.java:193)
    at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
    ... 37 more
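For context, the configuration described above amounts to something like the following at application start-up. This is a minimal sketch; the serializer line is included only because the stack trace goes through Kryo, and switching back to the Spark 1.4 default codec (snappy) is one way to test whether the corruption is LZF-specific:

  import org.apache.spark.SparkConf

  // Configuration as described in the message; values are the ones named above.
  val conf = new SparkConf()
    .set("spark.io.compression.codec", "lzf")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")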
Re: Change delimiter when collecting SchemaRDD
Thanks Michael, that makes total sense. It works perfectly.

Yadid

On Thu, Aug 28, 2014 at 9:19 PM, Michael Armbrust <mich...@databricks.com> wrote:
The comma is just the way the default toString works for Row objects. Since SchemaRDDs are also RDDs, you can do arbitrary transformations on the Row objects that are returned. For example, if you'd rather the delimiter was '|':

sql("SELECT * FROM src").map(_.mkString("|")).collect()

On Thu, Aug 28, 2014 at 7:58 AM, yadid ayzenberg <ya...@media.mit.edu> wrote:
Hi All, Is there any way to change the delimiter from a comma? Some of the strings in my data contain commas as well, making it very difficult to parse the results. Yadid
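The same idea works for any separator, since each Row is simply mapped before collection. A small variation as a sketch (the table name src is a placeholder carried over from the example above):

  // Tab-separated output instead of '|'; Rows behave like sequences, so mkString works.
  val rows = sql("SELECT * FROM src").map(_.mkString("\t")).collect()
  rows.take(5).foreach(println)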
Change delimiter when collecting SchemaRDD
Hi All, Is there any way to change the delimiter from a comma? Some of the strings in my data contain commas as well, making it very difficult to parse the results. Yadid
Losing Executors on cluster with RDDs of 100GB
Hi all,

I have a Spark cluster of 30 machines, 16GB / 8 cores each, running in standalone mode. Previously my application was working well (several RDDs, the largest being around 50G). When I started processing larger amounts of data (RDDs of 100G) my app is losing executors. I'm currently just loading them from a database, repartitioning, and persisting to disk (with replication x2).

I have spark.executor.memory=9G, memoryFraction=0.5, spark.worker.timeout=120, spark.akka.askTimeout=30, spark.storage.blockManagerHeartBeatMs=3. I haven't changed the default worker memory, so it's at 512m (should this be larger?).

I've been getting the following messages from my app:

[error] o.a.s.s.TaskSchedulerImpl - Lost executor 3 on myserver1: worker lost
[error] o.a.s.s.TaskSchedulerImpl - Lost executor 13 on myserver2: Unknown executor exit code (137) (died from signal 9?)
[error] a.r.EndpointWriter - AssociationError [akka.tcp://spark@master:59406] - [akka.tcp://sparkExecutor@myserver2:32955]: Error [Association failed with [akka.tcp://sparkExecutor@myserver2:32955]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkexecu...@myserver2.com:32955] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: myserver2/198.18.102.160:32955 ]
[error] a.r.EndpointWriter - AssociationError [akka.tcp://spark@master:59406] - [akka.tcp://spark@myserver1:53855]: Error [Association failed with [akka.tcp://spark@myserver1:53855]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@myserver1:53855] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: myserver1/198.18.102.160:53855 ]

The worker logs and executor logs do not contain errors. Any ideas what the problem is?

Yadid
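Exit code 137 is a SIGKILL, which on Linux often means the kernel's OOM killer removed the executor process because the machine ran out of physical memory; a 9G executor heap plus JVM overhead, the worker daemon, and OS caches can be tight on a 16GB node. A hedged sketch of settings that leave more headroom (values are illustrative, not a recommendation):

  import org.apache.spark.SparkConf

  // Illustrative only: smaller executor heap and cache fraction to leave headroom
  // between the JVM and the node's 16GB of physical RAM.
  val conf = new SparkConf()
    .set("spark.executor.memory", "7g")
    .set("spark.storage.memoryFraction", "0.4")

The amount of memory a worker may hand out to executors is controlled separately by SPARK_WORKER_MEMORY in spark-env.sh, so that may also need raising if it is lower than spark.executor.memory.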
Re: possible typos in spark 1.0 documentation
Yep, I just issued a pull request.

Yadid

On 5/31/14, 1:25 PM, Patrick Wendell wrote:
> 1. ctx is an instance of JavaSQLContext but the textFile method is called as a member of ctx. According to the API, JavaSQLContext does not have such a member, so I'm guessing this should be sc instead.
Yeah, I think you are correct.
> 2. In that same code example the object sqlCtx is referenced, but it is never instantiated in the code. Should this be ctx?
Also correct. I think it would be good to be consistent and always have ctx refer to a JavaSparkContext and have sqlCtx refer to a JavaSQLContext. Any interest in creating a pull request for this? We'd be happy to accept the change.
- Patrick
possible typos in spark 1.0 documentation
Congrats on the new 1.0 release. Amazing work! It looks like there may be some typos in the latest http://spark.apache.org/docs/latest/sql-programming-guide.html in the Running SQL on RDDs section, when choosing the Java example: 1. ctx is an instance of JavaSQLContext but the textFile method is called as a member of ctx. According to the API, JavaSQLContext does not have such a member, so I'm guessing this should be sc instead. 2. In that same code example the object sqlCtx is referenced, but it is never instantiated in the code. Should this be ctx? Cheers, Yadid
Re: NoSuchMethodError: breeze.linalg.DenseMatrix
An additional option:
4) Use SparkContext.addJar() and have the application ship your jar to all the nodes.

Yadid

On 5/4/14, 4:07 PM, DB Tsai wrote:
If you add the breeze dependency in your build.sbt project, it will not be available to all the workers. There are a couple of options:
1) use sbt assembly to package breeze into your application jar.
2) manually copy the breeze jar to all the nodes and have it in the classpath.
3) Spark 1.0 has the breeze jar in the Spark assembly jar, so you don't need to add the breeze dependency yourself.
Sincerely, DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Sun, May 4, 2014 at 4:07 AM, wxhsdp <wxh...@gmail.com> wrote:
Hi, I'm trying to use the breeze linalg library for matrix operations in my Spark code. I already added a dependency on breeze in my build.sbt and packaged my code successfully. When I run in local mode (sbt run local...) everything is ok, but when I turn to standalone mode (sbt run spark://127.0.0.1:7077...) an error occurs:

14/05/04 18:56:29 WARN scheduler.TaskSetManager: Loss was due to java.lang.NoSuchMethodError
java.lang.NoSuchMethodError: breeze.linalg.DenseMatrix$.implOpMulMatrix_DMD_DMD_eq_DMD()Lbreeze/linalg/operators/DenseMatrixMultiplyStuff$implOpMulMatrix_DMD_DMD_eq_DMD$;

In my opinion, everything needed is packaged into the jar file, isn't it? And has anyone used breeze before? Is it good for matrix operations?
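A minimal sketch of option 4 (the path is a placeholder; the jar needs to exist on the driver machine):

  // Ships the jar to every executor so tasks can load the breeze classes at runtime.
  // Note this affects executor classpaths only, not the driver's own classpath.
  sc.addJar("/path/to/breeze.jar")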
Re: Strange lookup behavior. Possible bug?
Dear Sparkers,

Has anyone got any insight on this? I am really stuck.

Yadid

On 4/28/14, 11:28 AM, Yadid Ayzenberg wrote:
Thanks for your answer. I tried running on a single machine - master and worker on one host. I get exactly the same results. Very little CPU activity on the machine in question. The web UI shows a single task and its state is RUNNING; it will remain so indefinitely. I have a single partition, and its size is 1626.2 MB. Currently the RDD has 200 elements, but I have tried it with 20 and the behavior is the same. The key is of the form (0,52fb9aff3004f07d1a87c8ea), where the first number in the tuple is always 0 and the second one is some string that can appear more than once. The RDD is created using newAPIHadoopRDD. Any additional info I can provide? Yadid

On 4/28/14 10:46 AM, Daniel Darabos wrote:
That is quite mysterious, and I do not think we have enough information to answer. JavaPairRDD<String, Tuple2>.lookup() works fine on a remote Spark cluster:

$ MASTER=spark://localhost:7077 bin/spark-shell
scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0 until 10, 3).map(x => ((x % 3).toString, (x, x % 3))))
scala> rdd.lookup("1")
res0: java.util.List[(Int, Int)] = [(1,1), (4,1), (7,1)]

You suggest maybe the driver does not receive a message from an executor. I guess it is likely possible, though it has not happened to me. I would recommend running on a single machine in the standalone setup. Start the master and worker on the same machine, and run the application there too. This should eliminate network configuration problems. If you still see the issue, I'd check whether the task has really completed. What do you see on the web UI? Is the executor using CPU? Good luck.

On Mon, Apr 28, 2014 at 2:35 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
Can someone please suggest how I can move forward with this? My Spark version is 0.9.1. The big challenge is that this issue is not recreated when running in local mode. What could be the difference? I would really appreciate any pointers, as currently the job just hangs.

On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:
Some additional information - maybe this rings a bell with someone: I suspect this happens when the lookup returns more than one value. For 0 and 1 values, the function behaves as you would expect. Anyone?

On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:
Hi All, I'm running a lookup on a JavaPairRDD<String, Tuple2>. When running on a local machine the lookup is successful. However, when running a standalone cluster with the exact same dataset, one of the tasks never ends (constantly in RUNNING status). When viewing the worker log, it seems that the task has finished successfully:

14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is 10896794
14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs indefinitely. If I execute a count prior to the lookup, I get the correct number, which suggests that the cluster is operating as expected. The exact same scenario works with a different type of key (Tuple2): JavaPairRDD<Tuple2, Tuple2>. Any ideas on how to debug this problem? Thanks, Yadid
Re: Strange lookup behavior. Possible bug?
Thanks for your answer. I tried running on a single machine - master and worker on one host. I get exactly the same results. Very little CPU activity on the machine in question. The web UI shows a single task and its state is RUNNING; it will remain so indefinitely. I have a single partition, and its size is 1626.2 MB. Currently the RDD has 200 elements, but I have tried it with 20 and the behavior is the same. The key is of the form (0,52fb9aff3004f07d1a87c8ea), where the first number in the tuple is always 0 and the second one is some string that can appear more than once. The RDD is created using newAPIHadoopRDD. Any additional info I can provide?

Yadid

On 4/28/14 10:46 AM, Daniel Darabos wrote:
That is quite mysterious, and I do not think we have enough information to answer. JavaPairRDD<String, Tuple2>.lookup() works fine on a remote Spark cluster:

$ MASTER=spark://localhost:7077 bin/spark-shell
scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0 until 10, 3).map(x => ((x % 3).toString, (x, x % 3))))
scala> rdd.lookup("1")
res0: java.util.List[(Int, Int)] = [(1,1), (4,1), (7,1)]

You suggest maybe the driver does not receive a message from an executor. I guess it is likely possible, though it has not happened to me. I would recommend running on a single machine in the standalone setup. Start the master and worker on the same machine, and run the application there too. This should eliminate network configuration problems. If you still see the issue, I'd check whether the task has really completed. What do you see on the web UI? Is the executor using CPU? Good luck.

On Mon, Apr 28, 2014 at 2:35 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
Can someone please suggest how I can move forward with this? My Spark version is 0.9.1. The big challenge is that this issue is not recreated when running in local mode. What could be the difference? I would really appreciate any pointers, as currently the job just hangs.

On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:
Some additional information - maybe this rings a bell with someone: I suspect this happens when the lookup returns more than one value. For 0 and 1 values, the function behaves as you would expect. Anyone?

On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:
Hi All, I'm running a lookup on a JavaPairRDD<String, Tuple2>. When running on a local machine the lookup is successful. However, when running a standalone cluster with the exact same dataset, one of the tasks never ends (constantly in RUNNING status). When viewing the worker log, it seems that the task has finished successfully:

14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is 10896794
14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs indefinitely. If I execute a count prior to the lookup, I get the correct number, which suggests that the cluster is operating as expected. The exact same scenario works with a different type of key (Tuple2): JavaPairRDD<Tuple2, Tuple2>. Any ideas on how to debug this problem? Thanks, Yadid
Re: Strange lookup behavior. Possible bug?
Could this be related to the size of the lookup result? I tried to recreate a similar scenario in the spark shell, which causes an exception:

scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0 until 4, 3).map(x => ((0, "52fb9b1a3004f07d1a87c8f3"), Seq.fill(40)(Random.nextFloat))))
rdd: org.apache.spark.api.java.JavaPairRDD[(Int, String),Seq[Float]] = org.apache.spark.api.java.JavaPairRDD@1481cb6e

scala> rdd.count()
res53: Long = 4

scala> rdd.lookup((0, "52fb9b1a3004f07d1a87c8f3"))
org.apache.spark.SparkException: Job aborted: Task 39.0:2 failed 4 times (most recent failure: Exception failure: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 3, required: 4)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

On 4/28/14 11:28 AM, Yadid Ayzenberg wrote:
Thanks for your answer. I tried running on a single machine - master and worker on one host. I get exactly the same results. Very little CPU activity on the machine in question. The web UI shows a single task and its state is RUNNING; it will remain so indefinitely. I have a single partition, and its size is 1626.2 MB. Currently the RDD has 200 elements, but I have tried it with 20 and the behavior is the same. The key is of the form (0,52fb9aff3004f07d1a87c8ea), where the first number in the tuple is always 0 and the second one is some string that can appear more than once. The RDD is created using newAPIHadoopRDD. Any additional info I can provide? Yadid

On 4/28/14 10:46 AM, Daniel Darabos wrote:
That is quite mysterious, and I do not think we have enough information to answer. JavaPairRDD<String, Tuple2>.lookup() works fine on a remote Spark cluster:

$ MASTER=spark://localhost:7077 bin/spark-shell
scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0 until 10, 3).map(x => ((x % 3).toString, (x, x % 3))))
scala> rdd.lookup("1")
res0: java.util.List[(Int, Int)] = [(1,1), (4,1), (7,1)]

You suggest maybe the driver does not receive a message from an executor. I guess it is likely possible, though it has not happened to me. I would recommend running on a single machine in the standalone setup. Start the master and worker on the same machine, and run the application there too. This should eliminate network configuration problems. If you still see the issue, I'd check whether the task has really completed. What do you see on the web UI? Is the executor using CPU? Good luck.

On Mon, Apr 28, 2014 at 2:35 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
Can someone please suggest how I can move forward with this? My Spark version is 0.9.1. The big challenge is that this issue is not recreated when running in local mode. What could be the difference? I would really appreciate any pointers, as currently the job just hangs.

On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:
Some additional information - maybe this rings a bell with someone: I suspect this happens when the lookup returns more than one value. For 0 and 1 values, the function behaves as you would expect. Anyone?

On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:
Hi All, I'm running a lookup on a JavaPairRDD<String, Tuple2>
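The "Buffer overflow. Available: 3, required: 4" failure above typically points at the Kryo serialization buffer being too small for what is being written. Property names have changed across releases; a hedged sketch for the 0.9.x line discussed in this thread (the value is only an example):

  import org.apache.spark.SparkConf

  // Illustrative only: enlarge Kryo's serialization buffer (size in MB in 0.9.x;
  // newer releases use spark.kryoserializer.buffer.max instead).
  val conf = new SparkConf()
    .set("spark.kryoserializer.buffer.mb", "64")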
Re: Strange lookup behavior. Possible bug?
Can someone please suggest how I can move forward with this? My Spark version is 0.9.1. The big challenge is that this issue is not recreated when running in local mode. What could be the difference? I would really appreciate any pointers, as currently the job just hangs.

On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:
Some additional information - maybe this rings a bell with someone: I suspect this happens when the lookup returns more than one value. For 0 and 1 values, the function behaves as you would expect. Anyone?

On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:
Hi All, I'm running a lookup on a JavaPairRDD<String, Tuple2>. When running on a local machine the lookup is successful. However, when running a standalone cluster with the exact same dataset, one of the tasks never ends (constantly in RUNNING status). When viewing the worker log, it seems that the task has finished successfully:

14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is 10896794
14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs indefinitely. If I execute a count prior to the lookup, I get the correct number, which suggests that the cluster is operating as expected. The exact same scenario works with a different type of key (Tuple2): JavaPairRDD<Tuple2, Tuple2>. Any ideas on how to debug this problem? Thanks, Yadid
Strange lookup behavior. Possible bug?
Hi All,

I'm running a lookup on a JavaPairRDD<String, Tuple2>. When running on a local machine the lookup is successful. However, when running a standalone cluster with the exact same dataset, one of the tasks never ends (constantly in RUNNING status). When viewing the worker log, it seems that the task has finished successfully:

14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is 10896794
14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs indefinitely. If I execute a count prior to the lookup, I get the correct number, which suggests that the cluster is operating as expected. The exact same scenario works with a different type of key (Tuple2): JavaPairRDD<Tuple2, Tuple2>. Any ideas on how to debug this problem?

Thanks, Yadid
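One detail in that worker log worth noting: the serialized result is 10,896,794 bytes, and in the 0.9.x line task results travel back to the driver over Akka, whose default frame size was on the order of 10 MB. A result just over that limit being dropped would match a task that finishes on the executor but never completes on the driver, so raising the frame size is one hedged thing to rule out (value is illustrative):

  import org.apache.spark.SparkConf

  // Illustrative only: allow larger task results over Akka (value in MB in 0.9.x;
  // the default was around 10).
  val conf = new SparkConf()
    .set("spark.akka.frameSize", "64")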
Re: Strange lookup behavior. Possible bug?
Some additional information - maybe this rings a bell with someone: I suspect this happens when the lookup returns more than one value. For 0 and 1 values, the function behaves as you would expect. Anyone?

On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:
Hi All, I'm running a lookup on a JavaPairRDD<String, Tuple2>. When running on a local machine the lookup is successful. However, when running a standalone cluster with the exact same dataset, one of the tasks never ends (constantly in RUNNING status). When viewing the worker log, it seems that the task has finished successfully:

14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is 10896794
14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs indefinitely. If I execute a count prior to the lookup, I get the correct number, which suggests that the cluster is operating as expected. The exact same scenario works with a different type of key (Tuple2): JavaPairRDD<Tuple2, Tuple2>. Any ideas on how to debug this problem? Thanks, Yadid