GitHub user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164918470
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
    --- End diff --
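To make the contract in the new javadoc concrete, here is a minimal Java sketch under stated assumptions: `EpochWriter`, `Message`, `runEpoch` and `CheckingWriter` are hypothetical stand-ins, not Spark's `StreamWriter` or `WriterCommitMessage`. The driver adds one message per input partition, calls `commit(epochId)` only afterwards, and calls `abort()` if the commit throws.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the contract in the proposed javadoc; these are
// simplified stand-ins, not Spark's real StreamWriter/WriterCommitMessage.
public class CommitContractSketch {

    // Stand-in for WriterCommitMessage: one per input data partition.
    static final class Message {
        final int partitionId;
        Message(int partitionId) { this.partitionId = partitionId; }
    }

    interface EpochWriter {
        void add(Message message);   // one call per input partition
        void commit(long epochId);   // only after all messages were added
        void abort();                // invoked if commit(epochId) throws
    }

    // Engine-side driver: add every partition's message, then commit;
    // abort if commit fails. Returns true if the epoch committed.
    static boolean runEpoch(EpochWriter writer, long epochId, int numPartitions) {
        for (int p = 0; p < numPartitions; p++) {
            writer.add(new Message(p));
        }
        try {
            writer.commit(epochId);
            return true;
        } catch (RuntimeException e) {
            // The destination state is undefined at this point; abort()
            // is best effort and may not be able to repair it.
            writer.abort();
            return false;
        }
    }

    // A writer that checks the "messages == partitions" precondition.
    static final class CheckingWriter implements EpochWriter {
        final List<Message> messages = new ArrayList<>();
        final int expectedPartitions;
        final boolean failCommit;
        boolean committed;
        boolean aborted;

        CheckingWriter(int expectedPartitions, boolean failCommit) {
            this.expectedPartitions = expectedPartitions;
            this.failCommit = failCommit;
        }

        public void add(Message message) { messages.add(message); }

        public void commit(long epochId) {
            if (messages.size() != expectedPartitions) {
                throw new IllegalStateException("missing commit messages");
            }
            if (failCommit) {
                throw new RuntimeException("simulated commit failure");
            }
            committed = true;
        }

        public void abort() { aborted = true; }
    }

    public static void main(String[] args) {
        CheckingWriter ok = new CheckingWriter(3, false);
        System.out.println(runEpoch(ok, 0L, 3) + " aborted=" + ok.aborted);
        CheckingWriter bad = new CheckingWriter(3, true);
        System.out.println(runEpoch(bad, 1L, 3) + " aborted=" + bad.aborted);
    }
}
```

`main` prints `true aborted=false` for the healthy writer and `false aborted=true` for the one whose commit fails. The sketch cannot show the javadoc's caveat: after a failed commit the destination state is undefined, so a real `abort()` may not be able to clean it up.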
    
    The StreamWriter is responsible for setting up a distributed transaction that commits the data within a batch both locally and to the remote system. But the StreamExecution keeps its own log of which batches have been fully completed. ("Fully completed" includes things like the stateful aggregation commit and progress logging, which can't reasonably participate in the StreamWriter's transaction.)
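The two commit points described above can be modeled in a few lines. This is a hypothetical sketch, not Spark code: `remoteRows` stands for data the sink has committed, `completedBatches` for the engine's own batch log, and `crashBeforeEngineLog` injects a failure between the two.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of the two commit points; the names are illustrative
// and do not correspond to Spark classes.
public class TwoCommitPointsSketch {

    // What the remote system has durably committed (the sink's transaction).
    final List<String> remoteRows = new ArrayList<>();
    // The engine's own log of fully completed batches.
    final TreeSet<Long> completedBatches = new TreeSet<>();

    void runBatch(long batchId, List<String> rows, boolean crashBeforeEngineLog) {
        // 1. Sink-side commit: the batch's data becomes visible remotely.
        remoteRows.addAll(rows);
        if (crashBeforeEngineLog) {
            throw new IllegalStateException("simulated crash");
        }
        // 2. Engine-side commit: stateful aggregation commit, progress
        //    logging, etc., which cannot join the sink's transaction.
        completedBatches.add(batchId);
    }

    // After a restart, the engine resumes from the first batch missing
    // from its own log, regardless of what the sink committed.
    long nextBatchToRun() {
        return completedBatches.isEmpty() ? 0L : completedBatches.last() + 1;
    }

    public static void main(String[] args) {
        TwoCommitPointsSketch engine = new TwoCommitPointsSketch();
        engine.runBatch(0L, List.of("a", "b"), false);
        try {
            engine.runBatch(1L, List.of("c"), true);
        } catch (IllegalStateException e) {
            // Batch 1 reached the remote system but was never logged.
        }
        System.out.println(engine.remoteRows + " next=" + engine.nextBatchToRun());
    }
}
```

It prints `[a, b, c] next=1`: batch 1's row is already in the remote system, yet the engine will run batch 1 again because its own log never recorded it.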
    
    So there's a scenario where Spark fails between the StreamWriter commit and the StreamExecution commit, in which case the StreamExecution must re-execute the batch to ensure everything is in the right state. The StreamWriter is responsible for ensuring this doesn't generate duplicate data in the remote system.
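What happens on that re-execution depends on the sink. In the hypothetical sketch below (the `Sink` interface and both implementations are illustrative, not Spark APIs), the same epoch is committed twice with the same input, as it would be after such a failure. A naive appending sink duplicates the rows; a sink that records committed epoch IDs skips the retry. The in-memory `Set` is a simplification: a real sink would presumably need to store that record durably in the destination, updated atomically with the data.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sinks illustrating a batch that is re-executed after the
// sink committed it but before the engine logged it as complete.
public class ReplaySketch {

    interface Sink {
        void commit(long epochId, List<String> rows);
        List<String> contents();
    }

    // Naive sink: appends on every commit, so a replay duplicates data.
    static final class AppendSink implements Sink {
        private final List<String> rows = new ArrayList<>();
        public void commit(long epochId, List<String> batch) { rows.addAll(batch); }
        public List<String> contents() { return rows; }
    }

    // Epoch-tracking sink: remembers committed epochs and skips (aborts)
    // a retried epoch. In-memory here only for illustration.
    static final class EpochTrackingSink implements Sink {
        private final List<String> rows = new ArrayList<>();
        private final Set<Long> committedEpochs = new HashSet<>();
        public void commit(long epochId, List<String> batch) {
            if (committedEpochs.add(epochId)) { // false if already committed
                rows.addAll(batch);
            }
        }
        public List<String> contents() { return rows; }
    }

    // Epoch 7 is committed, the engine fails before logging it, and after
    // restart the engine re-executes epoch 7 with the same input.
    static List<String> commitThenReplay(Sink sink) {
        List<String> batch = List.of("x", "y");
        sink.commit(7L, batch);
        sink.commit(7L, batch);
        return sink.contents();
    }

    public static void main(String[] args) {
        System.out.println(commitThenReplay(new AppendSink()));        // [x, y, x, y]
        System.out.println(commitThenReplay(new EpochTrackingSink())); // [x, y]
    }
}
```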
    
    Note that the "true" exactly-once strategy, where the StreamWriter aborts the retried batch because it was already committed, is indeed idempotent with respect to StreamWriter.commit(epochId). But there are weaker strategies that still provide equivalent semantics.
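One example of such a weaker strategy, assumed here for illustration rather than named in the comment, is an idempotent upsert: the sink keeps no record of committed epochs and simply rewrites each row under its key, so replaying an epoch repeats the writes but leaves the same final state. The names below are hypothetical, and the equivalence only holds if the re-executed batch produces the same keys and values as the original run.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical "weaker" strategy: no record of committed epochs; every row
// is written as an upsert keyed by a record key, so re-committing the same
// epoch rewrites identical values.
public class UpsertSinkSketch {

    static final class Row {
        final String key;
        final long value;
        Row(String key, long value) { this.key = key; this.value = value; }
    }

    static final class UpsertSink {
        private final Map<String, Long> table = new LinkedHashMap<>();
        private int writes = 0;

        // epochId is ignored by this strategy. A retry is not a no-op (the
        // writes happen again), but the resulting table state is the same.
        void commit(long epochId, List<Row> batch) {
            for (Row r : batch) {
                table.put(r.key, r.value);
                writes++;
            }
        }

        Map<String, Long> snapshot() { return new LinkedHashMap<>(table); }
        int writeCount() { return writes; }
    }

    public static void main(String[] args) {
        List<Row> epoch3 = List.of(new Row("user-1", 10L), new Row("user-2", 5L));
        UpsertSink sink = new UpsertSink();
        sink.commit(3L, epoch3);
        Map<String, Long> afterFirst = sink.snapshot();
        sink.commit(3L, epoch3); // replay after a failure
        System.out.println(afterFirst.equals(sink.snapshot()) + " writes=" + sink.writeCount());
    }
}
```

Running it prints `true writes=4`: the replay performed two extra writes, but the table contents are unchanged.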

