I am also seeing this error in a Spark Streaming (1.2.0) application running on YARN.
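Since 1.2 adds a receiver write-ahead log that persists received blocks under the checkpoint directory before they are processed, enabling it may be a workaround for lost input blocks. A minimal sketch of the setup, assuming the property name documented for Spark 1.2 and a placeholder checkpoint path:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("StreamingWithWAL")
  // Persist received blocks to the write-ahead log before acknowledging them
  // (added in Spark 1.2); a block evicted from memory can then be recovered.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// The WAL is written under the checkpoint directory, so one must be set
// (the path below is a placeholder).
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")
```

With the WAL enabled, the docs suggest a storage level without replication (e.g. MEMORY_AND_DISK_SER), since the log already provides durability.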
Tim Smith wrote:
> Similar issue (Spark 1.0.0). The streaming app runs for a few seconds
> before these errors start to pop up all over the driver logs:
>
> 14/09/12 17:30:23 WARN TaskSetManager: Loss was due to java.lang.Exception
> java.lang.Exception: Could not compute split, block input-4-1410542878200 not found
>     at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.UnionPartition.iterator(UnionRDD.scala:33)
>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:74)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> I am using "MEMORY_AND_DISK_SER" for all my RDDs, so I should not be
> losing any blocks unless I run out of disk space, right?
>
> On Fri, Sep 12, 2014 at 5:24 AM, Dibyendu Bhattacharya
> <dibyendu.bhattachary@> wrote:
>> I agree.
>>
>> Even the low-level Kafka consumer which I have written has tunable IO
>> throttling, which helped me solve this issue. But the question remains:
>> even if there is a large backlog, why does Spark drop the unprocessed
>> memory blocks?
>>
>> Dib
>>
>> On Fri, Sep 12, 2014 at 5:47 PM, Jeoffrey Lim <jeoffreyl@> wrote:
>>> Our issue could be related to the problem described in
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-1-hour-batch-duration-RDD-files-gets-lost-td14027.html
>>> in which the DStream is processed with a 1-hour batch duration.
>>>
>>> I have implemented IO throttling in the Receiver, as well as in our
>>> Kafka consumer, and our backlog is not that large.
>>>
>>> INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
>>> INFO : org.apache.spark.storage.BlockManager - Dropping block input-0-1410443074600 from memory
>>> INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of size 12651900 dropped from memory (free 21220667)
>>> INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory (size: 12.1 MB, free: 100.6 MB)
>>>
>>> The question that I have now is: how can we prevent the
>>> MemoryStore/BlockManager from dropping the block inputs? And should
>>> these drops be logged at WARN/ERROR level?
>>>
>>> Thanks.
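One point worth separating out here: persisting the downstream RDDs with MEMORY_AND_DISK_SER does not change how the *receiver* stores incoming blocks; the receiver's storage level is set when the input stream is created. A minimal sketch of pinning both the receiver storage level and the ingestion rate, assuming the Spark 1.1-era KafkaUtils API (the ZooKeeper quorum, group id, and topic map are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("ThrottledKafkaStream")
  // Cap each receiver at N records/sec so ingestion cannot outrun processing
  .set("spark.streaming.receiver.maxRate", "10000")

val ssc = new StreamingContext(conf, Seconds(10))

// MEMORY_AND_DISK_SER lets full input blocks spill to disk
// instead of being dropped outright when the MemoryStore fills up.
val stream = KafkaUtils.createStream(
  ssc,
  "zk1:2181",            // placeholder ZooKeeper quorum
  "my-consumer-group",   // placeholder consumer group id
  Map("my-topic" -> 1),  // placeholder topic -> receiver thread count
  StorageLevel.MEMORY_AND_DISK_SER)
```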
>>> On Fri, Sep 12, 2014 at 4:45 PM, Dibyendu Bhattacharya
>>> [via Apache Spark User List] <[hidden email]> wrote:
>>>> Dear all,
>>>>
>>>> I am sorry; this was a false alarm.
>>>>
>>>> There was an issue in my RDD processing logic that led to a large
>>>> backlog. Once I fixed the issues in my processing logic, I could see
>>>> all messages being pulled through cleanly without any "block removed"
>>>> errors. I still need to tune certain configurations in my Kafka
>>>> consumer to adjust the data rate and the batch size.
>>>>
>>>> Sorry again.
>>>>
>>>> Regards,
>>>> Dibyendu
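For anyone tuning the data rate of a custom receiver, the throttling mentioned above can be as simple as capping how many records are stored per interval inside the receiver thread. A generic sketch using the standard Receiver API (this is not the actual kafka-spark-consumer implementation; the fetch call and the per-second limit are illustrative):

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative only: a receiver that stores at most `maxPerSecond` records/sec.
class ThrottledReceiver(maxPerSecond: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart(): Unit = {
    new Thread("throttled-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          val windowStart = System.currentTimeMillis()
          var stored = 0
          while (stored < maxPerSecond && !isStopped()) {
            store(fetchOneRecord()) // placeholder for the real Kafka fetch
            stored += 1
          }
          // Sleep out the remainder of the one-second window
          val elapsed = System.currentTimeMillis() - windowStart
          if (elapsed < 1000) Thread.sleep(1000 - elapsed)
        }
      }
    }.start()
  }

  def onStop(): Unit = {} // the loop above polls isStopped()

  private def fetchOneRecord(): String = ??? // hypothetical fetch helper
}
```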
>>>> On Thu, Sep 11, 2014 at 8:13 PM, Nan Zhu <[hidden email]> wrote:
>>>>> This is my case, about a broadcast variable:
>>>>>
>>>>> 14/07/21 19:49:13 INFO Executor: Running task ID 4
>>>>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
>>>>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106)
>>>>> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
>>>>> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
>>>>> 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
>>>>> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
>>>>> 14/07/21 19:49:13 INFO Executor: Finished task ID 3
>>>>> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
>>>>> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms
>>>>> 14/07/21 19:49:13 INFO Executor: Running task ID 5
>>>>> 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
>>>>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
>>>>> 14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0
>>>>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106)
>>>>> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
>>>>> 14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
>>>>> 14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)
>>>>> 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
>>>>> 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
>>>>> 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
>>>>> 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
>>>>> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
>>>>> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
>>>>> 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
>>>>> 14/07/21 19:49:13 INFO Executor: Finished task ID 4
>>>>> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
>>>>> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms
>>>>> 14/07/21 19:49:13 INFO Executor: Running task ID 6
>>>>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
>>>>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106)
>>>>> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
>>>>> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
>>>>> 14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
>>>>> 14/07/21 19:49:13 INFO Executor: Finished task ID 5
>>>>> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on executor localhost: localhost (PROCESS_LOCAL)
>>>>> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes in 0 ms
>>>>> 14/07/21 19:49:13 INFO Executor: Running task ID 7
>>>>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5)
>>>>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost (progress: 6/106)
>>>>> 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
>>>>> 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
>>>>> 14/07/21 19:49:13 ERROR Executor: Exception in task ID 6
>>>>> java.io.FileNotFoundException: http://172.31.34.174:52070/broadcast_0
>>>>>     at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1624)
>>>>>     at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196)
>>>>>     at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89)
>>>>>     at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>>     at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>>>     at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>>>>>     at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
>>>>>     at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
>>>>>     at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>>>>>     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>     at java.lang.Thread.run(Thread.java:744)
>>>>>
>>>>> --
>>>>> Nan Zhu
>>>>>
>>>>> On Thursday, September 11, 2014 at 10:42 AM, Nan Zhu wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Can you attach more logs, to see if there is some entry from
>>>>> ContextCleaner?
>>>>>
>>>>> I met a very similar issue before, but it has not been resolved.
>>>>>
>>>>> Best,
>>>>>
>>>>> --
>>>>> Nan Zhu
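The log above shows ContextCleaner cleaning broadcast_0 while tasks of the same job are still starting, after which the HttpBroadcast fetch fails with FileNotFoundException. Since the cleaner only removes broadcasts whose driver-side handle has been garbage-collected, one pattern that may avoid this is to keep a strong, long-lived reference to the broadcast for as long as jobs can still use it, and unpersist it explicitly only afterwards. A minimal sketch under that assumption (the data and paths are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("BroadcastLifetime"))

// Keep the broadcast handle in a long-lived val; if it becomes unreachable
// on the driver, ContextCleaner may remove the underlying blocks while
// queued tasks still need them.
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

val total = sc.textFile("hdfs:///data/input.csv") // placeholder input path
  .map(line => lookup.value.getOrElse(line.take(1), 0))
  .reduce(_ + _)

// Release the blocks explicitly only once no further jobs reference it.
lookup.unpersist()
```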
>>>>> On Thursday, September 11, 2014 at 10:13 AM, Dibyendu Bhattacharya wrote:
>>>>>
>>>>> Dear All,
>>>>>
>>>>> Not sure if this is a false alarm, but I wanted to raise it to
>>>>> understand what is happening.
>>>>>
>>>>> I am testing the Kafka receiver which I have written
>>>>> (https://github.com/dibbhatt/kafka-spark-consumer). It is basically a
>>>>> low-level Kafka consumer that implements custom Receivers for every
>>>>> Kafka topic partition and pulls data in parallel. The individual
>>>>> streams from all topic partitions are then merged into a union stream,
>>>>> which is used for further processing.
>>>>>
>>>>> The custom Receiver works fine under normal load with no issues. But
>>>>> when I tested it with a huge backlog of messages in Kafka (50 million+),
>>>>> I saw a couple of major issues in Spark Streaming, and I wanted to get
>>>>> some opinions on them.
>>>>>
>>>>> I am using the latest Spark 1.1, built from source, running on a
>>>>> 3-node m1.xlarge Spark cluster on Amazon EMR in standalone mode.
>>>>>
>>>>> Below are my two main questions.
>>>>>
>>>>> 1. When I run Spark Streaming with my Kafka consumer against a huge
>>>>> backlog in Kafka (around 50 million messages), Spark is completely busy
>>>>> performing the receiving tasks and hardly schedules any processing
>>>>> tasks. Is this expected? If there is a large backlog, Spark will take a
>>>>> long time pulling it, but why is Spark not doing any processing? Is it
>>>>> a resource limitation (say, all cores busy pulling), or is it by
>>>>> design? I am setting executor-memory to 10G and driver-memory to 4G.
>>>>>
>>>>> 2. This issue seems more serious. I have attached the driver trace to
>>>>> this email. Very frequently, blocks are selected to be removed; entries
>>>>> like these are all over the place. And when a block is removed, the
>>>>> problem below happens. This may also be the cause of issue 1, i.e. no
>>>>> jobs getting processed.
>>>>>
>>>>> INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
>>>>> INFO : org.apache.spark.storage.BlockManager - Dropping block input-0-1410443074600 from memory
>>>>> INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of size 12651900 dropped from memory (free 21220667)
>>>>> INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory (size: 12.1 MB, free: 100.6 MB)
>>>>> ...
>>>>> INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-62.asskickery.us:37033 in memory (size: 12.1 MB, free: 154.6 MB)
>>>>> ...
>>>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 7.0 (TID 118, ip-10-252-5-62.asskickery.us): java.lang.Exception: Could not compute split, block input-0-1410443074600 not found
>>>>> ...
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 0.1 in stage 7.0 (TID 126) on executor ip-10-252-5-62.asskickery.us: java.lang.Exception (Could not compute split, block input-0-1410443074600 not found) [duplicate 1]
>>>>>
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 139, ip-10-252-5-62.asskickery.us): java.lang.Exception: Could not compute split, block input-0-1410443074600 not found
>>>>>     org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>>>>     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>>>     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>>>     org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>>>     org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>>>>     org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>>>>>     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>     org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>     java.lang.Thread.run(Thread.java:744)
>>>>>
>>>>> Regards,
>>>>> Dibyendu
>>>>>
>>>>> Attachments:
>>>>> - driver-trace.txt
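On question 1: each receiver occupies one core for the lifetime of the application, so with one receiver per topic partition the union pattern described above needs strictly more cores than receivers, or the scheduler has nothing left on which to run processing tasks. A minimal sketch of that pattern, reusing the illustrative ThrottledReceiver from the earlier sketch (the partition count is a placeholder):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

val conf = new SparkConf().setAppName("UnionOfPartitionStreams")
val ssc = new StreamingContext(conf, Seconds(10))

val numPartitions = 8 // placeholder: one receiver per Kafka topic partition

// Each stream below pins one core for its receiver, so the cluster must
// have more than `numPartitions` cores in total for any processing to run.
val perPartition: Seq[DStream[String]] =
  (0 until numPartitions).map(_ => ssc.receiverStream(new ThrottledReceiver(10000)))

val unioned = ssc.union(perPartition)
unioned.count().print()

ssc.start()
ssc.awaitTermination()
```

In standalone mode, that translates to a spark-submit `--total-executor-cores` value above the receiver count (or `--num-executors`/`--executor-cores` on YARN).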