Ah, sorry, sorry, my brain just glitched… I sent some wrong information  

It is not “spark.cores.max” but the minPartitions argument of sc.textFile()  
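
For the record, a minimal sketch of what I mean (the path is a placeholder, not the real test setup):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[6]").setAppName("test"))
    // the second argument of sc.textFile() is minPartitions;
    // this is where I was mistakenly passing 200, far more partitions than intended
    val lines = sc.textFile("hdfs://host:9000/path/to/input.csv", 200)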


Best,

--  
Nan Zhu


On Monday, July 21, 2014 at 7:17 PM, Tathagata Das wrote:

> That is definitely weird. spark.cores.max should not affect things when 
> running in local mode.  
>  
> And, I am trying to think of scenarios that could cause a broadcast variable 
> used in the current job to fall out of scope, but they all seem very 
> far-fetched. So I am really curious to see the code where this could be 
> happening.  
>  
> Either way, you could turn off this behavior by setting 
> spark.cleaner.referenceTracking=false
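>  
> For example (a sketch, assuming you build the conf programmatically):
>  
>     import org.apache.spark.{SparkConf, SparkContext}
>  
>     val conf = new SparkConf()
>       .setMaster("local[6]")
>       .set("spark.cleaner.referenceTracking", "false") // disables the ContextCleaner
>     val sc = new SparkContext(conf)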
>  
> TD
>  
>  
> On Mon, Jul 21, 2014 at 3:52 PM, Nan Zhu <zhunanmcg...@gmail.com> wrote:
> > Hi, TD,   
> >  
> > I think I have got more insight into the problem
> >  
> > in the Jenkins test file, I mistakenly passed a wrong value to 
> > spark.cores.max, one much larger than the expected value   
> >  
> > (I passed the master address as local[6], and spark.cores.max as 200)
> >  
> > If I set a more consistent value, everything goes well.  
> >  
> > But I do not think even a too-large spark.cores.max should cause this 
> > problem?  
> >  
> > Best,  
> >  
> > --  
> > Nan Zhu
> >  
> >  
> > On Monday, July 21, 2014 at 6:11 PM, Nan Zhu wrote:
> >  
> > > Hi, TD,   
> > >  
> > > Thanks for the reply
> > >  
> > > I tried to reproduce this in a simpler program, but no luck  
> > >  
> > > However, the program is very simple: it just loads some files from HDFS 
> > > and writes them to HBase…
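> > >  
> > > Roughly this shape (a sketch from memory, not the actual test code; the 
> > > column family and qualifier names are made up, the table name and input 
> > > path are the ones from the log):
> > >  
> > >     import org.apache.hadoop.hbase.HBaseConfiguration
> > >     import org.apache.hadoop.hbase.client.Put
> > >     import org.apache.hadoop.hbase.io.ImmutableBytesWritable
> > >     import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
> > >     import org.apache.hadoop.hbase.util.Bytes
> > >     import org.apache.hadoop.mapreduce.Job
> > >     import org.apache.spark.SparkContext
> > >     import org.apache.spark.SparkContext._
> > >  
> > >     def hdfsToHBase(sc: SparkContext): Unit = {
> > >       val hbaseConf = HBaseConfiguration.create()
> > >       hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "hdfstest_customers")
> > >       val job = Job.getInstance(hbaseConf)
> > >       job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
> > >  
> > >       sc.textFile("hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv")
> > >         .map { line =>
> > >           val cols = line.split(",")
> > >           val put = new Put(Bytes.toBytes(cols(0)))  // first column as row key
> > >           put.add(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(cols(1)))
> > >           (new ImmutableBytesWritable(Bytes.toBytes(cols(0))), put)
> > >         }
> > >         .saveAsNewAPIHadoopDataset(job.getConfiguration)
> > >     }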
> > >  
> > > ---
> > >  
> > > It seems that the issue only appears when I run the unit test in Jenkins 
> > > (it does not fail every time; usually it succeeds about 1 in 10 times)  
> > >  
> > > I once suspected that it was related to some concurrency issue, but even 
> > > after I disabled parallel test execution in build.sbt (the setting shown 
> > > below), the problem was still there  
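> > >  
> > > That is, the standard sbt setting (assuming sbt 0.13.x syntax):
> > >  
> > >     // build.sbt: run test classes sequentially instead of in parallel
> > >     parallelExecution in Test := false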
> > >  
> > > ---
> > >  
> > > Best,  
> > >  
> > > --  
> > > Nan Zhu
> > >  
> > >  
> > > On Monday, July 21, 2014 at 5:40 PM, Tathagata Das wrote:
> > >  
> > > > The ContextCleaner cleans up data and metadata related to RDDs and 
> > > > broadcast variables only when those variables are no longer in scope and 
> > > > get garbage-collected by the JVM. So the broadcast variable in question 
> > > > is probably somehow going out of scope even while the job using the 
> > > > broadcast variable is still in progress.  
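> > > >  
> > > > As a contrived illustration of what “in scope” means here (made-up names, 
> > > > not your code):
> > > >  
> > > >     import org.apache.spark.{SparkConf, SparkContext}
> > > >  
> > > >     val sc = new SparkContext(new SparkConf().setMaster("local[6]").setAppName("bc"))
> > > >  
> > > >     // keep the broadcast in a reference that stays reachable for as long
> > > >     // as any job may read it; the ContextCleaner removes it only after
> > > >     // the JVM garbage-collects the last reference
> > > >     val lookup = sc.broadcast(Map(1 -> "a", 2 -> "b"))
> > > >     sc.parallelize(1 to 10).map(i => lookup.value.getOrElse(i, "?")).collect()
> > > >  
> > > >     lookup.unpersist()  // explicit cleanup once no job needs it anymore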
> > > >  
> > > > Could you reproduce this behavior reliably in a simple code snippet 
> > > > that you can share with us?
> > > >  
> > > > TD
> > > >  
> > > >  
> > > >  
> > > > On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu <zhunanmcg...@gmail.com> wrote:
> > > > > Hi, all  
> > > > >  
> > > > > When I run some Spark application (actually a unit test of the 
> > > > > application in Jenkins), I found that I always hit a 
> > > > > FileNotFoundException when reading a broadcast variable   
> > > > >  
> > > > > The program itself works well; only the unit test fails
> > > > >  
> > > > > Here is an example log:  
> > > > >  
> > > > >  
> > > > > 14/07/21 19:49:13 INFO Executor: Running task ID 4
> > > > > 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
> > > > > 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106)
> > > > > 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
> > > > > 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
> > > > > 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
> > > > > 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
> > > > > 14/07/21 19:49:13 INFO Executor: Finished task ID 3
> > > > > 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
> > > > > 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms
> > > > > 14/07/21 19:49:13 INFO Executor: Running task ID 5
> > > > > 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
> > > > > 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
> > > > > 14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0
> > > > > 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106)
> > > > > 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
> > > > > 14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
> > > > > 14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)
> > > > > 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
> > > > > 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
> > > > > 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
> > > > > 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
> > > > > 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
> > > > > 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
> > > > > 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
> > > > > 14/07/21 19:49:13 INFO Executor: Finished task ID 4
> > > > > 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
> > > > > 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms
> > > > > 14/07/21 19:49:13 INFO Executor: Running task ID 6
> > > > > 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
> > > > > 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106)
> > > > > 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
> > > > > 14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
> > > > > 14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
> > > > > 14/07/21 19:49:13 INFO Executor: Finished task ID 5
> > > > > 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on executor localhost: localhost (PROCESS_LOCAL)
> > > > > 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes in 0 ms
> > > > > 14/07/21 19:49:13 INFO Executor: Running task ID 7
> > > > > 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5)
> > > > > 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost (progress: 6/106)
> > > > > 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
> > > > > 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
> > > > > 14/07/21 19:49:13 ERROR Executor: Exception in task ID 6
> > > > > java.io.FileNotFoundException: http://172.31.34.174:52070/broadcast_0
> > > > >     at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1624)
> > > > >     at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196)
> > > > >     at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89)
> > > > >     at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
> > > > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >     at java.lang.reflect.Method.invoke(Method.java:606)
> > > > >     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> > > > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> > > > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > > > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > > > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > > > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > > > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > > >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > > > >     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
> > > > >     at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
> > > > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >     at java.lang.reflect.Method.invoke(Method.java:606)
> > > > >     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> > > > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> > > > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > > > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > > > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > > > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > > > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > > >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > > > >     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
> > > > >     at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
> > > > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >     at java.lang.reflect.Method.invoke(Method.java:606)
> > > > >     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> > > > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> > > > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > > > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > > > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > > >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > > > >     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
> > > > >     at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
> > > > >     at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
> > > > >     at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
> > > > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
> > > > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > > >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > > > >     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
> > > > >     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
> > > > >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
> > > > >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > > > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > > > >     at java.lang.Thread.run(Thread.java:744)
> > > > >  
> > > > > Note the lines indicating that the ContextCleaner cleaned the 
> > > > > broadcast variable: I’m wondering why the variable was cleaned, since 
> > > > > there is enough memory space?  
> > > > >  
> > > > > Best,  
> > > > >  
> > > > > --  
> > > > > Nan Zhu
> > > > >  
> > > > >  
> > > >  
> > > >  
> > > >  
> > >  
> >  
>  
