Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20386#discussion_r164918470 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java --- @@ -32,40 +32,44 @@ @InterfaceStability.Evolving public interface StreamWriter extends DataSourceWriter { /** - * Commits this writing job for the specified epoch with a list of commit messages. The commit - * messages are collected from successful data writers and are produced by - * {@link DataWriter#commit()}. + * Commits this writing job for the specified epoch. * - * If this method fails (by throwing an exception), this writing job is considered to have been - * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * When this method is called, the number of commit messages added by + * {@link #add(WriterCommitMessage)} equals to the number of input data partitions. + * + * If this method fails (by throwing an exception), this writing job is considered to to have been + * failed, and {@link #abort()} would be called. The state of the destination + * is undefined and @{@link #abort()} may not be able to deal with it. * * To support exactly-once processing, writer implementations should ensure that this method is * idempotent. The execution engine may call commit() multiple times for the same epoch --- End diff -- The StreamWriter is responsible for setting up a distributed transaction to commit the data within batch both locally and to the remote system. But the StreamExecution keeps its own log of which batches have been fully completed. ("Fully completed" includes things like stateful aggregation commit and progress logging which can't reasonably participate in the StreamWriter's transaction.) So there's a scenario where Spark fails between StreamWriter commit and StreamExecution commit, in which the StreamExecution must re-execute the batch to ensure everything is in the right state. The StreamWriter is responsible for ensuring this doesn't generate duplicate data in the remote system. Note that the "true" exactly once strategy, where the StreamWriter aborts the retried batch because it was already committed before, is indeed idempotent wrt StreamWriter.commit(epochId). But there are weaker strategies which still provide equivalent semantics.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org