Repository: spark Updated Branches: refs/heads/branch-2.3 9783aea2c -> 050c1e24e
[SPARK-23064][DOCS][SS] Added documentation for stream-stream joins

## What changes were proposed in this pull request?

Added documentation for stream-stream joins
![image](https://user-images.githubusercontent.com/663212/35018744-e999895a-fad7-11e7-9d6a-8c7a73e6eb9c.png)
![image](https://user-images.githubusercontent.com/663212/35018775-157eb464-fad8-11e7-879e-47a2fcbd8690.png)
![image](https://user-images.githubusercontent.com/663212/35018784-27791a24-fad8-11e7-98f4-7ff246f62a74.png)
![image](https://user-images.githubusercontent.com/663212/35018791-36a80334-fad8-11e7-9791-f85efa7c6ba2.png)

## How was this patch tested?

N/A

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #20255 from tdas/join-docs.

(cherry picked from commit 1002bd6b23ff78a010ca259ea76988ef4c478c6e)
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/050c1e24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/050c1e24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/050c1e24

Branch: refs/heads/branch-2.3
Commit: 050c1e24e506ff224bcf4e3e458e57fbd216765c
Parents: 9783aea
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed Jan 17 16:41:43 2018 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Wed Jan 17 16:41:49 2018 -0800

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md | 338 +++++++++++++++++++-
 1 file changed, 326 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/050c1e24/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index de13e28..1779a42 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1051,7 +1051,19 @@ output mode.
 
 ### Join Operations
-Streaming DataFrames can be joined with static DataFrames to create new streaming DataFrames. Here are a few examples.
+Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame
+as well as another streaming Dataset/DataFrame. The result of the streaming join is generated
+incrementally, similar to the results of streaming aggregations in the previous section. In this
+section we will explore what types of joins (i.e. inner, outer, etc.) are supported in the above
+cases. Note that in all the supported join types, the result of the join with a streaming
+Dataset/DataFrame will be exactly the same as if it were with a static Dataset/DataFrame
+containing the same data in the stream.
+
+
+#### Stream-static joins
+
+Since its introduction in Spark 2.0, Structured Streaming has supported joins (inner joins and some
+types of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.
 
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
@@ -1089,6 +1101,300 @@ streamingDf.join(staticDf, "type", "right_outer")  # right outer join with a static DF
 </div>
 </div>
 
+Note that stream-static joins are not stateful, so no state management is necessary.
+However, a few types of stream-static outer joins are not yet supported.
+These are listed at the [end of this Join section](#support-matrix-for-joins-in-streaming-queries).
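+
+As a minimal end-to-end sketch (the source format, paths, schema, and column names below are
+illustrative assumptions, not part of the example above), a stream-static join can be written
+and started in Append mode like this:
+
+{% highlight scala %}
+import org.apache.spark.sql.types._
+
+// Hypothetical schema for the streaming side (assumed for illustration)
+val eventSchema = new StructType()
+  .add("deviceId", StringType)
+  .add("eventTime", TimestampType)
+  .add("value", DoubleType)
+
+// Static side: a lookup table read once (assumed path)
+val deviceInfo = spark.read.parquet("/data/device-info")
+
+// Streaming side: a stream of JSON files (assumed path)
+val events = spark.readStream
+  .format("json")
+  .schema(eventSchema)
+  .load("/data/events")
+
+// Inner equi-join on the shared "deviceId" column; no streaming state is kept
+val enriched = events.join(deviceInfo, "deviceId")
+
+// Stream-static joins can be run in Append output mode
+val query = enriched.writeStream
+  .format("parquet")
+  .option("path", "/data/enriched")
+  .option("checkpointLocation", "/checkpoints/enriched")
+  .outputMode("append")
+  .start()
+{% endhighlight %}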
+
+#### Stream-stream Joins
+In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
+Datasets/DataFrames. The challenge of generating join results between two data streams is that,
+at any point of time, the view of the dataset is incomplete for both sides of the join, making
+it much harder to find matches between inputs. Any row received from one input stream can match
+with any future, yet-to-be-received row from the other input stream. Hence, for both the input
+streams, we buffer past input as streaming state, so that we can match every future input with
+past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
+we automatically handle late, out-of-order data and can limit the state using watermarks.
+Let's discuss the different types of supported stream-stream joins and how to use them.
+
+##### Inner Joins with optional Watermarking
+Inner joins on any kind of columns along with any kind of join conditions are supported.
+However, as the stream runs, the size of streaming state will keep growing indefinitely as
+*all* past input must be saved, since any new input can match with any input from the past.
+To avoid unbounded state, you have to define additional join conditions such that indefinitely
+old inputs cannot match with future inputs and therefore can be cleared from the state.
+In other words, you will have to do the following additional steps in the join.
+
+1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
+(similar to streaming aggregations)
+
+1. Define a constraint on event-time across the two inputs such that the engine can figure out when
+old rows of one input are not going to be required (i.e. will not satisfy the time constraint) for
+matches with the other input. This constraint can be defined in one of two ways.
+
+    1. Time range join conditions (e.g. `...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR`),
+
+    1. Join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
+
+Let's understand this with an example.
+
+Let's say we want to join a stream of advertisement impressions (when an ad was shown) with
+another stream of user clicks on advertisements to correlate when impressions led to
+monetizable clicks. To allow state cleanup in this stream-stream join, you will have to
+specify the watermarking delays and the time constraints as follows.
+
+1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
+in event-time by at most 2 and 3 hours, respectively.
+
+1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
+after the corresponding impression.
+
+The code would look like this.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+import org.apache.spark.sql.functions.expr
+
+val impressions = spark.readStream. ...
+val clicks = spark.readStream. ...
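+
+// (Assumed schemas, for illustration: the impressions stream has an ad id column
+// "impressionAdId" and an event-time column "impressionTime"; the clicks stream
+// has "clickAdId" and an event-time column "clickTime".)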
+
+// Apply watermarks on event-time columns
+val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
+val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
+
+// Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """)
+)
+
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+import static org.apache.spark.sql.functions.expr;
+
+Dataset<Row> impressions = spark.readStream(). ...
+Dataset<Row> clicks = spark.readStream(). ...
+
+// Apply watermarks on event-time columns
+Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
+Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
+
+// Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr(
+    "clickAdId = impressionAdId AND " +
+    "clickTime >= impressionTime AND " +
+    "clickTime <= impressionTime + interval 1 hour ")
+);
+
+{% endhighlight %}
+
+
+</div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+from pyspark.sql.functions import expr
+
+impressions = spark.readStream. ...
+clicks = spark.readStream. ...
+
+# Apply watermarks on event-time columns
+impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
+clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
+
+# Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """)
+)
+
+{% endhighlight %}
+
+</div>
+</div>
+
+##### Outer Joins with Watermarking
+While the watermark + event-time constraints are optional for inner joins, for left and right outer
+joins they must be specified. This is because for generating the NULL results in outer join, the
+engine must know when an input row is not going to match with anything in the future. Hence, the
+watermark + event-time constraints must be specified for generating correct results. Therefore,
+a query with an outer join will look quite like the ad-monetization example earlier, except that
+there will be an additional parameter specifying it to be an outer join.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """),
+  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"
+)
+
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr(
+    "clickAdId = impressionAdId AND " +
+    "clickTime >= impressionTime AND " +
+    "clickTime <= impressionTime + interval 1 hour "),
+  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter"
+);
+
+{% endhighlight %}
+
+
+</div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """),
+  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter"
+)
+
+{% endhighlight %}
+
+</div>
+</div>
+
+However, note that the outer NULL results will be generated with a delay (depending on the specified
+watermark delay and the time range condition) because the engine has to wait that long to ensure
+there were no matches and there will be no more matches in the future.
+
+##### Support matrix for joins in streaming queries
+
+<table class="table">
+  <tr>
+    <th>Left Input</th>
+    <th>Right Input</th>
+    <th>Join Type</th>
+    <th></th>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Static</td>
+    <td style="vertical-align: middle;">Static</td>
+    <td style="vertical-align: middle;">All types</td>
+    <td style="vertical-align: middle;">
+      Supported, since it's not on streaming data even though it
+      can be present in a streaming query
+    </td>
+  </tr>
+  <tr>
+    <td rowspan="4" style="vertical-align: middle;">Stream</td>
+    <td rowspan="4" style="vertical-align: middle;">Static</td>
+    <td style="vertical-align: middle;">Inner</td>
+    <td style="vertical-align: middle;">Supported, not stateful</td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Left Outer</td>
+    <td style="vertical-align: middle;">Supported, not stateful</td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Right Outer</td>
+    <td style="vertical-align: middle;">Not supported</td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Full Outer</td>
+    <td style="vertical-align: middle;">Not supported</td>
+  </tr>
+  <tr>
+    <td rowspan="4" style="vertical-align: middle;">Static</td>
+    <td rowspan="4" style="vertical-align: middle;">Stream</td>
+    <td style="vertical-align: middle;">Inner</td>
+    <td style="vertical-align: middle;">Supported, not stateful</td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Left Outer</td>
+    <td style="vertical-align: middle;">Not supported</td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Right Outer</td>
+    <td style="vertical-align: middle;">Supported, not stateful</td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Full Outer</td>
+    <td style="vertical-align: middle;">Not supported</td>
+  </tr>
+  <tr>
+    <td rowspan="4" style="vertical-align: middle;">Stream</td>
+    <td rowspan="4" style="vertical-align: middle;">Stream</td>
+    <td style="vertical-align: middle;">Inner</td>
+    <td style="vertical-align: middle;">
+      Supported, optionally specify watermark on both sides +
+      time constraints for state cleanup
+    </td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Left Outer</td>
+    <td style="vertical-align: middle;">
style="vertical-align: middle;"> + Conditionally supported, must specify watermark on right + time constraints for correct + results, optionally specify watermark on left for all state cleanup + </td> + </tr> + <tr> + <td style="vertical-align: middle;">Right Outer</td> + <td style="vertical-align: middle;"> + Conditionally supported, must specify watermark on left + time constraints for correct + results, optionally specify watermark on right for all state cleanup + </td> + </tr> + <tr> + <td style="vertical-align: middle;">Full Outer</td> + <td style="vertical-align: middle;">Not supported</td> + </tr> + <tr> + <td></td> + <td></td> + <td></td> + <td></td> + </tr> +</table> + +Additional details on supported joins: + +- Joins can be cascaded, that is, you can do `df1.join(df2, ...).join(df3, ...).join(df4, ....)`. + +- As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported. + +- As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of + what cannot be used. + + - Cannot use streaming aggregations before joins. + + - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins. + + ### Streaming Deduplication You can deduplicate records in data streams using a unique identifier in the events. This is exactly same as deduplication on static using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking. @@ -1160,15 +1466,9 @@ Some of them are as follows. - Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode. -- Outer joins between a streaming and a static Datasets are conditionally supported. - - + Full outer join with a streaming Dataset is not supported - - + Left outer join with a streaming Dataset on the right is not supported - - + Right outer join with a streaming Dataset on the left is not supported - -- Any kind of joins between two streaming Datasets is not yet supported. +- Few types of outer joins on streaming Datasets are not supported. See the + <a href="#support-matrix-for-joins-in-streaming-queries">support matrix in the Join Operations section</a> + for more details. In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that). @@ -1277,6 +1577,15 @@ Here is the compatibility matrix. </td> </tr> <tr> + <td colspan="2" style="vertical-align: middle;">Queries with <code>joins</code></td> + <td style="vertical-align: middle;">Append</td> + <td style="vertical-align: middle;"> + Update and Complete mode not supported yet. See the + <a href="#support-matrix-for-joins-in-streaming-queries">support matrix in the Join Operations section</a> + for more details on what types of joins are supported. 
+    </td>
+  </tr>
+  <tr>
     <td colspan="2" style="vertical-align: middle;">Other queries</td>
     <td style="vertical-align: middle;">Append, Update</td>
     <td style="vertical-align: middle;">
@@ -2142,6 +2451,11 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat
 
 **Talks**
 
-- Spark Summit 2017 Talk - [Easy, Scalable, Fault-tolerant Stream Processing with Structured Streaming in Apache Spark](https://spark-summit.org/2017/events/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark/)
-- Spark Summit 2016 Talk - [A Deep Dive into Structured Streaming](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)
+- Spark Summit Europe 2017
+  - Easy, Scalable, Fault-tolerant Stream Processing with Structured Streaming in Apache Spark -
+    [Part 1 slides/video](https://databricks.com/session/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark), [Part 2 slides/video](https://databricks.com/session/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark-continues)
+  - Deep Dive into Stateful Stream Processing in Structured Streaming - [slides/video](https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming)
+- Spark Summit 2016
+  - A Deep Dive into Structured Streaming - [slides/video](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)
+