Repository: spark
Updated Branches:
  refs/heads/branch-2.4 193ce77fc -> 4baa4d42a


[SPARK-25639][DOCS] Added docs for foreachBatch, python foreach and multiple 
watermarks

## What changes were proposed in this pull request?

Added
- Python foreach
- Scala, Java and Python foreachBatch
- Multiple watermark policy
- The semantics of what changes are allowed to a streaming query between restarts.

## How was this patch tested?
No tests

Closes #22627 from tdas/SPARK-25639.

Authored-by: Tathagata Das <tathagata.das1...@gmail.com>
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>
(cherry picked from commit f9935a3f85f46deef2cb7b213c1c02c8ff627a8c)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4baa4d42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4baa4d42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4baa4d42

Branch: refs/heads/branch-2.4
Commit: 4baa4d42acf2400537462bddff811a7644bb49a3
Parents: 193ce77
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Mon Oct 8 14:32:04 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Oct 8 14:32:18 2018 -0700

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md | 323 +++++++++++++++++++-
 1 file changed, 312 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4baa4d42/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 73de189..b6e4277 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1560,6 +1560,35 @@ streamingDf <- dropDuplicates(streamingDf, "guid", 
"eventTime")
 </div>
 </div>
 
+### Policy for handling multiple watermarks
+A streaming query can have multiple input streams that are unioned or joined 
together.
+Each of the input streams can have a different threshold of late data that 
needs to
+be tolerated for stateful operations. You specify these thresholds using
+``withWatermark("eventTime", delay)`` on each of the input streams. For 
example, consider
+a query with stream-stream joins between `inputStream1` and `inputStream2`.
+    
+  inputStream1.withWatermark("eventTime1", "1 hour")
+    .join(
+      inputStream2.withWatermark("eventTime2", "2 hours"),
+      joinCondition)
+
+While executing the query, Structured Streaming individually tracks the maximum
+event time seen in each input stream, calculates watermarks based on the 
corresponding delay,
+and chooses a single global watermark from them to be used for stateful 
operations. By default,
+the minimum is chosen as the global watermark because it ensures that no data 
is
+accidentally dropped as too late if one of the streams falls behind the others
+(for example, one of the streams stops receiving data due to upstream 
failures). In other words,
+the global watermark will safely move at the pace of the slowest stream and 
the query output will
+be delayed accordingly.
+
+However, in some cases, you may want to get faster results even if it means 
dropping data from the
+slowest stream. Since Spark 2.4, you can set the multiple watermark policy to 
choose
+the maximum value as the global watermark by setting the SQL configuration
+``spark.sql.streaming.multipleWatermarkPolicy`` to ``max`` (default is 
``min``). 
+This lets the global watermark move at the pace of the fastest stream.
+However, as a side effect, data from the slower streams will be aggressively 
dropped. Hence, use
+this configuration judiciously.
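+
+For example, the following minimal sketch (assuming an active `SparkSession` named `spark`)
+switches the policy to ``max``; the setting must be in place before the query is started.
+
+  // Assumes `spark` is the active SparkSession; applies to queries started after this call.
+  spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")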
+
 ### Arbitrary Stateful Operations
 Many usecases require more advanced stateful operations than aggregations. For 
example, in many usecases, you have to track sessions from data streams of 
events. For doing such sessionization, you will have to save arbitrary types of 
data as state, and perform arbitrary operations on the state using the data 
stream events in every trigger. Since Spark 2.2, this can be done using the 
operation `mapGroupsWithState` and the more powerful operation 
`flatMapGroupsWithState`. Both operations allow you to apply user-defined code 
on grouped Datasets to update user-defined state. For more concrete details, 
take a look at the API documentation 
([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html))
 and the examples 
([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)).
 
@@ -1799,9 +1828,17 @@ Here are the details of all the sinks in Spark.
     <td>Append, Update, Complete</td>
     <td>None</td>
     <td>Depends on ForeachWriter implementation</td>
-    <td>More details in the <a href="#using-foreach">next section</a></td>
+    <td>More details in the <a href="#using-foreach-and-foreachbatch">next 
section</a></td>
   </tr>
   <tr>
+    <td><b>ForeachBatch Sink</b></td>
+    <td>Append, Update, Complete</td>
+    <td>None</td>
+    <td>Depends on the implementation</td>
+    <td>More details in the <a href="#using-foreach-and-foreachbatch">next section</a></td>
+  </tr>
+  <tr>
     <td><b>Console Sink</b></td>
     <td>Append, Update, Complete</td>
     <td>
@@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
 </div>
 </div>
 
-##### Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`
-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+##### Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and custom write
+logic on the output of a streaming query. They have slightly different use cases: while `foreach`
+allows custom write logic on every row, `foreachBatch` allows arbitrary operations and custom logic
+on the output of each micro-batch. Let's understand their usage in more detail.
+
+###### ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 2.4, 
this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data of a 
micro-batch and the unique ID of the micro-batch.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write the micro-batch Dataset
+    }    
+  }
+).start();
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+def foreach_batch_function(df, epoch_id):
+    # Transform and write df
+    pass
+  
+streamingDF.writeStream.foreachBatch(foreach_batch_function).start()   
+{% endhighlight %}
+
+</div>
+<div data-lang="r"  markdown="1">
+R is not yet supported.
+</div>
+</div>
+
+With `foreachBatch`, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming
+  sink available yet, but a data writer for batch queries may already exist. Using `foreachBatch`,
+  you can use that batch data writer on the output of each micro-batch (see the sketch after this list).
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of the 
input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+      batchDF.persist()
+      batchDF.write.format(...).save(...)  // location 1
+      batchDF.write.format(...).save(...)  // location 2
+      batchDF.unpersist()
+    }
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using `foreachBatch`, you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
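+
+As a sketch of the first point above, the following (hypothetical) example reuses Spark's batch
+JDBC data source, for which no built-in streaming sink exists, to write each micro-batch to a
+table; the connection URL and table name are placeholders.
+
+    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+      batchDF.write
+        .format("jdbc")
+        .option("url", "jdbc:postgresql://host:5432/mydb")  // placeholder connection URL
+        .option("dbtable", "events")                        // placeholder table name
+        .mode("append")
+        .save()
+    }.start()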
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the
+  batchId provided to the function as a way to deduplicate the output and get an exactly-once
+  guarantee (see the sketch after these notes).
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
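+
+For example, one possible (hypothetical) way to get idempotent writes is to key the output location
+on the batchId, so that a re-executed micro-batch simply overwrites the output it had already
+produced; the output path below is a placeholder.
+
+    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+      // Replays of the same micro-batch overwrite the same directory, so the output
+      // never ends up with duplicate copies of a batch.
+      batchDF.write.mode("overwrite").parquet(s"/path/to/output/batch_id=$batchId")
+    }.start()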
+
+
+###### Foreach
+If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist,
+or you are using the continuous processing mode), then you can express your custom writer logic
+using `foreach`.
+Specifically, you can express the data writing logic by dividing it into three 
methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+In Scala, you have to extend the class `ForeachWriter` 
([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
+
+{% highlight scala %}
+streamingDatasetOfString.writeStream.foreach(
+  new ForeachWriter[String] {
+
+    def open(partitionId: Long, version: Long): Boolean = {
+      // Open connection
+      true  // return true to proceed with writing this partition
+    }
+
+    def process(record: String): Unit = {
+      // Write string to connection
+    }
+
+    def close(errorOrNull: Throwable): Unit = {
+      // Close the connection
+    }
+  }
+).start()
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+In Java, you have to extend the class `ForeachWriter` 
([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreach(
+  new ForeachWriter<String>() {
+
+    @Override public boolean open(long partitionId, long version) {
+      // Open connection
+      return true;  // return true to proceed with writing this partition
+    }
+
+    @Override public void process(String record) {
+      // Write string to connection
+    }
+
+    @Override public void close(Throwable errorOrNull) {
+      // Close the connection
+    }
+  }
+).start();
+
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+
+In Python, you can invoke `foreach` in two ways: with a function or with an object.
+The function offers a simple way to express your processing logic but does not allow you to
+deduplicate generated data when failures cause reprocessing of some input data.
+For that situation, you must specify the processing logic in an object.
+
+1. The function takes a row as input.
+
+  {% highlight python %}
+      def process_row(row):
+          # Write row to storage
+          pass
+      
+      query = streamingDF.writeStream.foreach(process_row).start()  
+  {% endhighlight %}
+
+2. The object has a `process` method and optional `open` and `close` methods:
+
+  {% highlight python %}
+      class ForeachWriter:
+          def open(self, partition_id, epoch_id):
+              # Open connection. This method is optional in Python.
+              pass
+      
+          def process(self, row):
+              # Write row to connection. This method is NOT optional in Python.
+              pass
+      
+          def close(self, error):
+              # Close the connection. This method is optional in Python.
+              pass
+      
+      query = streamingDF.writeStream.foreach(ForeachWriter()).start()
+  {% endhighlight %}
+
+</div>
+<div data-lang="r"  markdown="1">
+R is not yet supported.
+</div>
+</div>
+
+
+**Execution semantics**
+When the streaming query is started, Spark calls the function or the 
object’s methods in the following way:
+
+- A single copy of this object is responsible for all the data generated by a 
single task in a query. 
+  In other words, one instance is responsible for processing one partition of 
the data generated in a distributed manner.
+
+- This object must be serializable, because each task will get a fresh 
serialized-deserialized copy 
+  of the provided object. Hence, it is strongly recommended that any 
initialization for writing data 
+  (for example, opening a connection or starting a transaction) is done after 
the open() method has 
+  been called, which signifies that the task is ready to generate data.
+
+- The lifecycle of the methods is as follows:
+
+  - For each partition with partition_id:
 
-- The writer must be serializable, as it will be serialized and sent to the 
executors for execution.
+    - For each batch/epoch of streaming data with epoch_id:
 
-- All the three methods, `open`, `process` and `close` will be called on the 
executors.
+      - Method open(partitionId, epochId) is called.
 
-- The writer must do all the initialization (e.g. opening connections, 
starting a transaction, etc.) only when the `open` method is called. Be aware 
that, if there is any initialization in the class as soon as the object is 
created, then that initialization will happen in the driver (because that is 
where the instance is being created), which may not be what you intend.
+      - If open(...) returns true, for each row in the partition and 
batch/epoch, method process(row) is called.
 
-- `version` and `partition` are two parameters in `open` that uniquely 
represent a set of rows that needs to be pushed out. `version` is a 
monotonically increasing id that increases with every trigger. `partition` is 
an id that represents a partition of the output, since the output is 
distributed and will be processed on multiple executors.
+      - Method close(error) is called with error (if any) seen while 
processing rows.
 
-- `open` can use the `version` and `partition` to choose whether it needs to 
write the sequence of rows. Accordingly, it can return `true` (proceed with 
writing), or `false` (no need to write). If `false` is returned, then `process` 
will not be called on any row. For example, after a partial failure, some of 
the output partitions of the failed trigger may have already been committed to 
a database. Based on metadata stored in the database, the writer can identify 
partitions that have already been committed and accordingly return false to 
skip committing them again. 
+- The close() method (if it exists) is called if an open() method exists and 
returns successfully (irrespective of the return value), except if the JVM or 
Python process crashes in the middle.
 
-- Whenever `open` is called, `close` will also be called (unless the JVM exits 
due to some error). This is true even if `open` returns false. If there is any 
error in processing and writing the data, `close` will be called with the 
error. It is your responsibility to clean up state (e.g. connections, 
transactions, etc.) that have been created in `open` such that there are no 
resource leaks.
+- **Note:** The partitionId and epochId in the open() method can be used to 
deduplicate generated data 
+  when failures cause reprocessing of some input data. This depends on the 
execution mode of the query. 
+  If the streaming query is being executed in the micro-batch mode, then every 
partition represented 
+  by a unique tuple (partition_id, epoch_id) is guaranteed to have the same 
data. 
+  Hence, (partition_id, epoch_id) can be used to deduplicate and/or 
transactionally commit 
+  data and achieve exactly-once guarantees. However, if the streaming query is 
being executed 
+  in the continuous mode, then this guarantee does not hold and therefore 
should not be used for deduplication.
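+
+As a rough sketch of this idea, `open()` can consult the sink's own transaction log or metadata
+store and skip a (partitionId, epochId) pair that was already committed by a previous attempt;
+`alreadyCommitted` and `markCommitted` below are hypothetical helpers standing in for that store.
+
+    streamingDatasetOfString.writeStream.foreach(
+      new ForeachWriter[String] {
+        var partition: Long = _
+        var epoch: Long = _
+
+        def open(partitionId: Long, epochId: Long): Boolean = {
+          partition = partitionId
+          epoch = epochId
+          // Returning false skips this (partition, epoch) if it was committed earlier.
+          !alreadyCommitted(partitionId, epochId)
+        }
+
+        def process(record: String): Unit = {
+          // Buffer or write the record to the external system.
+        }
+
+        def close(errorOrNull: Throwable): Unit = {
+          // If processing succeeded, atomically mark this (partition, epoch) as committed,
+          // then release any resources opened in open().
+          if (errorOrNull == null) markCommitted(partition, epoch)
+        }
+      }
+    ).start()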
 
 #### Triggers
 The trigger settings of a streaming query defines the timing of streaming data 
processing, whether
@@ -2709,6 +2938,78 @@ write.stream(aggDF, "memory", outputMode = "complete", 
checkpointLocation = "pat
 </div>
 </div>
 
+
+## Recovery Semantics after Changes in a Streaming Query
+There are limitations on what changes in a streaming query are allowed between 
restarts from the 
+same checkpoint location. Here are a few kinds of changes that are either not 
allowed, or 
+the effect of the change is not well-defined. For all of them:
+
+- The term *allowed* means you can do the specified change, but whether the semantics of its effect
+  are well-defined depends on the query and the change.
+
+- The term *not allowed* means you should not do the specified change as the 
restarted query is likely 
+  to fail with unpredictable errors. `sdf` represents a streaming 
DataFrame/Dataset 
+  generated with `sparkSession.readStream`.
+  
+**Types of changes**
+
+- *Changes in the number or type (i.e. different source) of input sources*: 
This is not allowed.
+
+- *Changes in the parameters of input sources*: Whether this is allowed and 
whether the semantics 
+  of the change are well-defined depends on the source and the query. Here are 
a few examples.
+
+  - Addition/deletion/modification of rate limits is allowed: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", 
"topic").option("maxOffsetsPerTrigger", ...)`
+
+  - Changes to subscribed topics/files are generally not allowed as the results 
are unpredictable: `spark.readStream.format("kafka").option("subscribe", 
"topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")`
+
+- *Changes in the type of output sink*: Changes between a few specific 
combinations of sinks 
+  are allowed. This needs to be verified on a case-by-case basis. Here are a 
few examples.
+
+  - File sink to Kafka sink is allowed. Kafka will see only the new data.
+
+  - Kafka sink to file sink is not allowed.
+
+  - Changing from a Kafka sink to a foreach sink, or vice versa, is allowed.
+
+- *Changes in the parameters of output sink*: Whether this is allowed and 
whether the semantics of 
+  the change are well-defined depends on the sink and the query. Here are a 
few examples.
+
+  - Changes to the output directory of a file sink are not allowed: 
`sdf.writeStream.format("parquet").option("path", "/somePath")` to 
`sdf.writeStream.format("parquet").option("path", "/anotherPath")`
+
+  - Changes to the output topic are allowed: 
`sdf.writeStream.format("kafka").option("topic", "someTopic")` to 
`sdf.writeStream.format("kafka").option("topic", "anotherTopic")`
+
+  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) are allowed, but the semantics of the change depend on the code.
+
+- *Changes in projection / filter / map-like operations*: Some cases are 
allowed. For example:
+
+  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to 
`sdf.where(...).selectExpr("a").filter(...)`.
+
+  - Changes in projections with the same output schema are allowed: 
`sdf.selectExpr("stringColumn AS json").writeStream` to 
`sdf.selectExpr("anotherStringColumn AS json").writeStream`
+
+  - Changes in projections with different output schema are conditionally 
allowed: `sdf.selectExpr("a").writeStream` to `sdf.selectExpr("b").writeStream` 
is allowed only if the output sink allows the schema change from `"a"` to `"b"`.
+
+- *Changes in stateful operations*: Some operations in streaming queries need 
to maintain
+  state data in order to continuously update the result. Structured Streaming 
automatically checkpoints
+  the state data to fault-tolerant storage (for example, HDFS, AWS S3, Azure 
Blob storage) and restores it after restart.
+  However, this assumes that the schema of the state data remains the same across 
restarts. This means that
+  *any changes (that is, additions, deletions, or schema modifications) to the 
stateful operations of a streaming query are not allowed between restarts*.
+  Here is the list of stateful operations whose schema should not be changed 
between restarts in order to ensure state recovery:
+
+  - *Streaming aggregation*: For example, `sdf.groupBy("a").agg(...)`. Any 
change in number or type of grouping keys or aggregates is not allowed.
+
+  - *Streaming deduplication*: For example, `sdf.dropDuplicates("a")`. Any change in the number or type of deduplication columns is not allowed.
+
+  - *Stream-stream join*: For example, `sdf1.join(sdf2, ...)` (i.e. both 
inputs are generated with `sparkSession.readStream`). Changes
+    in the schema or equi-joining columns are not allowed. Changes in the join type (outer or inner) are not allowed. Other changes in the join condition are ill-defined.
+
+  - *Arbitrary stateful operation*: For example, 
`sdf.groupByKey(...).mapGroupsWithState(...)` or 
`sdf.groupByKey(...).flatMapGroupsWithState(...)`.
+    Any change to the schema of the user-defined state or the type of timeout is not allowed.
+    Any change within the user-defined state-mapping function is allowed, but the semantic effect of the change depends on the user-defined logic.
+    If you really want to support state schema changes, then you can 
explicitly encode/decode your complex state data
+    structures into bytes using an encoding/decoding scheme that supports 
schema migration. For example,
+    if you save your state as Avro-encoded bytes, then you are free to change 
the Avro-state-schema between query
+    restarts as the binary state will always be restored successfully.
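+
+    A hypothetical sketch of this byte-encoded-state approach is shown below; `Event`, `SessionUpdate`,
+    `emptySession`, `updateSession`, `toUpdate` and the Avro codec helpers `encodeSession` /
+    `decodeSession` are all placeholders for user-defined types and logic.
+
+        import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
+
+        val sessionUpdates = events  // events: Dataset[Event]
+          .groupByKey(_.sessionId)
+          .flatMapGroupsWithState[Array[Byte], SessionUpdate](
+              OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
+            (sessionId: String, newEvents: Iterator[Event], state: GroupState[Array[Byte]]) =>
+              // The checkpointed state schema is always just Array[Byte]; the Avro schema
+              // inside the bytes can evolve between restarts.
+              val session = if (state.exists) decodeSession(state.get) else emptySession(sessionId)
+              val updated = updateSession(session, newEvents)
+              state.update(encodeSession(updated))
+              Iterator(toUpdate(updated))
+          }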
+
 # Continuous Processing
 ## [Experimental]
 {:.no_toc}

