Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222814573

--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 </div>
 </div>

-##### Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
-which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
+##### Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing
+logic on the output of a streaming query. They have slightly different use cases: while `foreach`
+allows custom write logic on every row, `foreachBatch` allows arbitrary operations
+and custom logic on the output of each micro-batch. Let's understand their usages in more detail.
+
+###### ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on
+the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python.
+It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch, and the unique ID of the micro-batch.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  // Transform and write batchDF
+}.start()
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write the dataset
+    }
+  }
+).start();
+{% endhighlight %}
+
+</div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+    # Transform and write df
+    pass
+
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()
+{% endhighlight %}
+
+</div>
+<div data-lang="r" markdown="1">
+R is not yet supported.
+</div>
+</div>
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet,
+  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations,
+  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can
+  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.
+
+      streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+        batchDF.cache()
+        batchDF.write.format(...).save(...)  // location 1
+        batchDF.write.format(...).save(...)  // location 2
+        batchDF.unpersist()
+      }
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported
+  in streaming DataFrames because Spark does not support generating incremental plans in those cases.
+  Using foreachBatch(), you can apply some of these operations on each micro-batch output. However, you will
+  have to reason about the end-to-end semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the
+  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee
+  (see the sketch after this list).
+- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
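For illustration, here is a minimal sketch of one way to act on that batchId in Java. It assumes `streamingDF` is a streaming `Dataset<Row>`; the `batch_id` column name and the output path are made up. Tagging each row with the micro-batch ID lets a downstream reader (or a later compaction job) drop rows written by a replayed batch.

{% highlight java %}
// Sketch only: record the micro-batch ID alongside the data so that rows from a
// replayed batch can be identified and dropped downstream.
streamingDF.writeStream().foreachBatch(
  new VoidFunction2<Dataset<Row>, Long>() {
    public void call(Dataset<Row> batchDF, Long batchId) {
      batchDF.withColumn("batch_id", functions.lit(batchId))  // hypothetical column name
        .write()
        .mode("append")
        .parquet("/tmp/dedup-output");                         // hypothetical location
    }
  }
).start();
{% endhighlight %}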
+
+###### Foreach
+If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or
+you need continuous processing mode), then you can express your custom writer logic using `foreach`.
+Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
+
+{% highlight scala %}
+streamingDF.writeStream.foreach(
+  new ForeachWriter[String] {
+
+    def open(partitionId: Long, version: Long): Boolean = {
+      // Open connection
+    }
+
+    def process(record: String): Unit = {
+      // Write string to connection
+    }
+
+    def close(errorOrNull: Throwable): Unit = {
+      // Close the connection
+    }
+  }
+).start()
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
+
+{% highlight java %}
+streamingDF.writeStream.foreach(

--- End diff --

`streamingDF.writeStream` -> `streamingDatasetOfString.writeStream()`
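For reference, a sketch of how that Java `foreach` example might read once the suggested rename is applied; the `return true` and the method bodies are placeholders, not content from the diff.

{% highlight java %}
streamingDatasetOfString.writeStream().foreach(
  new ForeachWriter<String>() {

    @Override public boolean open(long partitionId, long version) {
      // Open connection
      return true;  // placeholder: true means this partition should be processed
    }

    @Override public void process(String record) {
      // Write string to connection
    }

    @Override public void close(Throwable errorOrNull) {
      // Close the connection
    }
  }
).start();
{% endhighlight %}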