Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/20255#discussion_r161366935

--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1089,6 +1098,224 @@ streamingDf.join(staticDf, "type", "right_join")  # right outer join with a stat
 </div>
 </div>

+Note that stream-static joins are not stateful, so no state management is necessary.
+However, a few types of stream-static outer join are not supported, as the incomplete view of
+all data in a stream makes it infeasible to calculate the results correctly.
+These are discussed at the end of this section.
+
+#### Stream-stream Joins
+In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
+Datasets/DataFrames. The challenge of generating join results between two data streams is that,
+at any point of time, the view of the dataset is incomplete for both sides of the join, making
+it much harder to find matches between inputs. Any row received from one input stream can match
+with any future, yet-to-be-received row from the other input stream. Hence, for both the input
+streams, we buffer past input as streaming state, so that we can match every future input with
+past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
+we automatically handle late, out-of-order data and can limit the state using watermarks.
+Let's discuss the different types of supported stream-stream joins and how to use them.
+
+##### Inner Joins with optional Watermarking
+Inner joins on any kind of columns along with any kind of join conditions are supported.
+However, as the stream runs, the size of streaming state will keep growing indefinitely, as
+*all* past input must be saved since any new input can match with any input from the past.
+To avoid unbounded state, you have to define additional join conditions such that indefinitely
+old inputs cannot match with future inputs and can therefore be cleared from the state.
+In other words, you will have to do the following additional steps in the join.
+
+1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
+(similar to streaming aggregations)
+
+1. Define a constraint on event-time across the two inputs such that the engine can figure out when
+old rows of one input are not going to be required for matches with the other input. This constraint
+can be either a time range condition (e.g. `...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR`),
+or an equi-join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
+Let's understand this with an example.
+
+Let's say we want to join a stream of advertisement impressions (when an ad was shown) with
+another stream of user clicks on advertisements to correlate when impressions led to
+monetizable clicks. To allow state cleanup in this stream-stream join, you will have to
+specify the watermarking delays and the time constraints as follows.
+
+1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
+in event-time by at most 2 and 3 hours, respectively.
+
+1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
+after the corresponding impression.
+
+The code would look like this.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+import org.apache.spark.sql.functions.expr
+
+val impressions = spark.readStream. ...
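+// The join below expects impressions to carry (impressionAdId, impressionTime)
+// and clicks to carry (clickAdId, clickTime), with the time columns as event time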
+val clicks = spark.readStream. ...
+
+// Apply watermarks on event-time columns
+val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
+val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
+
+// Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """
+  ))
+
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+import static org.apache.spark.sql.functions.expr;
+
+Dataset<Row> impressions = spark.readStream(). ...
+Dataset<Row> clicks = spark.readStream(). ...
+
+// Apply watermarks on event-time columns
+Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
+Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
+
+// Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr(
+    "clickAdId = impressionAdId AND " +
+    "clickTime >= impressionTime AND " +
+    "clickTime <= impressionTime + interval 1 hour"
+  ));
+
+{% endhighlight %}
+
+</div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+from pyspark.sql.functions import expr
+
+impressions = spark.readStream. ...
+clicks = spark.readStream. ...
+
+# Apply watermarks on event-time columns
+impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
+clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
+
+# Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """
+  ))
--- End diff --

this *should* just work for R, like this: (I added withWatermark in 2.3)

```
impressions <- read.stream( ...
clicks <- read.stream( ...

# Apply watermarks on event-time columns
impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")

# Join with event-time constraints
join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND clickTime >= impressionTime AND clickTime <= impressionTime + interval 1 hour"
  ))
```
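The guide text above also mentions an equi-join on event-time windows as the alternative state-cleanup condition, without showing it. A minimal Scala sketch of that variant, assuming the same watermarked `impressionsWithWatermark`/`clicksWithWatermark` streams from the example; the 1-hour window duration and the `impressionWindow`/`clickWindow` column names are illustrative assumptions:

```
import org.apache.spark.sql.functions.{col, expr, window}

// Bucket each stream's event time into fixed 1-hour windows
val impressionsWindowed = impressionsWithWatermark
  .withColumn("impressionWindow", window(col("impressionTime"), "1 hour"))
val clicksWindowed = clicksWithWatermark
  .withColumn("clickWindow", window(col("clickTime"), "1 hour"))

// Equality on the window structs bounds how long old rows must be kept in
// state, playing the same role as the time range condition above
impressionsWindowed.join(
  clicksWindowed,
  expr("clickAdId = impressionAdId AND clickWindow = impressionWindow"))
```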