The ContextCleaner cleans up data and metadata related to RDDs and
broadcast variables only after those variables go out of scope and get
garbage-collected by the JVM. So the broadcast variable in question is
probably somehow going out of scope even while the job using it is still
in progress.
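
For illustration, here is a minimal, hypothetical sketch of that lifecycle
(the object name, sizes, and master setting are made up, not taken from your
test). Once the driver's last reference to the Broadcast object becomes
unreachable and a GC runs, the ContextCleaner asynchronously removes the
broadcast data, and any task that still tries to read it can hit exactly
this kind of FileNotFoundException:

    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastScopeSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[4]").setAppName("broadcast-scope"))

        // bcast is reachable only inside this method
        def runJob(): Unit = {
          val bcast = sc.broadcast((1 to 1000).toArray)
          // fine: the driver still holds a reference while this job runs
          sc.parallelize(1 to 10).map(_ + bcast.value.length).collect()
        } // the driver's last reference to bcast is dropped on return

        runJob()
        System.gc() // once the JVM collects the Broadcast object, the
                    // ContextCleaner asynchronously removes broadcast_0;
                    // a task that still needed it would fail to read it

        sc.stop()
      }
    }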

Could you reproduce this behavior reliably in a simple code snippet that
you can share with us?

TD



On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu <zhunanmcg...@gmail.com> wrote:

>  Hi, all
>
> When I run a Spark application (actually the unit tests of the application
> in Jenkins), I always hit a FileNotFoundException when reading a
> broadcast variable.
>
> The program itself works well; only the unit tests fail.
>
> Here is the example log:
>
>
> 14/07/21 19:49:13 INFO Executor: Running task ID 4
> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost 
> (progress: 3/106)
> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
> hdfstest_customers
> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
> 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
> 14/07/21 19:49:13 INFO Executor: Finished task ID 3
> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on 
> executor localhost: localhost (PROCESS_LOCAL)
> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes 
> in 0 ms
> 14/07/21 19:49:13 INFO Executor: Running task ID 5
> 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
> *14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0*
> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost 
> (progress: 4/106)
> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
> 14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
> *14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped 
> from memory (free 886623436)*
> 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
> 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
> 14/07/21 19:49:13 INFO HadoopRDD: Input split: 
> hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
> 14/07/21 19:49:13 INFO HadoopRDD: Input split: 
> hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
> 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
> 14/07/21 19:49:13 INFO Executor: Finished task ID 4
> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on 
> executor localhost: localhost (PROCESS_LOCAL)
> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes 
> in 0 ms
> 14/07/21 19:49:13 INFO Executor: Running task ID 6
> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost 
> (progress: 5/106)
> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
> hdfstest_customers
> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
> 14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
> 14/07/21 19:49:13 INFO Executor: Finished task ID 5
> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on 
> executor localhost: localhost (PROCESS_LOCAL)
> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes 
> in 0 ms
> 14/07/21 19:49:13 INFO Executor: Running task ID 7
> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5)
> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost 
> (progress: 6/106)
> 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
> 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
> 14/07/21 19:49:13 ERROR Executor: Exception in task ID 6
> java.io.FileNotFoundException: http://172.31.34.174:52070/broadcast_0
>       at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1624)
>       at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196)
>       at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89)
>       at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>       at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>       at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>       at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
>       at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
>       at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>       at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:744)
>
>
> I highlighted the lines indicating that the ContextCleaner cleaned the 
> broadcast variable. I'm wondering why the variable was cleaned, since 
> there is enough memory space.
>
>
> Best,
>
>
> --
> Nan Zhu
>
