Dear all, I am sorry. This was a false alarm.
There was an issue in my RDD processing logic that led to a large backlog. Once I fixed it, all messages are now being pulled without any Block Removed errors. I still need to tune some settings in my Kafka consumer to adjust the data rate and the batch size.

Sorry again.

Regards,
Dibyendu

On Thu, Sep 11, 2014 at 8:13 PM, Nan Zhu <zhunanmcg...@gmail.com> wrote:

> This is my case with a broadcast variable:
>
> 14/07/21 19:49:13 INFO Executor: Running task ID 4
> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106)
> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
> 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
> 14/07/21 19:49:13 INFO Executor: Finished task ID 3
> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms
> 14/07/21 19:49:13 INFO Executor: Running task ID 5
> 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
> *14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0*
> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106)
> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
> 14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
> *14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)*
> 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
> 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
> 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
> 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
> 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
> 14/07/21 19:49:13 INFO Executor: Finished task ID 4
> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms
> 14/07/21 19:49:13 INFO Executor: Running task ID 6
> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106)
> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
> 14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
> 14/07/21 19:49:13 INFO Executor: Finished task ID 5
> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on executor localhost: localhost (PROCESS_LOCAL)
> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes in 0 ms
> 14/07/21 19:49:13 INFO Executor: Running task ID 7
> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5)
> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost (progress: 6/106)
> 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
> 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
> 14/07/21 19:49:13 ERROR Executor: Exception in task ID 6
> java.io.FileNotFoundException: http://172.31.34.174:52070/broadcast_0
>     at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1624)
>     at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196)
>     at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89)
>     at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>     at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>     at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>     at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
>     at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
>     at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:744)
>
> --
> Nan Zhu
>
> On Thursday, September 11, 2014 at 10:42 AM, Nan Zhu wrote:
>
> Hi,
>
> Can you attach more logs, so we can see whether there are any entries from ContextCleaner?
>
> I ran into a very similar issue before, but haven't gotten it resolved.
>
> Best,
>
> --
> Nan Zhu
>
> On Thursday, September 11, 2014 at 10:13 AM, Dibyendu Bhattacharya wrote:
>
> Dear All,
>
> I am not sure whether this is a false alarm, but I wanted to raise it to understand what is happening.
>
> I am testing the Kafka receiver I have written (https://github.com/dibbhatt/kafka-spark-consumer). It is basically a low-level Kafka consumer that implements a custom Receiver for every Kafka topic partition and pulls data from all partitions in parallel. The individual streams from all topic partitions are then merged into a union stream, which is used for further processing.
>
> The custom Receiver works fine under normal load. But when I tested it with a huge backlog of messages in Kafka (50 million+ messages), I saw a couple of major issues in Spark Streaming, and I would like to get some opinions on them.
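The one-receiver-per-partition design described above has costs that can be checked with simple arithmetic. In Spark Streaming each receiver runs as a long-running task that holds one executor core, so every partition receiver removes a core from the pool available for processing tasks, and the receivers' combined rate decides how many records land in each batch. The plain-Java sketch below (no Spark classes) works through that arithmetic. Only the ~50 million backlog comes from the message; the core count (3 nodes with an assumed 4 cores each), the partition count, the per-receiver rate cap and the batch interval are hypothetical placeholders, not values from the consumer's configuration.

```java
/**
 * Back-of-the-envelope capacity arithmetic for a design with one receiver per
 * Kafka partition. Plain Java, no Spark classes; every concrete number in
 * main() except the ~50 million backlog is a hypothetical placeholder.
 */
public final class ReceiverCapacity {

    private ReceiverCapacity() {}

    /** Each receiver pins one core for the life of the stream; the rest can run processing tasks. */
    static int coresLeftForProcessing(int totalCores, int receivers) {
        return Math.max(0, totalCores - receivers);
    }

    /** Records landing in one batch when every receiver runs at its rate cap. */
    static long recordsPerBatch(int receivers, long maxRatePerReceiver, long batchSeconds) {
        return receivers * maxRatePerReceiver * batchSeconds;
    }

    /** Seconds needed to pull the whole backlog at the capped aggregate rate. */
    static double secondsToDrain(long backlog, int receivers, long maxRatePerReceiver) {
        return (double) backlog / (receivers * maxRatePerReceiver);
    }

    /** The stream keeps up only if each batch is processed in less time than the batch interval. */
    static boolean keepsUp(double processingSecondsPerBatch, long batchSeconds) {
        return processingSecondsPerBatch < batchSeconds;
    }

    public static void main(String[] args) {
        int totalCores = 3 * 4;        // assumption: 3 worker nodes with 4 cores each
        int receivers = 8;             // hypothetical: 8 Kafka partitions, one receiver each
        long maxRate = 10_000L;        // hypothetical cap, records per second per receiver
        long batchSeconds = 10L;       // hypothetical batch interval
        long backlog = 50_000_000L;    // ~50 million messages, as reported in the thread

        System.out.println("cores left for processing: " + coresLeftForProcessing(totalCores, receivers));
        System.out.println("records per batch: " + recordsPerBatch(receivers, maxRate, batchSeconds));
        System.out.println("seconds to drain backlog: " + secondsToDrain(backlog, receivers, maxRate));
        System.out.println("keeps up at 8 s per batch: " + keepsUp(8.0, batchSeconds));
    }
}
```

With these placeholders it prints 4 cores left for processing, 800000 records per batch, 625.0 seconds to drain the backlog, and `true` for a hypothetical 8-second processing time. Halving the rate cap doubles the drain time but halves the batch size; and if processing a batch takes longer than the batch interval, batches queue up and the backlog grows regardless of how fast the receivers pull.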
>
> I am using the latest Spark 1.1, built from source, running on Amazon EMR: a 3-node m1.xlarge Spark cluster in Standalone mode.
>
> Below are the two main questions I have:
>
> 1. When I run Spark Streaming with my Kafka consumer against a huge backlog in Kafka (around 50 million messages), Spark is completely busy running the receiver tasks and hardly schedules any processing tasks. Can you let me know whether this is expected? With a large backlog, Spark will take a long time to pull the messages, but why is it not doing any processing in the meantime? Is it because of a resource limitation (say, all cores are busy pulling), or is it by design? I am setting executor-memory to 10G and driver-memory to 4G.
>
> 2. *This issue seems to be more serious.* I have attached the driver trace to this email. Very frequently, I can see blocks being selected for removal; entries like these are all over the place. But when a block is removed, the problem below happens. Maybe this is what causes issue 1, that no jobs are getting processed.
>
>
> INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
> INFO : org.apache.spark.storage.BlockManager - Dropping block *input-0-1410443074600* from memory
> INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of size 12651900 dropped from memory (free 21220667)
> INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory (size: 12.1 MB, free: 100.6 MB)
> ...........
>
> INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-62.asskickery.us:37033 in memory (size: 12.1 MB, free: 154.6 MB)
> ..............
>
> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 7.0 (TID 118, ip-10-252-5-62.asskickery.us): java.lang.Exception: Could not compute split, block input-0-1410443074600 not found
>
> ...........
>
> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 0.1 in stage 7.0 (TID 126) on executor ip-10-252-5-62.asskickery.us: java.lang.Exception (Could not compute split, block input-0-1410443074600 not found) [duplicate 1]
>
>
> org.apache.spark.SparkException: *Job aborted due to stage failure*: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 139, ip-10-252-5-62.asskickery.us): java.lang.Exception: Could not compute split, block input-0-1410443074600 not found
>         org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:744)
>
> Regards,
> Dibyendu
>
> Attachments:
> - driver-trace.txt
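The failure in issue 2 follows a pattern that a toy model makes concrete: a size-bounded memory store evicts its least-recently-used block to make room for a new one, and a task that later asks for the evicted block fails unless a copy was kept somewhere else. The plain-Java sketch below only illustrates that mechanism; it is not Spark's MemoryStore. The block names and the 30 MB capacity are hypothetical, and only the 12651900-byte block size is taken from the log above. The `spillToDisk` flag stands in for a Spark storage level that includes disk (for example `StorageLevel.MEMORY_AND_DISK_SER_2`), where a block dropped from memory is written to disk instead of discarded; the thread does not say which storage level the consumer's receivers used.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Toy model of a size-bounded in-memory block store with least-recently-used
 * eviction. It only illustrates the pattern in the log above; it is NOT
 * Spark's MemoryStore. Block names and the capacity are hypothetical.
 */
public final class ToyBlockStore {

    private final long capacityBytes;
    private final boolean spillToDisk; // stands in for a storage level with a disk fallback
    // accessOrder = true, so iteration runs from least to most recently used block
    private final LinkedHashMap<String, Long> memory = new LinkedHashMap<>(16, 0.75f, true);
    private final Map<String, Long> disk = new HashMap<>();
    private long usedBytes = 0L;

    ToyBlockStore(long capacityBytes, boolean spillToDisk) {
        this.capacityBytes = capacityBytes;
        this.spillToDisk = spillToDisk;
    }

    /** Stores a new block (IDs assumed unique), evicting LRU blocks until it fits. */
    void put(String blockId, long sizeBytes) {
        if (sizeBytes > capacityBytes) {
            throw new IllegalArgumentException("block larger than the whole store");
        }
        Iterator<Map.Entry<String, Long>> lru = memory.entrySet().iterator();
        while (usedBytes + sizeBytes > capacityBytes && lru.hasNext()) {
            Map.Entry<String, Long> victim = lru.next();
            String victimId = victim.getKey();
            long victimSize = victim.getValue();
            lru.remove();
            usedBytes -= victimSize;
            if (spillToDisk) {
                disk.put(victimId, victimSize); // block survives on disk
            }
            // without a disk fallback the evicted block is simply gone
        }
        memory.put(blockId, sizeBytes);
        usedBytes += sizeBytes;
    }

    /** Mirrors the task-side lookup: a block found nowhere makes the computation fail. */
    long read(String blockId) {
        Long size = memory.get(blockId);
        if (size == null) {
            size = disk.get(blockId);
        }
        if (size == null) {
            throw new IllegalStateException(
                "Could not compute split, block " + blockId + " not found");
        }
        return size;
    }

    public static void main(String[] args) {
        long blockSize = 12_651_900L; // size of input-0-1410443074600 in the log above
        for (boolean spill : new boolean[] {false, true}) {
            ToyBlockStore store = new ToyBlockStore(30_000_000L, spill); // hypothetical capacity
            store.put("input-0-a", blockSize);
            store.put("input-0-b", blockSize);
            store.put("input-0-c", blockSize); // does not fit, so input-0-a is evicted
            try {
                store.read("input-0-a");
                System.out.println("spill=" + spill + ": block still readable");
            } catch (IllegalStateException e) {
                System.out.println("spill=" + spill + ": " + e.getMessage());
            }
        }
    }
}
```

Reading `input-0-a` after the third `put` fails with "Could not compute split, block input-0-a not found" when `spillToDisk` is false and succeeds when it is true. In this thread the dropped blocks turned out to be a symptom of the backlog built up by the processing logic, so the model shows the mechanism behind the error rather than a fix for it.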