This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b37c8d5  [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter
b37c8d5 is described below

commit b37c8d5cea2e31e7821d848e42277f8fb7b68f30
Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
AuthorDate: Tue Aug 20 00:56:53 2019 -0700

    [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter

    # What changes were proposed in this pull request?

    This patch modifies the explanation of the guarantee for ForeachWriter, as it doesn't
    guarantee the same output for `(partitionId, epochId)`. Refer to the description of
    [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details.

    Spark itself still guarantees the same output for the same epochId (batch) if the
    preconditions are met: 1) the source always provides the same input records for the
    same offset request; 2) the query is idempotent overall (non-deterministic
    calculations like now() or random() can break this).

    Assuming broken preconditions are an exceptional case (the preconditions were
    implicitly required even before), we can still describe the guarantee with `epochId`,
    though it will be harder to leverage: 1) the ForeachWriter would have to implement a
    feature to track whether all the partitions were written successfully for a given
    `epochId`; 2) there is little chance to leverage this fact, as the chance for Spark to
    successfully write all partitions and fail to checkpoint the batch i [...]

    Credit to zsxwing for discovering the broken guarantee.

    ## How was this patch tested?

    This is just a documentation change, both on the javadoc and the guide doc.

    Closes #25407 from HeartSaVioR/SPARK-28650.

    Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 docs/structured-streaming-programming-guide.md           | 14 ++++++--------
 .../main/scala/org/apache/spark/sql/ForeachWriter.scala  | 13 +++++--------
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index e07a0e5..b0d3e16 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1843,7 +1843,7 @@ Here are the details of all the sinks in Spark.
     <td><b>Foreach Sink</b></td>
     <td>Append, Update, Complete</td>
     <td>None</td>
-    <td>Depends on ForeachWriter implementation</td>
+    <td>Yes (at-least-once)</td>
     <td>More details in the <a href="#using-foreach-and-foreachbatch">next section</a></td>
   </tr>
   <tr>
@@ -2251,13 +2251,11 @@ When the streaming query is started, Spark calls the function or the object’s
 - The close() method (if it exists) is called if an open() method exists and returns successfully
   (irrespective of the return value), except if the JVM or Python process crashes in the middle.

-- **Note:** The partitionId and epochId in the open() method can be used to deduplicate generated data
-  when failures cause reprocessing of some input data. This depends on the execution mode of the query.
-  If the streaming query is being executed in the micro-batch mode, then every partition represented
-  by a unique tuple (partition_id, epoch_id) is guaranteed to have the same data.
-  Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit
-  data and achieve exactly-once guarantees. However, if the streaming query is being executed
-  in the continuous mode, then this guarantee does not hold and therefore should not be used for deduplication.
+- **Note:** Spark does not guarantee same output for (partitionId, epochId), so deduplication
+  cannot be achieved with (partitionId, epochId). e.g. source provides different number of
+  partitions for some reasons, Spark optimization changes number of partitions, etc.
+  See [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details.
+  If you need deduplication on output, try out `foreachBatch` instead.

 #### Triggers
 The trigger settings of a streaming query define the timing of streaming data processing, whether

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
index 5c0fe79..a0b0a34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
@@ -50,14 +50,11 @@ import org.apache.spark.annotation.Evolving
  *
  * Important points to note:
  * <ul>
- * <li>The `partitionId` and `epochId` can be used to deduplicate generated data when failures
- * cause reprocessing of some input data. This depends on the execution mode of the query. If
- * the streaming query is being executed in the micro-batch mode, then every partition
- * represented by a unique tuple (partitionId, epochId) is guaranteed to have the same data.
- * Hence, (partitionId, epochId) can be used to deduplicate and/or transactionally commit data
- * and achieve exactly-once guarantees. However, if the streaming query is being executed in the
- * continuous mode, then this guarantee does not hold and therefore should not be used for
- * deduplication.
+ * <li>Spark doesn't guarantee same output for (partitionId, epochId), so deduplication
+ * cannot be achieved with (partitionId, epochId). e.g. source provides different number of
+ * partitions for some reason, Spark optimization changes number of partitions, etc.
+ * Refer SPARK-28650 for more details. If you need deduplication on output, try out
+ * `foreachBatch` instead.
  *
  * <li>The `close()` method will be called if `open()` method returns successfully (irrespective
  * of the return value), except if the JVM crashes in the middle.
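For context on the API being re-documented, here is a minimal sketch of the `ForeachWriter` lifecycle the javadoc describes: open() once per partition per epoch, process() per row, close() at the end. The class name and local-file target are hypothetical illustrations, not part of this patch; only the three overridden signatures are Spark's actual `ForeachWriter[T]` contract. In line with the correction above, `(partitionId, epochId)` is used here only to build a distinct file name, never as a deduplication key.

```scala
import java.io.{FileWriter, PrintWriter}

import org.apache.spark.sql.ForeachWriter

// Hypothetical sink: one local file per (partition, epoch) attempt.
class FileSinkWriter extends ForeachWriter[String] {
  private var out: PrintWriter = _

  // Called once per partition per epoch; returning false skips the partition.
  // Per SPARK-28650, (partitionId, epochId) is NOT a stable dedup key.
  override def open(partitionId: Long, epochId: Long): Boolean = {
    out = new PrintWriter(new FileWriter(s"/tmp/foreach-sink-$partitionId-$epochId.txt"))
    true
  }

  // Called for every row in the partition once open() has returned true.
  override def process(value: String): Unit = out.println(value)

  // Called after the partition finishes (or fails), unless the JVM crashes.
  override def close(errorOrNull: Throwable): Unit = {
    if (out != null) out.close()
  }
}
```

Such a writer would be attached with `df.as[String].writeStream.foreach(new FileSinkWriter).start()`.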
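And since the new text points readers at `foreachBatch` for deduplication, a sketch of that route, assuming hypothetical `alreadyCommitted`/`markCommitted` helpers backed by some transactional store (they are not Spark APIs). The `batchId` passed to `foreachBatch` is stable across retries of the same micro-batch, which is what makes an idempotent, effectively exactly-once write possible when the preconditions in the commit message hold.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

object DedupedSink {
  // Hypothetical commit-log helpers, stubbed for illustration only; a real
  // implementation would read from and write to a transactional store.
  def alreadyCommitted(batchId: Long): Boolean = false
  def markCommitted(batchId: Long): Unit = ()

  def start(input: DataFrame): StreamingQuery = {
    // A typed function value avoids ambiguity between the Scala and Java
    // overloads of foreachBatch.
    val writeBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      // Unlike the per-partition (partitionId, epochId) tuple, the whole
      // batch for a given batchId can key an idempotent write: a batch
      // that already committed is simply skipped on retry.
      if (!alreadyCommitted(batchId)) {
        batchDF.write.parquet(s"/tmp/output/batch-$batchId")
        markCommitted(batchId)
      }
    }
    input.writeStream.foreachBatch(writeBatch).start()
  }
}
```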