Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164909225
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to have
    +   * failed, and {@link #abort()} will be called. The state of the destination
    +   * is undefined and {@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
        * in some circumstances.
        */
    -  void commit(long epochId, WriterCommitMessage[] messages);
    +  void commit(long epochId);
     
       /**
    -   * Aborts this writing job because some data writers are failed and keep failing when retry, or
    -   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
    +   * Aborts this writing job because some data writers failed and kept failing when retried,
    +   * or the Spark job failed for some unknown reason,
    +   * or {@link #commit(long)} / {@link #add(WriterCommitMessage)} failed.
        *
        * If this method fails (by throwing an exception), the underlying data source may require manual
        * cleanup.
        *
    -   * Unless the abort is triggered by the failure of commit, the given messages should have some
    -   * null slots as there maybe only a few data writers that are committed before the abort
    -   * happens, or some data writers were committed but their commit messages haven't reached the
    -   * driver when the abort is triggered. So this is just a "best effort" for data sources to
    -   * clean up the data left by data writers.
    +   * Unless the abort is triggered by the failure of commit, the number of commit
    +   * messages added by {@link #add(WriterCommitMessage)} should be smaller than the number
    +   * of input data partitions, as there may be only a few data writers that committed
    +   * before the abort happens, or some data writers committed but their commit messages
    +   * haven't reached the driver when the abort is triggered. So this is just a "best effort"
    --- End diff --
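
    For reference, here is a minimal sketch of how a sink-side implementation
    might satisfy the contract proposed above. It assumes the
    add(WriterCommitMessage)/commit(long)/abort() shape from this diff; the
    class, field, and helper names are hypothetical, and the remaining
    DataSourceWriter methods (e.g. createWriterFactory()) are left out:

        import java.util.ArrayList;
        import java.util.HashSet;
        import java.util.List;
        import java.util.Set;

        import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

        // Hypothetical writer sketched against the interface shape proposed in
        // this diff (not actual Spark code).
        public class ExampleStreamWriter {
          // Commit messages that have reached the driver so far.
          private final List<WriterCommitMessage> received = new ArrayList<>();
          // Epochs already committed, so that a repeated commit() is a no-op.
          private final Set<Long> committedEpochs = new HashSet<>();

          public void add(WriterCommitMessage message) {
            received.add(message);
          }

          public void commit(long epochId) {
            if (!committedEpochs.add(epochId)) {
              // The engine may call commit() more than once for the same epoch;
              // returning here keeps the method idempotent, which is what makes
              // exactly-once semantics possible.
              return;
            }
            // Atomically publish the data described by `received` for this epoch.
          }

          public void abort() {
            // Best effort: undo whatever the messages received so far describe.
            // Data staged by tasks whose messages never arrived cannot be
            // cleaned up here.
            for (WriterCommitMessage message : received) {
              deleteStagedData(message);
            }
          }

          // Placeholder for sink-specific cleanup of one task's staged output.
          private void deleteStagedData(WriterCommitMessage message) {
          }
        }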
    
    Commit messages in flight should be handled and aborted. Otherwise, this
    isn't a "best effort". Best effort means that Spark does everything that is
    feasible to ensure that commit messages are added before aborting, and that
    should include handling race conditions in the RPC layer.
    
    The one case where a "best effort" might legitimately miss a message is when
    the message has been created but the node fails before it is sent to the driver.
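
    Concretely (a hypothetical driver-side sketch, not actual Spark code,
    written against the add()/abort() shape proposed in this diff): drain
    whatever has already reached the driver before calling abort(), so the
    writer can account for every message that actually arrived. Here `inbox`
    stands in for the driver-side queue of commit messages received over RPC
    but not yet handed to the writer:

        import java.util.Queue;

        import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter;
        import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

        class AbortHelper {
          static void drainAndAbort(StreamWriter writer,
                                    Queue<WriterCommitMessage> inbox) {
            WriterCommitMessage message;
            while ((message = inbox.poll()) != null) {
              writer.add(message); // hand in-flight messages to the writer first
            }
            writer.abort(); // "best effort" now covers everything that arrived
          }
        }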

