This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new e264c21  [SPARK-36172][SS] Document session window into Structured Streaming guide doc

e264c21 is described below

commit e264c21707a4c9f2df9f81a202b0568b16afa3f6
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Jul 21 10:45:31 2021 +0900

    [SPARK-36172][SS] Document session window into Structured Streaming guide doc

    ### What changes were proposed in this pull request?

    This PR documents a new feature, "native support of session window", in the Structured Streaming guide doc.

    Screenshots follow:

    ![Screenshot 2021-07-20 5 04 20 PM](https://user-images.githubusercontent.com/1317309/126284848-526ec056-1028-4a70-a1f4-ae275d4b5437.png)
    ![Screenshot 2021-07-20 3 34 38 PM](https://user-images.githubusercontent.com/1317309/126276763-763cf841-aef7-412a-aa03-d93273f0c850.png)

    ### Why are the changes needed?

    This change is needed to explain a new feature to end users.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Documentation changes.

    Closes #33433 from HeartSaVioR/SPARK-36172.

    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 0eb31a06d6f2b7583b6a9c646baeff58094f8d6c)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../img/structured-streaming-time-window-types.jpg | Bin 0 -> 56637 bytes
 docs/img/structured-streaming.pptx                 | Bin 1126657 -> 1130887 bytes
 docs/structured-streaming-programming-guide.md     |  71 +++++++++++++++++++++
 3 files changed, 71 insertions(+)

diff --git a/docs/img/structured-streaming-time-window-types.jpg b/docs/img/structured-streaming-time-window-types.jpg
new file mode 100644
index 0000000..7e0ad1b
Binary files /dev/null and b/docs/img/structured-streaming-time-window-types.jpg differ
diff --git a/docs/img/structured-streaming.pptx b/docs/img/structured-streaming.pptx
index 2ffd9f2..b35bf75 100644
Binary files a/docs/img/structured-streaming.pptx and b/docs/img/structured-streaming.pptx differ
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 1eabcdd..3d02eb7 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1063,6 +1063,77 @@ then drops intermediate state of a window < watermark, and
 appends the final counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` is
 appended to the Result Table only after the watermark is updated to `12:11`.
 
+#### Types of time windows
+
+Spark supports three types of time windows: tumbling (fixed), sliding and session.
+
+![The types of time windows](img/structured-streaming-time-window-types.jpg)
+
+Tumbling windows are a series of fixed-sized, non-overlapping and contiguous time intervals. An input
+can only be bound to a single window.
+
+Sliding windows are similar to tumbling windows in being "fixed-sized", but windows
+can overlap if the slide duration is smaller than the window duration, and in this case an input
+can be bound to multiple windows.
+
+Tumbling and sliding windows use the `window` function, which has been described in the above examples.
+
+Session windows have different characteristics from the previous two types. A session window has a dynamic
+window length that depends on the inputs. A session window starts with an input, and expands itself
+if the next input is received within the gap duration. A session window closes when no input is
+received within the gap duration after the latest input.
+
+Session windows use the `session_window` function. Its usage is similar to that of the `window` function.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+import spark.implicits._
+
+val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }
+
+// Group the data by session window and userId, and compute the count of each group
+val sessionizedCounts = events
+  .withWatermark("timestamp", "10 minutes")
+  .groupBy(
+    session_window($"timestamp", "5 minutes"),
+    $"userId")
+  .count()
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }
+
+// Group the data by session window and userId, and compute the count of each group
+Dataset<Row> sessionizedCounts = events
+  .withWatermark("timestamp", "10 minutes")
+  .groupBy(
+    session_window(col("timestamp"), "5 minutes"),
+    col("userId"))
+  .count();
+{% endhighlight %}
+
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+events = ...  # streaming DataFrame of schema { timestamp: Timestamp, userId: String }
+
+# Group the data by session window and userId, and compute the count of each group
+sessionizedCounts = events \
+    .withWatermark("timestamp", "10 minutes") \
+    .groupBy(
+        session_window(events.timestamp, "5 minutes"),
+        events.userId) \
+    .count()
+{% endhighlight %}
+
+</div>
+</div>
+
 ##### Conditions for watermarking to clean aggregation state
 {:.no_toc}
 
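
To make the three window types documented in this patch concrete, here is a minimal plain-Python sketch of how event timestamps map to windows. It is not Spark's implementation: the helpers `tumbling_windows`, `sliding_windows` and `session_windows` and the `origin` alignment parameter are hypothetical (`origin` loosely stands in for the epoch-plus-`startTime` alignment of Spark's `window` function). The sketch works on a finite list rather than a stream, ignores watermarks and late data, and assumes that an event arriving exactly `gap` after the previous one starts a new session.

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Tuple

# A window is a half-open interval [start, end).
Window = Tuple[datetime, datetime]


def tumbling_windows(ts: datetime, size: timedelta, origin: datetime) -> List[Window]:
    """Return the one fixed-size, non-overlapping window that contains ts."""
    start = origin + ((ts - origin) // size) * size
    return [(start, start + size)]


def sliding_windows(ts: datetime, size: timedelta, slide: timedelta,
                    origin: datetime) -> List[Window]:
    """Return every window of length `size`, starting on a multiple of `slide`, that contains ts."""
    # Latest window start that is not after ts.
    start = origin + ((ts - origin) // slide) * slide
    windows: List[Window] = []
    # Step back one slide at a time while the window still covers ts.
    while start + size > ts:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def session_windows(timestamps: Iterable[datetime], gap: timedelta) -> List[Window]:
    """Group timestamps into sessions that close `gap` after their latest event."""
    sessions: List[Window] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Within the gap of the session's latest event: extend the session.
            sessions[-1] = (sessions[-1][0], ts + gap)
        else:
            # No open session covers ts: start a new one.
            sessions.append((ts, ts + gap))
    return sessions
```

For example, with a 5-minute gap, events at 12:00, 12:03 and 12:20 yield two sessions, [12:00, 12:08) and [12:20, 12:25). By contrast, a 10-minute tumbling window places an event at 12:07 only in [12:00, 12:10), while a 10-minute window sliding every 5 minutes places it in both [12:00, 12:10) and [12:05, 12:15).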