This is my case, which involves a broadcast variable:  

14/07/21 19:49:13 INFO Executor: Running task ID 4
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106)
14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
14/07/21 19:49:13 INFO Executor: Finished task ID 3
14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms
14/07/21 19:49:13 INFO Executor: Running task ID 5
14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106)
14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)
14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
14/07/21 19:49:13 INFO Executor: Finished task ID 4
14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms
14/07/21 19:49:13 INFO Executor: Running task ID 6
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106)
14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
14/07/21 19:49:13 INFO Executor: Finished task ID 5
14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on executor localhost: localhost (PROCESS_LOCAL)
14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes in 0 ms
14/07/21 19:49:13 INFO Executor: Running task ID 7
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5)
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost (progress: 6/106)
14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
14/07/21 19:49:13 ERROR Executor: Exception in task ID 6
java.io.FileNotFoundException: http://172.31.34.174:52070/broadcast_0
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1624)
        at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196)
        at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89)
        at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
        at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
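
To check a fuller log for the same pattern (a broadcast being cleaned and then read again by a later task), something like the following might help. It is only a sketch: the regular expressions assume the exact message wording shown in the log above, which may differ in other Spark versions, and `reads_after_cleanup` is a made-up helper name.

```python
import re

# Patterns follow the wording in the log above; adjust them if your log differs.
CLEANED = re.compile(r"ContextCleaner: Cleaned broadcast (\d+)")
READING = re.compile(r"HttpBroadcast: Started reading broadcast variable (\d+)")


def reads_after_cleanup(lines):
    """Return (line_number, broadcast_id) for each read that follows a cleanup of the same id."""
    cleaned = set()
    hits = []
    for number, line in enumerate(lines, start=1):
        match = CLEANED.search(line)
        if match:
            cleaned.add(match.group(1))
            continue
        match = READING.search(line)
        if match and match.group(1) in cleaned:
            hits.append((number, match.group(1)))
    return hits


# Three entries copied from the log above, one entry per line.
sample = [
    "14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0",
    "14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0",
    "14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0",
]
print(reads_after_cleanup(sample))
```

Any hit means a task tried to fetch a broadcast after the cleaner had already removed it, which would match the FileNotFoundException above.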



--  
Nan Zhu


On Thursday, September 11, 2014 at 10:42 AM, Nan Zhu wrote:

> Hi,   
>  
> Can you attach more logs to see if there is some entry from ContextCleaner?
>  
> I ran into a very similar issue before… but haven’t gotten it resolved.  
>  
> Best,  
>  
> --  
> Nan Zhu
>  
>  
> On Thursday, September 11, 2014 at 10:13 AM, Dibyendu Bhattacharya wrote:
>  
> > Dear All,  
> >  
> > Not sure if this is a false alarm, but I wanted to raise this to 
> > understand what is happening.  
> >  
> > I am testing the Kafka Receiver which I have written 
> > (https://github.com/dibbhatt/kafka-spark-consumer), which is basically a 
> > low-level Kafka consumer that implements a custom Receiver for every Kafka 
> > topic partition and pulls data in parallel. The individual streams from all 
> > topic partitions are then merged into a union stream, which is used for 
> > further processing.
> >  
> > The custom Receiver works fine under normal load with no issues. But when I 
> > tested it with a huge backlog of messages from Kafka (50 million+ 
> > messages), I saw a couple of major issues in Spark Streaming. I wanted to 
> > get some opinions on this.
> >  
> > I am using the latest Spark 1.1, built from source. It is running on 
> > Amazon EMR, on a 3-node m1.xlarge Spark cluster in Standalone mode.
> >  
> > Below are the two main questions I have.
> >  
> > 1. When I run Spark Streaming with my Kafka consumer against a huge 
> > backlog in Kafka (around 50 million messages), Spark is completely busy 
> > performing the receiving tasks and hardly schedules any processing tasks. 
> > Can you let me know if this is expected? If there is a large backlog, Spark 
> > will take a long time pulling the messages. Why is Spark not doing any 
> > processing? Is it because of a resource limitation (say, all cores are busy 
> > pulling), or is it by design? I am setting executor-memory to 10G and 
> > driver-memory to 4G.
> >  
> > 2. This issue seems to be more serious. I have attached the driver trace 
> > to this email. I very frequently see blocks being selected for removal; 
> > entries of this kind are all over the place. But when a block is removed, 
> > the problem below happens. Maybe this issue causes issue 1, in that no 
> > jobs are getting processed.
> >  
> >  
> > INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
> > INFO : org.apache.spark.storage.BlockManager - Dropping block input-0-1410443074600 from memory
> > INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of size 12651900 dropped from memory (free 21220667)
> > INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory (size: 12.1 MB, free: 100.6 MB)
> >  
> > ...........
> >  
> > INFO : org.apache.spark.storage.BlockManagerInfo - Removed input-0-1410443074600 on ip-10-252-5-62.asskickery.us:37033 in memory (size: 12.1 MB, free: 154.6 MB)
> > ..............
> >  
> >  
> > WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 7.0 (TID 118, ip-10-252-5-62.asskickery.us): java.lang.Exception: Could not compute split, block input-0-1410443074600 not found
> >  
> > ...........
> >  
> > INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 0.1 in stage 7.0 (TID 126) on executor ip-10-252-5-62.asskickery.us: java.lang.Exception (Could not compute split, block input-0-1410443074600 not found) [duplicate 1]
> >  
> >  
> > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 139, ip-10-252-5-62.asskickery.us): java.lang.Exception: Could not compute split, block input-0-1410443074600 not found
> >         org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >         org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >         org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
> >         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> >         org.apache.spark.scheduler.Task.run(Task.scala:54)
> >         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> >         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >         java.lang.Thread.run(Thread.java:744)
> >  
> >  
> > Regards,  
> > Dibyendu
> >  
> >  
> >  
> >  
> >  
> >  
> > Attachments:  
> > - driver-trace.txt
> >  
> >  
>  
>  
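
On question 1 in the quoted mail: in Spark Streaming each receiver runs as a long-lived task that holds one core, so one thing worth ruling out is that the receivers alone use up the cluster's cores. The arithmetic is trivial, but here is a sketch. The core and receiver counts are hypothetical (the thread does not state them), and `cores_left_for_processing` is just an illustrative name.

```python
def cores_left_for_processing(total_cores, num_receivers):
    """Cores not held by receivers; each receiver is assumed to pin one core."""
    if total_cores < 0 or num_receivers < 0:
        raise ValueError("counts must be non-negative")
    return max(total_cores - num_receivers, 0)


# Hypothetical figures, not taken from the thread.
for receivers in (4, 12, 20):
    free = cores_left_for_processing(total_cores=12, num_receivers=receivers)
    print(f"{receivers} receivers on 12 cores -> {free} cores left for batches")
```

If the number of topic partitions (and therefore receivers) is at or above the number of cores given to the application, no core is left to run the batch jobs.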

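
On question 2: "dropped from memory" followed by "block ... not found" is what one would expect if the received blocks are stored in memory only, so that a block evicted under memory pressure is gone by the time its batch runs. Whether that is the storage level used here is an assumption; the thread does not say. The toy model below is not Spark's MemoryStore, and all names and sizes in it are invented. It only contrasts a memory-only store with one that spills evicted blocks to disk.

```python
from collections import OrderedDict


class ToyBlockStore:
    """Toy bounded block store, oldest block evicted first. Not Spark code."""

    def __init__(self, capacity_bytes, spill_to_disk):
        self.capacity = capacity_bytes
        self.spill_to_disk = spill_to_disk
        self.memory = OrderedDict()  # block id -> size, oldest first
        self.disk = {}               # evicted blocks, only used when spilling

    def put(self, block_id, size):
        # Evict the oldest blocks until the new block fits in memory.
        while self.memory and sum(self.memory.values()) + size > self.capacity:
            old_id, old_size = self.memory.popitem(last=False)
            if self.spill_to_disk:
                self.disk[old_id] = old_size
        self.memory[block_id] = size

    def has(self, block_id):
        return block_id in self.memory or block_id in self.disk


for spill in (False, True):
    store = ToyBlockStore(capacity_bytes=30, spill_to_disk=spill)
    for i in range(4):
        store.put(f"input-0-{i}", 12)
    print(f"spill_to_disk={spill}: first block still readable = {store.has('input-0-0')}")
```

In Spark terms, this is the difference between a MEMORY_ONLY style storage level and a MEMORY_AND_DISK style one for the receiver's blocks.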