I believe the message merely means that a block has been removed from
memory because either it is not needed or because it is also persisted on
disk and memory is low. It does not mean data is lost. What is the end
problem you observe? This does not match the problem you link to in the
mailing list post.
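
As a rough illustration of the distinction above, here is a toy Python model
of a bounded in-memory block store that evicts its oldest block when space
runs out. This is not Spark's actual MemoryStore: the class, its method names
and the 25 MB capacity are made up for this sketch, and only the block size is
taken from the quoted log. With disk spill enabled an evicted block is still
readable; with it disabled, a later read fails, which is a separate problem
from the "dropped from memory" message itself.

```python
from collections import OrderedDict


class ToyBlockStore:
    """Toy bounded block store (hypothetical, not Spark's implementation)."""

    def __init__(self, capacity_bytes, spill_to_disk):
        self.capacity = capacity_bytes
        self.spill_to_disk = spill_to_disk
        self.memory = OrderedDict()  # block_id -> size, oldest first
        self.disk = {}               # block_id -> size

    def put(self, block_id, size):
        # Evict the oldest blocks until the new one fits (or memory is empty).
        while self.memory and sum(self.memory.values()) + size > self.capacity:
            victim, victim_size = self.memory.popitem(last=False)
            if self.spill_to_disk:
                self.disk[victim] = victim_size  # a copy survives on disk
        self.memory[block_id] = size

    def get(self, block_id):
        # Report where the block was found, or fail if it is gone entirely.
        if block_id in self.memory:
            return "memory"
        if block_id in self.disk:
            return "disk"
        raise KeyError(f"block {block_id} not found")


store = ToyBlockStore(capacity_bytes=25_000_000, spill_to_disk=True)
store.put("input-0", 12_651_900)
store.put("input-1", 12_651_900)  # does not fit alongside input-0, so input-0 is evicted
print(store.get("input-0"))       # prints: disk
```

Constructing the same store with spill_to_disk=False makes the final get()
raise KeyError instead, which is the toy analogue of a block that really is
gone.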
On Sep 12, 2014 1:17 PM, "Jeoffrey Lim" <jeoffr...@gmail.com> wrote:

> Our issue could be related to the problem described in:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-1-hour-batch-duration-RDD-files-gets-lost-td14027.html
> in which the DStream is processed with a 1-hour batch duration.
>
> I have implemented IO throttling in the Receiver as well as in our Kafka
> consumer, and our backlog is not that large.
>
> INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for dropping
> INFO : org.apache.spark.storage.BlockManager - Dropping block
> *input-0-1410443074600* from memory
> INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 of
> size 12651900 dropped from memory (free 21220667)
> INFO : org.apache.spark.storage.BlockManagerInfo - Removed
> input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory
> (size: 12.1 MB, free: 100.6 MB)
>
> The question that I have now is: how can we prevent the
> MemoryStore/BlockManager from dropping the input blocks? And should these
> drops be logged at the WARN/ERROR level?
>
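
One way to tell a harmless drop from a real failure is to scan the driver log
for blocks reported as dropped and check whether a later task reports the same
block as not found. Below is a minimal Python sketch, assuming each log entry
sits on a single line (unlike the wrapped excerpts in this thread). The
function name `correlate` is made up for illustration, and the two patterns
are copied from the messages quoted here, so they may need adjusting for other
Spark versions.

```python
import re

# Patterns taken from the log messages quoted in this thread.
DROPPED = re.compile(r"Block (\S+) of size (\d+) dropped from memory")
NOT_FOUND = re.compile(r"Could not compute split, block (\S+) not found")


def correlate(lines):
    """Return (dropped, failed).

    dropped maps each block id seen in a "dropped from memory" entry to its
    size in bytes; failed is the set of those block ids that a later entry
    reported as not found.
    """
    dropped = {}
    failed = set()
    for line in lines:
        match = DROPPED.search(line)
        if match:
            dropped[match.group(1)] = int(match.group(2))
            continue
        match = NOT_FOUND.search(line)
        if match and match.group(1) in dropped:
            failed.add(match.group(1))
    return dropped, failed


log = [
    "INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600 "
    "of size 12651900 dropped from memory (free 21220667)",
    "WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 7.0 "
    "(TID 118, ip-10-252-5-62.asskickery.us): java.lang.Exception: "
    "Could not compute split, block input-0-1410443074600 not found",
]
dropped, failed = correlate(log)
print(sorted(failed))
```

If `failed` stays empty over a full run, the drops are probably just routine
memory management; any block id that shows up in it is worth investigating.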
>
> Thanks.
>
>
> On Fri, Sep 12, 2014 at 4:45 PM, Dibyendu Bhattacharya [via Apache Spark
> User List] <[hidden email]> wrote:
>
>> Dear all,
>>
>> I am sorry. This was a false alarm.
>>
>> There was an issue in my RDD processing logic that led to a large
>> backlog. Once I fixed the processing logic, I could see all messages
>> being pulled nicely without any Block Removed error. I still need to
>> tune certain configurations in my Kafka Consumer to adjust the data rate
>> and the batch size.
>>
>> Sorry again.
>>
>>
>> Regards,
>> Dibyendu
>>
>> On Thu, Sep 11, 2014 at 8:13 PM, Nan Zhu <[hidden email]> wrote:
>>
>>> This is my case, which involves a broadcast variable:
>>>
>>> 14/07/21 19:49:13 INFO Executor: Running task ID 4
>>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
>>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost 
>>> (progress: 3/106)
>>> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
>>> hdfstest_customers
>>> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
>>> 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
>>> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
>>> 14/07/21 19:49:13 INFO Executor: Finished task ID 3
>>> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on 
>>> executor localhost: localhost (PROCESS_LOCAL)
>>> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes 
>>> in 0 ms
>>> 14/07/21 19:49:13 INFO Executor: Running task ID 5
>>> 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
>>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
>>> *14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0*
>>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost 
>>> (progress: 4/106)
>>> 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
>>> 14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
>>> *14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)*
>>> 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
>>> 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
>>> 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
>>> 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
>>> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
>>> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
>>> 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
>>> 14/07/21 19:49:13 INFO Executor: Finished task ID 4
>>> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on 
>>> executor localhost: localhost (PROCESS_LOCAL)
>>> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes 
>>> in 0 ms
>>> 14/07/21 19:49:13 INFO Executor: Running task ID 6
>>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
>>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost 
>>> (progress: 5/106)
>>> 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
>>> hdfstest_customers
>>> 14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
>>> 14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
>>> 14/07/21 19:49:13 INFO Executor: Finished task ID 5
>>> 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on 
>>> executor localhost: localhost (PROCESS_LOCAL)
>>> 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes 
>>> in 0 ms
>>> 14/07/21 19:49:13 INFO Executor: Running task ID 7
>>> 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5)
>>> 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost 
>>> (progress: 6/106)
>>> 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
>>> 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
>>> 14/07/21 19:49:13 ERROR Executor: Exception in task ID 6
>>> java.io.FileNotFoundException: http://172.31.34.174:52070/broadcast_0
>>>     at 
>>> sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1624)
>>>     at 
>>> org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196)
>>>     at 
>>> org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89)
>>>     at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>>>     at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at 
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>     at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>     at 
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>     at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>     at 
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>     at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>     at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>>>     at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at 
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>     at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>     at 
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>     at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>     at 
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>     at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>     at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>>>     at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at 
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>     at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>     at 
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>     at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>     at 
>>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>>>     at 
>>> org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
>>>     at 
>>> org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
>>>     at 
>>> java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
>>>     at 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>     at 
>>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>>>     at 
>>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:744)
>>>
>>>
>>>
>>>
>>> --
>>> Nan Zhu
>>>
>>> On Thursday, September 11, 2014 at 10:42 AM, Nan Zhu wrote:
>>>
>>>  Hi,
>>>
>>> Can you attach more logs to see if there is some entry from
>>> ContextCleaner?
>>>
>>> I ran into a very similar issue before… but haven’t gotten it resolved.
>>>
>>> Best,
>>>
>>> --
>>> Nan Zhu
>>>
>>> On Thursday, September 11, 2014 at 10:13 AM, Dibyendu Bhattacharya wrote:
>>>
>>> Dear All,
>>>
>>> Not sure if this is a false alarm, but I wanted to raise this to
>>> understand what is happening.
>>>
>>> I am testing the Kafka Receiver which I have written (
>>> https://github.com/dibbhatt/kafka-spark-consumer), which is basically a
>>> low-level Kafka Consumer that implements custom Receivers for every Kafka
>>> topic partition and pulls data in parallel. The individual streams from
>>> all topic partitions are then merged to create a Union stream, which is
>>> used for further processing.
>>>
>>> The custom Receiver works fine under normal load with no issues. But when
>>> I tested it with a huge backlog of messages from Kafka (50 million+
>>> messages), I saw a couple of major issues in Spark Streaming. I wanted to
>>> get some opinions on this.
>>>
>>> I am using the latest Spark 1.1, built from source, running on Amazon EMR
>>> as a 3-node m1.xlarge Spark cluster in Standalone Mode.
>>>
>>> Below are the two main questions I have.
>>>
>>> 1. When I run Spark Streaming with my Kafka Consumer against a huge
>>> backlog in Kafka (around 50 million messages), Spark is completely busy
>>> performing the receiving task and hardly schedules any processing task.
>>> Can you let me know if this is expected? With a large backlog, Spark will
>>> take a long time pulling the messages. But why is Spark not doing any
>>> processing? Is it because of a resource limitation (say, all cores are
>>> busy pulling), or is it by design? I am setting executor-memory to 10G
>>> and driver-memory to 4G.
>>>
>>> 2. *This issue seems to be more serious.* I have attached the Driver
>>> trace with this email. Very frequently I see Blocks being selected for
>>> removal; entries of this kind are all over the place. But when a Block
>>> is removed, the problem below happens. Maybe this issue causes issue 1,
>>> in which no jobs are getting processed.
>>>
>>>
>>> INFO : org.apache.spark.storage.MemoryStore - 1 blocks selected for
>>> dropping
>>> INFO : org.apache.spark.storage.BlockManager - Dropping block
>>> *input-0-1410443074600* from memory
>>> INFO : org.apache.spark.storage.MemoryStore - Block input-0-1410443074600
>>> of size 12651900 dropped from memory (free 21220667)
>>> INFO : org.apache.spark.storage.BlockManagerInfo - Removed
>>> input-0-1410443074600 on ip-10-252-5-113.asskickery.us:53752 in memory
>>> (size: 12.1 MB, free: 100.6 MB)
>>> ...........
>>>
>>> INFO : org.apache.spark.storage.BlockManagerInfo - Removed
>>> input-0-1410443074600 on ip-10-252-5-62.asskickery.us:37033 in memory
>>> (size: 12.1 MB, free: 154.6 MB)
>>> ..............
>>>
>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in
>>> stage 7.0 (TID 118, ip-10-252-5-62.asskickery.us): java.lang.Exception:
>>> Could not compute split, block input-0-1410443074600 not found
>>>
>>> ...........
>>>
>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 0.1 in
>>> stage 7.0 (TID 126) on executor ip-10-252-5-62.asskickery.us:
>>> java.lang.Exception (Could not compute split, block
>>> input-0-1410443074600 not found) [duplicate 1]
>>>
>>>
>>> org.apache.spark.SparkException: *Job aborted due to stage failure*:
>>> Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in
>>> stage 7.0 (TID 139, ip-10-252-5-62.asskickery.us): java.lang.Exception:
>>> Could not compute split, block input-0-1410443074600 not found
>>>         org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>>>         org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>>         org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>>         org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>>>
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         java.lang.Thread.run(Thread.java:744)
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>> Attachments:
>>>  - driver-trace.txt
>>>
>>>
>>>
>>>
>>
>>
>>
>
>
> ------------------------------
> View this message in context: Re: Some Serious Issue with Spark Streaming
> ? Blocks Getting Removed and Jobs have Failed..
> <http://apache-spark-user-list.1001560.n3.nabble.com/Re-Some-Serious-Issue-with-Spark-Streaming-Blocks-Getting-Removed-and-Jobs-have-Failed-tp13972p14081.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>
