[ 
https://issues.apache.org/jira/browse/SPARK-27086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16796812#comment-16796812
 ] 

Jungtaek Lim commented on SPARK-27086:
--------------------------------------

[~sebastianherold]

This sounds like a feature request: please use the user/dev mailing list for 
that instead.

Btw, MicroBatchReader.commit is not designed to replace checkpointing. It just 
lets the data source clean up processed offsets. Please refer to the Javadoc 
below:

[https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java#L70-L74]

A batch must remain replayable in case of failure until the next batch 
determines its new range of offsets and writes them to the offset log in the 
checkpoint. That's why MicroBatchReader.commit is called only after the offset 
log has been written.
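The ordering described above can be sketched as a small model (a simplified, hypothetical illustration; the class and field names are mine, not Spark's actual MicroBatchExecution internals):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the micro-batch loop ordering: the offset log for
// batch N+1 is written durably BEFORE commit() is invoked for batch N,
// so a failed batch can always be replayed from the source.
class MicroBatchLoop {
    final List<Long> offsetLog = new ArrayList<>();   // stands in for the checkpoint offset log
    final List<Long> committed = new ArrayList<>();   // offsets the source may now clean up

    void runBatch(long newEndOffset) {
        offsetLog.add(newEndOffset);                  // 1. offset log written first
        if (offsetLog.size() > 1) {
            // 2. only now is the PREVIOUS batch's offset committed,
            //    mirroring MicroBatchReader.commit being called late.
            committed.add(offsetLog.get(offsetLog.size() - 2));
        }
    }
}
```

Note the consequence: after the first batch runs, nothing is committed yet; the first commit happens only when the second batch writes its offsets, which is exactly the behaviour the issue reporter observed.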

> DataSourceV2 MicroBatchExecution commits last batch only if new batch is 
> constructed
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-27086
>                 URL: https://issues.apache.org/jira/browse/SPARK-27086
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Sebastian Herold
>            Priority: Major
>              Labels: MicroBatchExecution, MicroBatchReader, Spark, Streaming, 
> Structured, commit
>
> I wanted to use the new {{DataSourceV2}} API to build an AWS SQS streaming 
> data source which uses the new {{commit}} method of the 
> {{MicroBatchReader}} to finally commit a message to SQS after it has been 
> processed. If processing fails and a message is not committed, the message 
> automatically reappears in SQS after a timeout, which is the intended 
> behaviour without using special state storing or checkpointing.
> Sadly, I noticed that an offset in the {{MicroBatchReader}} only gets 
> committed if a new batch is constructed ([see line 400 in 
> {{MicroBatchExecution}}|https://github.com/apache/spark/blob/f7ad4ff040d39c7a55a9e01a990534e55c8178a5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L400])
>  which is quite strange. Especially, in my SQS example there could be a long 
> break after a first batch of messages before new messages are sent to SQS. 
> This would lead to a timeout and reappearance of the SQS messages from the 
> previous batch, because they got processed but not committed. Therefore, I 
> would strongly recommend committing an offset once the batch has been 
> processed! The commit should be independent of the next batch!
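The visibility-timeout behaviour the reporter relies on can be sketched as follows (an illustrative model with hypothetical names, not the actual SQS SDK): a message that is received but never committed becomes visible again once its timeout elapses.

```java
// Model of queue semantics like SQS visibility timeouts: receiving hides a
// message for a fixed window; committing (deleting) it must happen before
// that window closes, or the message is redelivered.
class QueueMessage {
    final String body;
    long invisibleUntil = 0;   // epoch millis until which the message is hidden
    boolean deleted = false;   // a commit would delete it from the queue
    QueueMessage(String body) { this.body = body; }

    boolean visibleAt(long now) { return !deleted && now >= invisibleUntil; }
}

class VisibilityTimeoutQueue {
    static final long TIMEOUT_MS = 30_000;

    // Receiving a message hides it for the timeout window.
    void receive(QueueMessage m, long now) { m.invisibleUntil = now + TIMEOUT_MS; }

    // Committing deletes the message so it can never reappear.
    void commit(QueueMessage m) { m.deleted = true; }
}
```

Under this model, deferring commit() until the next batch means a long gap between batches lets the timeout expire and the already-processed messages come back, which is the duplication the reporter describes.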



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
