[https://issues.apache.org/jira/browse/SPARK-25331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16608933#comment-16608933]

Mihaly Toth commented on SPARK-25331:
-------------------------------------

I was thinking about how to make FileStreamSink idempotent even in failure 
cases using the first approach (deterministic file names).

As a starting point, we need to put the partition id (and the bucket id, if it 
exists) into the file name and remove the UUID from it. If the same batch is 
rewritten and the file already exists, we can simply skip writing it again, 
assuming the same partition of the same batch will generate the same data 
again. There are a few special cases, though:

If the file is half written when the writing executor stops, records will be 
missing from the end of the file. We can eliminate this by first writing the 
data into a temp file and then moving it to its intended location.
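A minimal sketch of this skip-if-exists plus temp-file-and-rename idea (illustrative Python, not Spark code; the file-naming scheme and helper name are hypothetical):

```python
import os

def write_partition(out_dir, batch_id, partition_id, records):
    """Idempotent partition write: deterministic file name, temp file + rename.

    Hypothetical sketch of the proposal above -- the naming scheme
    (batch and partition id in the name, no UUID) is an assumption.
    """
    final_path = os.path.join(out_dir, f"part-{batch_id:05d}-{partition_id:05d}")
    if os.path.exists(final_path):
        # Same batch+partition was already written by an earlier attempt: skip.
        return final_path
    tmp_path = final_path + ".tmp"
    with open(tmp_path, "w") as f:
        for rec in records:
            f.write(rec + "\n")
    # rename() is atomic on POSIX, so readers never observe a half-written file.
    os.rename(tmp_path, final_path)
    return final_path
```

Because the final name only appears after the atomic rename, a retry of the same batch either sees the complete file and skips, or sees nothing and rewrites it from scratch.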

The other problematic area is around the {{maxRecordsPerFile}} limit. With it, 
the same batch+partition pair may generate multiple files, and it may happen 
that some of these files were created while others are missing. Generating only 
the missing ones works only if the ordering of the items is exactly the same in 
each run, which may or may not be true. If the order differs between the two 
runs and we simply skip generating those that already exist, the resulting 
files may contain missing or duplicated items.

We could subtract the records in the already existing files from the input RDD. 
I feel that would make the writing logic quite complex and would put unexpected 
computational load onto the executor, but it would work in all cases.
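The subtraction amounts to a multiset difference that preserves the input order. A toy sketch (illustrative Python; a real implementation would stream the file contents and deal with non-hashable rows, which is where the complexity comes from):

```python
from collections import Counter

def remaining_records(input_records, already_written):
    """Multiset-subtract records already persisted from the partition's input.

    Hypothetical helper illustrating the 'subtract existing files from the
    input RDD' idea; duplicates are handled by counting occurrences.
    """
    remaining = Counter(input_records)
    remaining.subtract(already_written)
    out = []
    for rec in input_records:          # preserve the input ordering
        if remaining[rec] > 0:
            out.append(rec)
            remaining[rec] -= 1
    return out
```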

Another solution for partially generated file sets would be to read and 
generate the file at the same time, comparing the records one by one. If the 
already existing file is the same as the file to be generated, we can skip 
creating it. If it differs, we can create a file with a mark in its name, like 
"-v2". With this, the receiver can achieve exactly-once semantics in one of the 
following ways:
 # Do not limit the maximum records per file
 # Limit the number of records but apply strict ordering on the resulting RDD
 # Limit the number of records without applying strict ordering, but compensate 
for those files that have newer versions appearing in the output directory
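The compare-and-version step above could be sketched as follows (illustrative Python only; the "-v2" suffix follows the scheme proposed here, not any existing Spark code):

```python
import os
from itertools import zip_longest

def write_or_version(path, records):
    """Compare-on-rewrite: skip if identical, else write a '-v2' sibling.

    Hypothetical sketch; only a single extra version is handled.
    """
    new_lines = [rec + "\n" for rec in records]
    if os.path.exists(path):
        with open(path) as f:
            # Compare record by record; zip_longest catches length mismatches.
            if all(old == new for old, new in zip_longest(f, new_lines)):
                return path            # identical content: skip rewriting
        path = path + "-v2"            # differs: keep both, mark the new one
    with open(path, "w") as f:
        f.writelines(new_lines)
    return path
```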

[~rxin], [~Gengliang.Wang] what is your opinion on this?

> Structured Streaming File Sink duplicates records in case of driver failure
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-25331
>                 URL: https://issues.apache.org/jira/browse/SPARK-25331
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: Mihaly Toth
>            Priority: Major
>
> Let's assume {{FileStreamSink.addBatch}} is called, an appropriate job has 
> been started by {{FileFormatWriter.write}}, and the resulting task sets are 
> completed, but in the meantime the driver dies. In such a case repeating 
> {{FileStreamSink.addBatch}} will result in duplicate writing of the data.
> In the event the driver fails after the executors start processing the job, 
> the processed batch will be written twice.
> Steps needed:
> # call {{FileStreamSink.addBatch}}
> # make the {{ManifestFileCommitProtocol}} fail to finish its {{commitJob}}
> # call {{FileStreamSink.addBatch}} with the same data
> # make the {{ManifestFileCommitProtocol}} finish its {{commitJob}} 
> successfully
> # Verify the file output - according to the {{Sink.addBatch}} documentation 
> the RDD should be written only once
> I have created a wip PR with a unit test:
> https://github.com/apache/spark/pull/22331



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
