Repository: spark
Updated Branches:
  refs/heads/master bac0d661a -> 1002bd6b2


[SPARK-23064][DOCS][SS] Added documentation for stream-stream joins

## What changes were proposed in this pull request?
Added documentation for stream-stream joins

![image](https://user-images.githubusercontent.com/663212/35018744-e999895a-fad7-11e7-9d6a-8c7a73e6eb9c.png)

![image](https://user-images.githubusercontent.com/663212/35018775-157eb464-fad8-11e7-879e-47a2fcbd8690.png)

![image](https://user-images.githubusercontent.com/663212/35018784-27791a24-fad8-11e7-98f4-7ff246f62a74.png)

![image](https://user-images.githubusercontent.com/663212/35018791-36a80334-fad8-11e7-9791-f85efa7c6ba2.png)

## How was this patch tested?

N/a

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #20255 from tdas/join-docs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1002bd6b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1002bd6b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1002bd6b

Branch: refs/heads/master
Commit: 1002bd6b23ff78a010ca259ea76988ef4c478c6e
Parents: bac0d66
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Wed Jan 17 16:41:43 2018 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Wed Jan 17 16:41:43 2018 -0800

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md | 338 +++++++++++++++++++-
 1 file changed, 326 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1002bd6b/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index de13e28..1779a42 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1051,7 +1051,19 @@ output mode.
 
 
 ### Join Operations
-Streaming DataFrames can be joined with static DataFrames to create new streaming DataFrames. Here are a few examples.
+Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame
+as well as with another streaming Dataset/DataFrame. The result of the streaming join is generated
+incrementally, similar to the results of streaming aggregations in the previous section. In this
+section we will explore what types of joins (i.e. inner, outer, etc.) are supported in the above
+cases. Note that in all the supported join types, the result of the join with a streaming
+Dataset/DataFrame will be exactly the same as if it were with a static Dataset/DataFrame
+containing the same data in the stream.
+
+
+#### Stream-static Joins
+
+Since its introduction in Spark 2.0, Structured Streaming has supported joins (inner joins and some
+types of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.
 
 <div class="codetabs">
 <div data-lang="scala"  markdown="1">
@@ -1089,6 +1101,300 @@ streamingDf.join(staticDf, "type", "right_join")  # right outer join with a stat
 </div>
 </div>
 
+Note that stream-static joins are not stateful, so no state management is necessary.
+However, a few types of stream-static outer joins are not yet supported.
+These are listed at the [end of this Join section](#support-matrix-for-joins-in-streaming-queries).
+
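+For example, in Scala, the supported stream-static joins are ordinary join calls. A minimal sketch,
+assuming `staticDf` and `streamingDf` share a column `"type"`:
+
+{% highlight scala %}
+val staticDf = spark.read. ...
+val streamingDf = spark.readStream. ...
+
+streamingDf.join(staticDf, Seq("type"))                 // inner equi-join with a static DF
+streamingDf.join(staticDf, Seq("type"), "right_outer")  // right outer join with a static DF
+{% endhighlight %}
+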
+#### Stream-stream Joins
+In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
+Datasets/DataFrames. The challenge of generating join results between two data streams is that,
+at any point in time, the view of the dataset is incomplete for both sides of the join, making
+it much harder to find matches between inputs. Any row received from one input stream can match
+with any future, yet-to-be-received row from the other input stream. Hence, for both the input
+streams, we buffer past input as streaming state, so that we can match every future input with
+past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
+we automatically handle late, out-of-order data and can limit the state using watermarks.
+Let’s discuss the different types of supported stream-stream joins and how to use them.
+
+##### Inner Joins with optional Watermarking
+Inner joins on any kind of columns along with any kind of join conditions are supported.
+However, as the stream runs, the size of the streaming state will keep growing indefinitely,
+as *all* past input must be saved, since any new input can match with any input from the past.
+To avoid unbounded state, you have to define additional join conditions such that indefinitely
+old inputs cannot match with future inputs and can therefore be cleared from the state.
+In other words, you will have to do the following additional steps in the join.
+
+1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
+(similar to streaming aggregations).
+
+1. Define a constraint on event-time across the two inputs such that the engine can figure out when
+old rows of one input are not going to be required (i.e. will not satisfy the time constraint) for
+matches with the other input. This constraint can be defined in one of two ways.
+
+    1. Time range join conditions (e.g. `...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR`),
+
+    1. Join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`), as in the sketch below.
+
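+For the second form, a join on event-time windows might look like the following minimal sketch
+(`leftStream`, `rightStream` and the column names here are hypothetical):
+
+{% highlight scala %}
+import org.apache.spark.sql.functions.{col, expr, window}
+
+// Bucket each side into 5-minute event-time windows; rows can only match when
+// their windows are equal, so state older than the watermark can be cleared
+val left = leftStream
+  .withWatermark("leftTime", "1 hour")
+  .select(window(col("leftTime"), "5 minutes").as("leftTimeWindow"), col("leftId"))
+val right = rightStream
+  .withWatermark("rightTime", "1 hour")
+  .select(window(col("rightTime"), "5 minutes").as("rightTimeWindow"), col("rightId"))
+
+left.join(right, expr("leftTimeWindow = rightTimeWindow AND leftId = rightId"))
+{% endhighlight %}
+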
+Let’s understand the time range condition with a fuller example.
+
+Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
+another stream of user clicks on advertisements to correlate when impressions led to
+monetizable clicks. To allow state cleanup in this stream-stream join, you will have to
+specify the watermarking delays and the time constraints as follows.
+
+1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
+in event-time by at most 2 and 3 hours, respectively.
+
+1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
+after the corresponding impression.
+
+The code would look like this.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+import org.apache.spark.sql.functions.expr
+
+val impressions = spark.readStream. ...
+val clicks = spark.readStream. ...
+
+// Apply watermarks on event-time columns
+val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
+val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
+
+// Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """)
+)
+
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+import static org.apache.spark.sql.functions.expr;
+
+Dataset<Row> impressions = spark.readStream(). ...
+Dataset<Row> clicks = spark.readStream(). ...
+
+// Apply watermarks on event-time columns
+Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
+Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
+
+// Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr(
+    "clickAdId = impressionAdId AND " +
+    "clickTime >= impressionTime AND " +
+    "clickTime <= impressionTime + interval 1 hour ")
+);
+
+{% endhighlight %}
+
+
+</div>
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+from pyspark.sql.functions import expr
+
+impressions = spark.readStream. ...
+clicks = spark.readStream. ...
+
+# Apply watermarks on event-time columns
+impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
+clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
+
+# Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """)
+)
+
+{% endhighlight %}
+
+</div>
+</div>
+
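+The joined Dataset can then be started like any other streaming query. Note that, as of Spark 2.3,
+joins are supported only in the Append output mode (see the notes at the end of this section).
+A minimal sketch, using a console sink purely for illustration:
+
+{% highlight scala %}
+val query = impressionsWithWatermark.join(
+    clicksWithWatermark,
+    expr("""
+      clickAdId = impressionAdId AND
+      clickTime >= impressionTime AND
+      clickTime <= impressionTime + interval 1 hour
+      """)
+  )
+  .writeStream
+  .outputMode("append")   // the only output mode supported for joins as of Spark 2.3
+  .format("console")      // illustrative sink; use any supported sink
+  .start()
+{% endhighlight %}
+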
+##### Outer Joins with Watermarking
+While the watermark + event-time constraints are optional for inner joins, they must be specified
+for left and right outer joins. This is because, to generate the NULL results in an outer join, the
+engine must know when an input row is not going to match with anything in the future. Hence, the
+watermark + event-time constraints must be specified to generate correct results. Therefore,
+a query with an outer join will look quite like the ad-monetization example earlier, except that
+there will be an additional parameter specifying it to be an outer join.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """),
+  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"
+ )
+
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr(
+    "clickAdId = impressionAdId AND " +
+    "clickTime >= impressionTime AND " +
+    "clickTime <= impressionTime + interval 1 hour "),
+  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter"
+);
+
+{% endhighlight %}
+
+
+</div>
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """),
+  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter"
+)
+
+{% endhighlight %}
+
+</div>
+</div>
+
+However, note that the outer NULL results will be generated with a delay (which depends on the
+specified watermark delay and the time range condition) because the engine has to wait that long
+to ensure there were no matches and that there will be no more matches in the future. For instance,
+in the example above, an impression can produce a NULL result only after the watermark on clicks has
+passed the end of its one-hour match window, i.e. only after clicks with event-time more than
+1 + 3 = 4 hours after the impression have been seen.
+
+##### Support matrix for joins in streaming queries
+
+<table class="table">
+  <tr>
+    <th>Left Input</th>
+    <th>Right Input</th>
+    <th>Join Type</th>
+    <th></th>
+  </tr>
+  <tr>
+      <td style="vertical-align: middle;">Static</td>
+      <td style="vertical-align: middle;">Static</td>
+      <td style="vertical-align: middle;">All types</td>
+      <td style="vertical-align: middle;">
+        Supported, since it's not on streaming data even though it
+        can be present in a streaming query
+      </td>
+  </tr>
+  <tr>
+    <td rowspan="4" style="vertical-align: middle;">Stream</td>
+    <td rowspan="4" style="vertical-align: middle;">Static</td>
+    <td style="vertical-align: middle;">Inner</td>
+    <td style="vertical-align: middle;">Supported, not stateful</td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Left Outer</td>
+    <td style="vertical-align: middle;">Supported, not stateful</td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Right Outer</td>
+    <td style="vertical-align: middle;">Not supported</td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Full Outer</td>
+    <td style="vertical-align: middle;">Not supported</td>
+  </tr>
+  <tr>
+    <td rowspan="4" style="vertical-align: middle;">Static</td>
+    <td rowspan="4" style="vertical-align: middle;">Stream</td>
+    <td style="vertical-align: middle;">Inner</td>
+    <td style="vertical-align: middle;">Supported, not stateful</td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Left Outer</td>
+    <td style="vertical-align: middle;">Not supported</td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Right Outer</td>
+    <td style="vertical-align: middle;">Supported, not stateful</td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Full Outer</td>
+    <td style="vertical-align: middle;">Not supported</td>
+  </tr>
+  <tr>
+    <td rowspan="4" style="vertical-align: middle;">Stream</td>
+    <td rowspan="4" style="vertical-align: middle;">Stream</td>
+    <td style="vertical-align: middle;">Inner</td>
+    <td style="vertical-align: middle;">
+      Supported, optionally specify watermark on both sides +
+      time constraints for state cleanup
+    </td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Left Outer</td>
+    <td style="vertical-align: middle;">
+      Conditionally supported, must specify watermark on right + time constraints for correct
+      results, optionally specify watermark on left for all state cleanup
+    </td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Right Outer</td>
+    <td style="vertical-align: middle;">
+      Conditionally supported, must specify watermark on left + time constraints for correct
+      results, optionally specify watermark on right for all state cleanup
+    </td>
+  </tr>
+  <tr>
+    <td style="vertical-align: middle;">Full Outer</td>
+    <td style="vertical-align: middle;">Not supported</td>
+  </tr>
+</table>
+
+Additional details on supported joins:
+
+- Joins can be cascaded, that is, you can do `df1.join(df2, ...).join(df3, ...).join(df4, ....)`.
+
+- As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.
+
+- As of Spark 2.3, you cannot use non-map-like operations before joins. Here are a few examples of
+  what cannot be used.
+
+  - Cannot use streaming aggregations before joins.
+
+  - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.
+
+
 ### Streaming Deduplication
 You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as deduplication on static Datasets using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.
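 
 For example, a minimal sketch in Scala (assuming the events have columns `guid` and `eventTime`):
 
 {% highlight scala %}
 val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...
 
 // Without watermark: the unique ids of all past records are kept as state
 streamingDf.dropDuplicates("guid")
 
 // With watermark: duplicates are dropped within the watermark delay,
 // and state for event-times older than the watermark is cleaned up
 streamingDf
   .withWatermark("eventTime", "10 seconds")
   .dropDuplicates("guid", "eventTime")
 {% endhighlight %}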
 
@@ -1160,15 +1466,9 @@ Some of them are as follows.
 
 - Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.
 
-- Outer joins between a streaming and a static Datasets are conditionally supported.
-
-    + Full outer join with a streaming Dataset is not supported
-
-    + Left outer join with a streaming Dataset on the right is not supported
-
-    + Right outer join with a streaming Dataset on the left is not supported
-
-- Any kind of joins between two streaming Datasets is not yet supported.
+- A few types of outer joins on streaming Datasets are not supported. See the
+  <a href="#support-matrix-for-joins-in-streaming-queries">support matrix in the Join Operations section</a>
+  for more details.
 
 In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).
 
@@ -1277,6 +1577,15 @@ Here is the compatibility matrix.
     </td>
   </tr>
   <tr>
+      <td colspan="2" style="vertical-align: middle;">Queries with <code>joins</code></td>
+      <td style="vertical-align: middle;">Append</td>
+      <td style="vertical-align: middle;">
+        Update and Complete modes are not supported yet. See the
+        <a href="#support-matrix-for-joins-in-streaming-queries">support matrix in the Join Operations section</a>
+        for more details on what types of joins are supported.
+      </td>
+  </tr>
+  <tr>
     <td colspan="2" style="vertical-align: middle;">Other queries</td>
     <td style="vertical-align: middle;">Append, Update</td>
     <td style="vertical-align: middle;">
@@ -2142,6 +2451,11 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat
 
 **Talks**
 
-- Spark Summit 2017 Talk - [Easy, Scalable, Fault-tolerant Stream Processing with Structured Streaming in Apache Spark](https://spark-summit.org/2017/events/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark/)
-- Spark Summit 2016 Talk - [A Deep Dive into Structured Streaming](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)
+- Spark Summit Europe 2017
+  - Easy, Scalable, Fault-tolerant Stream Processing with Structured Streaming in Apache Spark -
+    [Part 1 slides/video](https://databricks.com/session/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark), [Part 2 slides/video](https://databricks.com/session/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark-continues)
+  - Deep Dive into Stateful Stream Processing in Structured Streaming - [slides/video](https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming)
+- Spark Summit 2016
+  - A Deep Dive into Structured Streaming - [slides/video](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)
+
 

