Our issue could be related to the problem described in this thread, in which the DStream is processed with a 1-hour batch duration: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-1-hour-batch-duration-RDD-files-gets-lost-td14027.html
I have implemented IO throttling in the Receiver as well as in our Kafka consumer, and our backlog is not that large.
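For reference, our actual throttling lives inside the custom consumer, but Spark also has a built-in per-receiver limit (spark.streaming.receiver.maxRate, records per second) that serves the same purpose. A minimal sketch, with illustrative values only (the app name, rate and 1-hour batch duration below are placeholders, not our production settings):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

// Sketch only: cap each receiver so a large Kafka backlog is drained
// gradually instead of flooding the MemoryStore with input blocks.
val conf = new SparkConf()
  .setAppName("streaming-backlog-test")              // hypothetical app name
  .set("spark.streaming.receiver.maxRate", "5000")   // illustrative records/sec limit

// 1-hour batch duration, as in the thread linked above
val ssc = new StreamingContext(conf, Minutes(60))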
Even so, we still see received blocks being dropped from memory:

INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
INFO : org.apache.spark.storage.BlockManager - Dropping block *input-0-1410443074600* from memory
INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of size 12651900 dropped from memory (free 21220667)
INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory (size: 12.1 MB, free: 100.6 MB)

The question I have now is: how can we prevent the MemoryStore/BlockManager from dropping the block inputs? And shouldn't these drops be logged at WARN or ERROR level?
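One option we are considering (a sketch only, we have not verified that it helps) is to create the receiver stream with a storage level that is allowed to spill to disk, so that under memory pressure blocks are written to disk instead of being discarded. With the stock Kafka receiver (our custom consumer would need the equivalent setting passed to its Receivers) that looks roughly like this, where ssc is the StreamingContext from the sketch above and the ZooKeeper quorum, group and topic are placeholders:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils   // spark-streaming-kafka artifact

// Sketch only: serialized, replicated storage that may spill to disk, so the
// BlockManager moves cold input blocks to disk rather than dropping them.
val stream = KafkaUtils.createStream(
  ssc,
  "zk-host:2181",            // placeholder ZooKeeper quorum
  "backlog-test-group",      // placeholder consumer group
  Map("events" -> 1),        // placeholder topic -> receiver thread count
  StorageLevel.MEMORY_AND_DISK_SER_2)

The trade-off is that spilled blocks are read back from disk, which may slow the processing stage further.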
Thanks.

On Fri, Sep 12, 2014 at 4:45 PM, Dibyendu Bhattacharya [via Apache Spark User List] <ml-node+s1001560n14075...@n3.nabble.com> wrote:

> Dear all,
>
> I am sorry. This was a false alarm.
>
> There was an issue in the RDD processing logic which led to a large backlog. Once I fixed the issues in my processing logic, I could see all messages being pulled nicely without any Block Removed errors. I need to tune certain configurations in my Kafka Consumer to modify the data rate and also the batch size.
>
> Sorry again.
>
> Regards,
> Dibyendu
>
> On Thu, Sep 11, 2014 at 8:13 PM, Nan Zhu <[hidden email]> wrote:
>
>> This is my case with a broadcast variable:
>>
>> 14/07/21 19:49:13 INFO Executor: Running task ID 4
>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106)
>> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
>> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
>> 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
>> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
>> 14/07/21 19:49:13 INFO Executor: Finished task ID 3
>> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
>> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms
>> 14/07/21 19:49:13 INFO Executor: Running task ID 5
>> 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
>> *14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0*
>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106)
>> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
>> 14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
>> *14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)*
>> 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
>> 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
>> 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
>> 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
>> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
>> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
>> 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
>> 14/07/21 19:49:13 INFO Executor: Finished task ID 4
>> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
>> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms
>> 14/07/21 19:49:13 INFO Executor: Running task ID 6
>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106)
>> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
>> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
>> 14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
>> 14/07/21 19:49:13 INFO Executor: Finished task ID 5
>> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on executor localhost: localhost (PROCESS_LOCAL)
>> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes in 0 ms
>> 14/07/21 19:49:13 INFO Executor: Running task ID 7
>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5)
>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost (progress: 6/106)
>> 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
>> 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
>> 14/07/21 19:49:13 ERROR Executor: Exception in task ID 6
>> java.io.FileNotFoundException: http://172.31.34.174:52070/broadcast_0
>> at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1624)
>> at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196)
>> at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89)
>> at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>> at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>> at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>> at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
>> at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
>> at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>> at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:744)
>>
>> --
>> Nan Zhu
>>
>> On Thursday, September 11, 2014 at 10:42 AM, Nan Zhu wrote:
>>
>> Hi,
>>
>> Can you attach more logs to see if there is some entry from ContextCleaner?
>>
>> I met a very similar issue before… but haven't got it resolved.
>>
>> Best,
>>
>> --
>> Nan Zhu
>>
>> On Thursday, September 11, 2014 at 10:13 AM, Dibyendu Bhattacharya wrote:
>>
>> Dear All,
>>
>> Not sure if this is a false alarm, but I wanted to raise this to understand what is happening.
>>
>> I am testing the Kafka Receiver which I have written (https://github.com/dibbhatt/kafka-spark-consumer), which is basically a low-level Kafka Consumer that implements custom Receivers for every Kafka topic partition and pulls data in parallel. Individual streams from all topic partitions are then merged to create a Union stream, which is used for further processing.
>>
>> The custom Receiver works fine under normal load with no issues. But when I tested it with a huge amount of backlog messages from Kafka (50 million+ messages), I see a couple of major issues in Spark Streaming. Wanted to get some opinion on this....
>>
>> I am using the latest Spark 1.1, taken from source and built. Running in Amazon EMR, on a 3-node m1.xlarge Spark cluster in Standalone Mode.
>>
>> Below are the two main questions I have..
>>
>> 1. When I run Spark Streaming with my Kafka Consumer against a huge backlog in Kafka (around 50 million messages), Spark is completely busy performing the receiving task and hardly schedules any processing tasks. Can you let me know if this is expected? If there is a large backlog, Spark will take a long time pulling it. Why is Spark not doing any processing? Is it because of a resource limitation (say, all cores are busy pulling) or is it by design? I am setting executor-memory to 10G and driver-memory to 4G.
>>
>> 2. *This issue seems to be more serious.* I have attached the driver trace to this email. What I can see, very frequently, is that blocks are selected to be removed... these kinds of entries are all over the place. But when a block is removed, the problem below happens.... Maybe this issue causes issue 1, that no jobs are getting processed..
>>
>> INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
>> INFO : org.apache.spark.storage.BlockManager - Dropping block *input-0-1410443074600* from memory
>> INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of size 12651900 dropped from memory (free 21220667)
>> INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory (size: 12.1 MB, free: 100.6 MB)
>> ...........
>>
>> INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-62.asskickery.us:37033 in memory (size: 12.1 MB, free: 154.6 MB)
>> ..............
>>
>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 7.0 (TID 118, ip-10-252-5-62.asskickery.us): java.lang.Exception: Could not compute split, block input-0-1410443074600 not found
>>
>> ...........
>>
>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 0.1 in stage 7.0 (TID 126) on executor ip-10-252-5-62.asskickery.us: java.lang.Exception (Could not compute split, block input-0-1410443074600 not found) [duplicate 1]
>>
>> org.apache.spark.SparkException: *Job aborted due to stage failure*: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 139, ip-10-252-5-62.asskickery.us): java.lang.Exception: Could not compute split, block input-0-1410443074600 not found
>> org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>> org.apache.spark.scheduler.Task.run(Task.scala:54)
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> java.lang.Thread.run(Thread.java:744)
>>
>> Regards,
>> Dibyendu
>>
>> Attachments:
>> - driver-trace.txt