Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20255#discussion_r162261740
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1089,6 +1098,224 @@ streamingDf.join(staticDf, "type", "right_join")  # right outer join with a stat
     </div>
     </div>
     
    +Note that stream-static joins are not stateful, so no state management is necessary.
    +However, a few types of stream-static outer join are not supported as the incomplete view of
    +all data in a stream makes it infeasible to calculate the results correctly.
    +These are discussed at the end of this section.
    +
    +#### Stream-stream Joins
    +In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
    +Datasets/DataFrames. The challenge of generating join results between two data streams is that,
    +at any point of time, the view of the dataset is incomplete for both sides of the join making
    +it much harder to find matches between inputs. Any row received from one input stream can match
    +with any future, yet-to-be-received row from the other input stream. Hence, for both the input
    +streams, we buffer past input as streaming state, so that we can match every future input with
    +past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
    +we automatically handle late, out-of-order data and can limit the state using watermarks.
    +Let’s discuss the different types of supported stream-stream joins and how to use them.
    +
    +##### Inner Joins with optional Watermarking
    +Inner joins on any kind of columns along with any kind of join conditions are supported.
    +However, as the stream runs, the size of streaming state will keep growing indefinitely as
    +*all* past input must be saved, since any new input can match with any input from the past.
    +To avoid unbounded state, you have to define additional join conditions such that indefinitely
    +old inputs cannot match with future inputs and therefore can be cleared from the state.
    +In other words, you will have to do the following additional steps in the join.
    +
    +1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
    +(similar to streaming aggregations)
    +
    +1. Define a constraint on event-time across the two inputs such that the engine can figure out when
    +old rows of one input are not going to be required for matches with the other input. This constraint
    +can either be a time range condition (e.g. `...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR`),
    +or equi-join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
    +Let’s understand this with an example.
    +
    +Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
    +another stream of user clicks on advertisements to correlate when impressions led to
    +monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to
    +specify the watermarking delays and the time constraints as follows.
    +
    +1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
    +in event-time by at most 2 and 3 hours, respectively.
    +
    +1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
    +after the corresponding impression.
    +
    +The code would look like this.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +import org.apache.spark.sql.functions.expr
    +
    +val impressions = spark.readStream. ...
    +val clicks = spark.readStream. ...
    +
    +// Apply watermarks on event-time columns
    +val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """
    +  ))
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +import static org.apache.spark.sql.functions.expr;
    +
    +Dataset<Row> impressions = spark.readStream(). ...
    +Dataset<Row> clicks = spark.readStream(). ...
    +
    +// Apply watermarks on event-time columns
    +Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
    +Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr(
    +    "clickAdId = impressionAdId AND " +
    +    "clickTime >= impressionTime AND " +
    +    "clickTime <= impressionTime + interval 1 hour "
    +  ));
    +
    +{% endhighlight %}
    +
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +from pyspark.sql.functions import expr
    +
    +impressions = spark.readStream. ...
    +clicks = spark.readStream. ...
    +
    +# Apply watermarks on event-time columns
    +impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +# Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """
    +  ))
    --- End diff --
    
    sure!
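
For readers checking the join condition quoted in the diff above, here is a minimal plain-Python sketch of the event-time range predicate (`clickAdId = impressionAdId`, with the click falling between the impression time and one hour after it). It is not Spark code and does not model watermarks or state cleanup. The names `matches`, `join_events`, and `MAX_CLICK_DELAY`, and the representation of events as `(ad_id, event_time)` tuples, are assumptions made for illustration only.

```python
from datetime import datetime, timedelta

# Assumed for illustration: the 1-hour upper bound from the quoted condition
#   clickTime <= impressionTime + interval 1 hour
MAX_CLICK_DELAY = timedelta(hours=1)


def matches(impression, click):
    """Return True if an (ad_id, event_time) impression and click satisfy
    the quoted join condition (same ad, click within [0, 1 hour] after)."""
    imp_ad, imp_time = impression
    clk_ad, clk_time = click
    return (
        clk_ad == imp_ad
        and imp_time <= clk_time <= imp_time + MAX_CLICK_DELAY
    )


def join_events(impressions, clicks):
    """Naive nested-loop inner join over two finite lists of events."""
    return [(i, c) for i in impressions for c in clicks if matches(i, c)]


if __name__ == "__main__":
    t0 = datetime(2018, 1, 1, 12, 0)
    impressions = [("ad1", t0), ("ad2", t0)]
    clicks = [
        ("ad1", t0 + timedelta(minutes=30)),  # within the 1-hour range
        ("ad2", t0 + timedelta(hours=2)),     # too late to match
    ]
    for pair in join_events(impressions, clicks):
        print(pair)
```

Because both bounds are inclusive, a click exactly one hour after its impression still matches, while a click that precedes the impression never does.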

