[ https://issues.apache.org/jira/browse/SPARK-25331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603143#comment-16603143 ]

Mihaly Toth commented on SPARK-25331:
-------------------------------------

After looking into how this could be solved, I can think of a few potential 
approaches:
# Make the resulting file names deterministic based on the input (a sketch 
follows this list). Currently each name contains a UUID, which by nature 
differs between runs. The open questions are whether partitioning of the data 
can always be done the same way, and whether there was any other motivation 
for adding a UUID to the name.
# Create a "write-ahead manifest file" that contains the generated file names. 
This could be used in {{ManifestFileCommitProtocol.setupJob}}, which is 
currently a no-op. We may need to store some additional data, such as the 
partitioning, in order to generate the same file contents again.
# Document and mandate the use of the manifest file by consumers of the file 
output. Currently this file is not mentioned in the docs. Even if it were 
documented, it would make the consumer's life more difficult, not to mention 
that it would be somewhat counterintuitive.
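
For the first option, here is a minimal sketch of what a deterministic naming 
scheme could look like, modelled on {{FileCommitProtocol.newTaskTempFile}}. 
The {{batchId}} field and the exact name pattern are assumptions for 
illustration, not the current implementation; today the name embeds 
{{UUID.randomUUID}} instead:

{code:scala}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

// Sketch only: assumes the streaming batch id is made available to the
// commit protocol (here as a constructor field named batchId) alongside
// the existing output path field.
override def newTaskTempFile(
    taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
  val split = taskContext.getTaskAttemptID.getTaskID.getId
  // Same batch + same task => same file name, so a retried batch
  // overwrites its earlier attempt instead of adding a duplicate file.
  val filename = f"part-$split%05d-batch-$batchId$ext"
  dir.map(d => new Path(new Path(path, d), filename).toString)
    .getOrElse(new Path(path, filename).toString)
}
{code}

Whether this is safe hinges on the question above: the partitioning of the 
batch must be reproducible, otherwise a retry could produce a different set of 
file names and leave orphans behind.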

Before rushing into the implementation, I guess it makes sense to discuss the 
direction first. I would pick the first option if it proves feasible.

> Structured Streaming File Sink duplicates records in case of driver failure
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-25331
>                 URL: https://issues.apache.org/jira/browse/SPARK-25331
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Mihaly Toth
>            Priority: Major
>
> Let's assume {{FileStreamSink.addBatch}} is called, an appropriate job has 
> been started by {{FileFormatWriter.write}}, and the resulting task sets 
> complete, but in the meantime the driver dies. In such a case, repeating 
> {{FileStreamSink.addBatch}} will write the data a second time: whenever the 
> driver fails after the executors have started processing the job, the 
> processed batch ends up written twice.
> Steps needed:
> # Call {{FileStreamSink.addBatch}}.
> # Make {{ManifestFileCommitProtocol}} fail to finish its {{commitJob}}.
> # Call {{FileStreamSink.addBatch}} again with the same data.
> # Let {{ManifestFileCommitProtocol}} finish its {{commitJob}} successfully.
> # Verify the file output: according to the {{Sink.addBatch}} documentation, 
> the RDD should be written only once.
> I have created a WIP PR with a unit test (sketched below):
> https://github.com/apache/spark/pull/22331
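>
> In outline, the test does roughly the following (a sketch only; 
> {{FailOnceCommitProtocol}} is a made-up helper that throws from 
> {{commitJob}} on its first call and succeeds on the second, standing in for 
> the driver failure):
>
> {code:scala}
> // Run the same batch twice: the first commitJob fails (simulating a driver
> // death after the tasks have written their files), the retry succeeds.
> intercept[SparkException] {
>   sink.addBatch(0, data)  // tasks write their files, commitJob throws
> }
> sink.addBatch(0, data)    // same batchId, same data: the retry
>
> // Sink.addBatch documents exactly-once semantics per batchId, so each
> // input record must appear once in the output; today it appears twice.
> assert(spark.read.parquet(outputPath).count() === data.count())
> {code}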


