[ 
https://issues.apache.org/jira/browse/SPARK-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831098#comment-15831098
 ] 

Nan Zhu commented on SPARK-19233:
---------------------------------

By saying "filter generatedRDDs" I may have caused some confusion here. What I 
propose is:

1. Add a latestCheckpointTime field to DStreamCheckpointData.

2. When update() is called, only put RDDs with timestamp <= 
latestCheckpointTime into currentCheckpointFiles 
(https://github.com/CodingCat/spark/blob/65623f4408ab6152719046c55093c70435da82c8/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53)
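
A minimal sketch of the two steps above, written as a standalone model rather 
than as a patch to Spark. CheckpointDataSketch is a hypothetical stand-in for 
DStreamCheckpointData, and generatedRDDs is modeled as a plain map from batch 
time to checkpoint file path; the real class works with RDDs and Time objects, 
so treat the signatures here as assumptions.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for DStreamCheckpointData (not Spark's real class).
final class CheckpointDataSketch {
  // Proposed field: time (ms) of the checkpoint currently being written.
  var latestCheckpointTime: Long = -1L

  // Batch time (ms) -> checkpoint file path; this is what gets serialized.
  val currentCheckpointFiles = mutable.HashMap.empty[Long, String]

  // generatedRDDs is modeled as batch time -> checkpoint file path (an assumption).
  def update(checkpointTime: Long, generatedRDDs: Map[Long, String]): Unit = {
    latestCheckpointTime = checkpointTime
    currentCheckpointFiles.clear()
    // Keep only the batches generated at or before the checkpoint time.
    currentCheckpointFiles ++= generatedRDDs.filter { case (batchTime, _) =>
      batchTime <= latestCheckpointTime
    }
  }
}

object CheckpointFilterDemo {
  def main(args: Array[String]): Unit = {
    val data = new CheckpointDataSketch
    // Batches 2000 and 3000 were generated before the checkpoint for 1000 ran.
    data.update(1000L, Map(1000L -> "rdd-1000", 2000L -> "rdd-2000", 3000L -> "rdd-3000"))
    println(data.currentCheckpointFiles.keys.toSeq.sorted)
  }
}
```

With this filter, the checkpoint for time 1000 would record only batches up to 
1000, so recovery would recompute 2000 and 3000 instead of restoring them.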

What do you think?


Regarding correctness, so far I haven't seen applications die because of 
this issue. But when I analyzed logs with a simple script/regex to see whether 
(some lines of) DStream.compute() were called, I was always confused, since I 
assumed that recovering from checkpoint-1000 would always call 
compute(time > 1000).

> Inconsistent Behaviour of Spark Streaming Checkpoint
> ----------------------------------------------------
>
>                 Key: SPARK-19233
>                 URL: https://issues.apache.org/jira/browse/SPARK-19233
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
>            Reporter: Nan Zhu
>
> When checking one of our application logs, we found the following behavior 
> (simplified)
> 1. Spark application recovers from checkpoint constructed at timestamp 1000ms
> 2. The log shows that Spark application can recover RDDs generated at 
> timestamp 2000, 3000
> The root cause is that the generateJobs event is pushed to the queue by a 
> separate thread (RecurringTimer). Before the doCheckpoint event is pushed to 
> the queue, multiple generateJobs events may already have been processed. As a 
> result, when doCheckpoint for timestamp 1000 is processed, the generatedRDDs 
> data structure, which by then contains the RDDs generated at 2000 and 3000, 
> is serialized as part of the checkpoint for 1000.
> This adds overhead to debugging and to coordinating our offset management 
> strategy with Spark Streaming's checkpoint strategy while we are developing a 
> new type of DStream which integrates Spark Streaming with an internal message 
> middleware.
> The proposed fix is to filter generatedRDDs according to checkpoint timestamp 
> when serializing it.
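
The ordering problem described above can be reproduced with a small 
single-threaded replay of the event queue. This is only an illustrative sketch: 
the Event types and the replay helper are hypothetical names modeled on the 
events mentioned in the description, not Spark's actual scheduler code.

```scala
import scala.collection.mutable

object CheckpointRaceSketch {
  // Hypothetical event types, named after the events in the description.
  sealed trait Event
  final case class GenerateJobs(time: Long) extends Event
  final case class DoCheckpoint(time: Long) extends Event

  // Processes events in queue order. For each checkpoint time, returns the
  // batch times present in generatedRDDs at the moment it was serialized.
  def replay(events: Seq[Event]): Map[Long, Set[Long]] = {
    val generatedRDDs = mutable.Set.empty[Long]
    val snapshots = mutable.Map.empty[Long, Set[Long]]
    events.foreach {
      case GenerateJobs(t) => generatedRDDs += t
      case DoCheckpoint(t) => snapshots(t) = generatedRDDs.toSet // unfiltered snapshot
    }
    snapshots.toMap
  }

  def main(args: Array[String]): Unit = {
    // The timer thread enqueued three generateJobs events before the
    // doCheckpoint event for time 1000 reached the queue.
    val queue = Seq(
      GenerateJobs(1000L), GenerateJobs(2000L), GenerateJobs(3000L), DoCheckpoint(1000L))
    println(replay(queue))
  }
}
```

In this replay the snapshot taken for checkpoint 1000 also contains batches 
2000 and 3000, which matches the behavior seen in the logs.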



