Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164737290
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -63,32 +68,33 @@
       DataWriterFactory<Row> createWriterFactory();
     
       /**
    -   * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   * Handles a commit message which is collected from a successful data writer on the executor side.
    +   *
    +   * Note that implementations might need to cache all commit messages before calling
    +   * {@link #commit()} or {@link #abort()}.
        *
        * If this method fails (by throwing an exception), this writing job is considered to have
    -   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    -   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and {@link #abort()} may not be able to deal with it.
    +   */
    +  void add(WriterCommitMessage message);
    +
    +  /**
    +   * Commits this writing job.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * If this method fails (by throwing an exception), this writing job is considered to have
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and {@link #abort()} may not be able to deal with it.
        */
    -  void commit(WriterCommitMessage[] messages);
    +  void commit();
     
       /**
    -   * Aborts this writing job because some data writers are failed and keep failing when retry, or
    -   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
    +   * Aborts this writing job because some data writers fail and keep failing when retried,
    +   * or the Spark job fails for some unknown reason,
    +   * or {@link #commit()} / {@link #add(WriterCommitMessage)} fails.
        *
        * If this method fails (by throwing an exception), the underlying data source may require manual
        * cleanup.
    -   *
    -   * Unless the abort is triggered by the failure of commit, the given messages should have some
    --- End diff ---
    
    Unless the abort is triggered by the failure of #commit, the number of commit messages added by #add should be smaller than the number of input data partitions: only a few data writers may have committed before the abort happens, or some data writers may have committed but their commit messages haven't reached the driver when the abort is triggered. So this is just a "best effort" for data sources to clean up the data left by data writers.


---
