This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 648750f3843af5ff12d696a3caedf31c719511bf Author: David Anderson <da...@alpinegizmo.com> AuthorDate: Tue Apr 21 13:48:42 2020 +0200 [FLINK-17239][docs] Streaming Analytics tutorial This closes #11843. --- docs/fig/window-assigners.svg | 328 +++++++++++++++++++++ docs/tutorials/streaming_analytics.md | 475 +++++++++++++++++++++++++++++++ docs/tutorials/streaming_analytics.zh.md | 475 +++++++++++++++++++++++++++++++ 3 files changed, 1278 insertions(+) diff --git a/docs/fig/window-assigners.svg b/docs/fig/window-assigners.svg new file mode 100644 index 0000000..a36ff89 --- /dev/null +++ b/docs/fig/window-assigners.svg @@ -0,0 +1,328 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> +<svg xmlns:xl="http://www.w3.org/1999/xlink" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:dc="http://purl.org/dc/elements/1.1/" viewBox="-43.33231 2 758.537 367.49606" width="758.537" height="367.49606"> + <defs> + <font-face font-family="Helvetica" font-size="16" units-per-em="1000" underline-position="-75.68359" underline-thickness="49.316406" slope="0" x-height="532.22656" cap-height="719.7266" ascent="770.0195" descent="-229.98047" font-weight="700"> + <font-face-src> + <font-face-name name="Helvetica-Bold"/> + </font-face-src> + </font-face> + </defs> + <metadata> Produced by OmniGraffle 7.15 + <dc:date>2018-12-06 15:14:03 +0000</dc:date> + </metadata> + <g id="Canvas_1" fill-opacity="1" stroke-dasharray="none" stroke="none" fill="none" stroke-opacity="1"> + <title>Canvas 1</title> + <g id="Canvas_1: events"> + <title>events</title> + <g id="Graphic_15"> + <path d="M 374.9764 219.49606 C 374.9764 214.25152 379.2279 210 384.47244 210 C 386.99097 210 389.4063 211.00047 391.18716 212.78132 C 392.968 214.5622 393.9685 216.97755 393.9685 219.49606 C 393.9685 224.7406 389.717 228.99212 384.47244 228.99212 C 379.2279 228.99212 374.9764 224.7406 374.9764 219.49606 Z" fill="#90f"/> + </g> + <g id="Graphic_14"> + <path d="M 401.47244 219.49606 C 401.47244 214.25152 405.72397 210 410.9685 210 C 413.48703 210 415.90237 211.00047 417.6832 212.78132 C 419.4641 214.5622 420.46457 216.97755 420.46457 219.49606 C 420.46457 224.7406 416.21304 228.99212 410.9685 228.99212 C 405.72397 228.99212 401.47244 224.7406 401.47244 219.49606 Z" fill="#90f"/> + </g> + <g id="Graphic_13"> + <path d="M 209 219.49606 C 209 214.25152 213.25153 210 218.49606 210 C 221.01457 210 223.42993 211.00047 225.2108 212.78132 C 226.99165 214.5622 227.99213 216.97755 227.99213 219.49606 C 227.99213 224.7406 223.7406 228.99212 218.49606 228.99212 C 213.25153 228.99212 209 224.7406 209 
219.49606 Z" fill="#90f"/> + </g> + <g id="Graphic_12"> + <path d="M 242 219.49606 C 242 214.25152 246.25153 210 251.49606 210 C 254.01457 210 256.42993 211.00047 258.2108 212.78132 C 259.99164 214.5622 260.99213 216.97755 260.99213 219.49606 C 260.99213 224.7406 256.7406 228.99212 251.49606 228.99212 C 246.25153 228.99212 242 224.7406 242 219.49606 Z" fill="#90f"/> + </g> + <g id="Graphic_11"> + <path d="M 267 219.49606 C 267 214.25152 271.25153 210 276.49606 210 C 279.0146 210 281.42993 211.00047 283.2108 212.78132 C 284.99164 214.5622 285.99213 216.97755 285.99213 219.49606 C 285.99213 224.7406 281.7406 228.99212 276.49606 228.99212 C 271.25153 228.99212 267 224.7406 267 219.49606 Z" fill="#90f"/> + </g> + <g id="Graphic_10"> + <path d="M 292 219.49606 C 292 214.25152 296.25153 210 301.49606 210 C 304.0146 210 306.42993 211.00047 308.2108 212.78132 C 309.99164 214.5622 310.99213 216.97755 310.99213 219.49606 C 310.99213 224.7406 306.7406 228.99212 301.49606 228.99212 C 296.25153 228.99212 292 224.7406 292 219.49606 Z" fill="#90f"/> + </g> + <g id="Graphic_9"> + <path d="M 568.4803 219.49606 C 568.4803 214.25152 572.7319 210 577.9764 210 C 580.4949 210 582.9103 211.00047 584.6911 212.78132 C 586.472 214.5622 587.4724 216.97755 587.4724 219.49606 C 587.4724 224.7406 583.2209 228.99212 577.9764 228.99212 C 572.7319 228.99212 568.4803 224.7406 568.4803 219.49606 Z" fill="#90f"/> + </g> + <g id="Graphic_8"> + <path d="M 594.9764 219.49606 C 594.9764 214.25152 599.2279 210 604.4724 210 C 606.991 210 609.4063 211.00047 611.1872 212.78132 C 612.96804 214.5622 613.9685 216.97755 613.9685 219.49606 C 613.9685 224.7406 609.717 228.99212 604.4724 228.99212 C 599.2279 228.99212 594.9764 224.7406 594.9764 219.49606 Z" fill="#90f"/> + </g> + <g id="Graphic_7"> + <path d="M 618.4803 219.49606 C 618.4803 214.25152 622.7319 210 627.9764 210 C 630.4949 210 632.9103 211.00047 634.6911 212.78132 C 636.472 214.5622 637.4724 216.97755 637.4724 219.49606 C 637.4724 224.7406 633.2209 
228.99212 627.9764 228.99212 C 622.7319 228.99212 618.4803 224.7406 618.4803 219.49606 Z" fill="#90f"/> + </g> + <g id="Graphic_6"> + <path d="M 477 219.49606 C 477 214.25152 481.2515 210 486.49606 210 C 489.0146 210 491.42993 211.00047 493.2108 212.78132 C 494.99164 214.5622 495.9921 216.97755 495.9921 219.49606 C 495.9921 224.7406 491.7406 228.99212 486.49606 228.99212 C 481.2515 228.99212 477 224.7406 477 219.49606 Z" fill="#90f"/> + </g> + <g id="Graphic_25"> + <path d="M 374.9764 288.99606 C 374.9764 283.75153 379.2279 279.5 384.47244 279.5 C 386.99097 279.5 389.4063 280.50047 391.18716 282.28133 C 392.968 284.0622 393.9685 286.47755 393.9685 288.99606 C 393.9685 294.2406 389.717 298.49213 384.47244 298.49213 C 379.2279 298.49213 374.9764 294.2406 374.9764 288.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_24"> + <path d="M 401.47244 288.99606 C 401.47244 283.75153 405.72397 279.5 410.9685 279.5 C 413.48703 279.5 415.90237 280.50047 417.6832 282.28133 C 419.4641 284.0622 420.46457 286.47755 420.46457 288.99606 C 420.46457 294.2406 416.21304 298.49213 410.9685 298.49213 C 405.72397 298.49213 401.47244 294.2406 401.47244 288.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_23"> + <path d="M 209 288.99606 C 209 283.75153 213.25153 279.5 218.49606 279.5 C 221.01457 279.5 223.42993 280.50047 225.2108 282.28133 C 226.99165 284.0622 227.99213 286.47755 227.99213 288.99606 C 227.99213 294.2406 223.7406 298.49213 218.49606 298.49213 C 213.25153 298.49213 209 294.2406 209 288.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_22"> + <path d="M 242 288.99606 C 242 283.75153 246.25153 279.5 251.49606 279.5 C 254.01457 279.5 256.42993 280.50047 258.2108 282.28133 C 259.99164 284.0622 260.99213 286.47755 260.99213 288.99606 C 260.99213 294.2406 256.7406 298.49213 251.49606 298.49213 C 246.25153 298.49213 242 294.2406 242 288.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_21"> + <path d="M 267 288.99606 C 267 283.75153 271.25153 279.5 276.49606 279.5 C 279.0146 279.5 281.42993 
280.50047 283.2108 282.28133 C 284.99164 284.0622 285.99213 286.47755 285.99213 288.99606 C 285.99213 294.2406 281.7406 298.49213 276.49606 298.49213 C 271.25153 298.49213 267 294.2406 267 288.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_20"> + <path d="M 292 288.99606 C 292 283.75153 296.25153 279.5 301.49606 279.5 C 304.0146 279.5 306.42993 280.50047 308.2108 282.28133 C 309.99164 284.0622 310.99213 286.47755 310.99213 288.99606 C 310.99213 294.2406 306.7406 298.49213 301.49606 298.49213 C 296.25153 298.49213 292 294.2406 292 288.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_19"> + <path d="M 568.4803 288.99606 C 568.4803 283.75153 572.7319 279.5 577.9764 279.5 C 580.4949 279.5 582.9103 280.50047 584.6911 282.28133 C 586.472 284.0622 587.4724 286.47755 587.4724 288.99606 C 587.4724 294.2406 583.2209 298.49213 577.9764 298.49213 C 572.7319 298.49213 568.4803 294.2406 568.4803 288.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_18"> + <path d="M 594.9764 288.99606 C 594.9764 283.75153 599.2279 279.5 604.4724 279.5 C 606.991 279.5 609.4063 280.50047 611.1872 282.28133 C 612.96804 284.0622 613.9685 286.47755 613.9685 288.99606 C 613.9685 294.2406 609.717 298.49213 604.4724 298.49213 C 599.2279 298.49213 594.9764 294.2406 594.9764 288.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_17"> + <path d="M 618.4803 288.99606 C 618.4803 283.75153 622.7319 279.5 627.9764 279.5 C 630.4949 279.5 632.9103 280.50047 634.6911 282.28133 C 636.472 284.0622 637.4724 286.47755 637.4724 288.99606 C 637.4724 294.2406 633.2209 298.49213 627.9764 298.49213 C 622.7319 298.49213 618.4803 294.2406 618.4803 288.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_16"> + <path d="M 477 288.99606 C 477 283.75153 481.2515 279.5 486.49606 279.5 C 489.0146 279.5 491.42993 280.50047 493.2108 282.28133 C 494.99164 284.0622 495.9921 286.47755 495.9921 288.99606 C 495.9921 294.2406 491.7406 298.49213 486.49606 298.49213 C 481.2515 298.49213 477 294.2406 477 288.99606 Z" fill="#90f"/> + </g> + <g 
id="Graphic_35"> + <path d="M 374.9764 153.5 C 374.9764 148.25546 379.2279 144.00394 384.47244 144.00394 C 386.99097 144.00394 389.4063 145.00441 391.18716 146.78526 C 392.968 148.56613 393.9685 150.9815 393.9685 153.5 C 393.9685 158.74454 389.717 162.99606 384.47244 162.99606 C 379.2279 162.99606 374.9764 158.74454 374.9764 153.5 Z" fill="#90f"/> + </g> + <g id="Graphic_34"> + <path d="M 401.47244 153.5 C 401.47244 148.25546 405.72397 144.00394 410.9685 144.00394 C 413.48703 144.00394 415.90237 145.00441 417.6832 146.78526 C 419.4641 148.56613 420.46457 150.9815 420.46457 153.5 C 420.46457 158.74454 416.21304 162.99606 410.9685 162.99606 C 405.72397 162.99606 401.47244 158.74454 401.47244 153.5 Z" fill="#90f"/> + </g> + <g id="Graphic_33"> + <path d="M 209 153.5 C 209 148.25546 213.25153 144.00394 218.49606 144.00394 C 221.01457 144.00394 223.42993 145.00441 225.2108 146.78526 C 226.99165 148.56613 227.99213 150.9815 227.99213 153.5 C 227.99213 158.74454 223.7406 162.99606 218.49606 162.99606 C 213.25153 162.99606 209 158.74454 209 153.5 Z" fill="#90f"/> + </g> + <g id="Graphic_32"> + <path d="M 242 153.5 C 242 148.25546 246.25153 144.00394 251.49606 144.00394 C 254.01457 144.00394 256.42993 145.00441 258.2108 146.78526 C 259.99164 148.56613 260.99213 150.9815 260.99213 153.5 C 260.99213 158.74454 256.7406 162.99606 251.49606 162.99606 C 246.25153 162.99606 242 158.74454 242 153.5 Z" fill="#90f"/> + </g> + <g id="Graphic_31"> + <path d="M 267 153.5 C 267 148.25546 271.25153 144.00394 276.49606 144.00394 C 279.0146 144.00394 281.42993 145.00441 283.2108 146.78526 C 284.99164 148.56613 285.99213 150.9815 285.99213 153.5 C 285.99213 158.74454 281.7406 162.99606 276.49606 162.99606 C 271.25153 162.99606 267 158.74454 267 153.5 Z" fill="#90f"/> + </g> + <g id="Graphic_30"> + <path d="M 292 153.5 C 292 148.25546 296.25153 144.00394 301.49606 144.00394 C 304.0146 144.00394 306.42993 145.00441 308.2108 146.78526 C 309.99164 148.56613 310.99213 150.9815 310.99213 153.5 C 
310.99213 158.74454 306.7406 162.99606 301.49606 162.99606 C 296.25153 162.99606 292 158.74454 292 153.5 Z" fill="#90f"/> + </g> + <g id="Graphic_29"> + <path d="M 568.4803 153.5 C 568.4803 148.25546 572.7319 144.00394 577.9764 144.00394 C 580.4949 144.00394 582.9103 145.00441 584.6911 146.78526 C 586.472 148.56613 587.4724 150.9815 587.4724 153.5 C 587.4724 158.74454 583.2209 162.99606 577.9764 162.99606 C 572.7319 162.99606 568.4803 158.74454 568.4803 153.5 Z" fill="#90f"/> + </g> + <g id="Graphic_28"> + <path d="M 594.9764 153.5 C 594.9764 148.25546 599.2279 144.00394 604.4724 144.00394 C 606.991 144.00394 609.4063 145.00441 611.1872 146.78526 C 612.96804 148.56613 613.9685 150.9815 613.9685 153.5 C 613.9685 158.74454 609.717 162.99606 604.4724 162.99606 C 599.2279 162.99606 594.9764 158.74454 594.9764 153.5 Z" fill="#90f"/> + </g> + <g id="Graphic_27"> + <path d="M 618.4803 153.5 C 618.4803 148.25546 622.7319 144.00394 627.9764 144.00394 C 630.4949 144.00394 632.9103 145.00441 634.6911 146.78526 C 636.472 148.56613 637.4724 150.9815 637.4724 153.5 C 637.4724 158.74454 633.2209 162.99606 627.9764 162.99606 C 622.7319 162.99606 618.4803 158.74454 618.4803 153.5 Z" fill="#90f"/> + </g> + <g id="Graphic_26"> + <path d="M 477 153.5 C 477 148.25546 481.2515 144.00394 486.49606 144.00394 C 489.0146 144.00394 491.42993 145.00441 493.2108 146.78526 C 494.99164 148.56613 495.9921 150.9815 495.9921 153.5 C 495.9921 158.74454 491.7406 162.99606 486.49606 162.99606 C 481.2515 162.99606 477 158.74454 477 153.5 Z" fill="#90f"/> + </g> + </g> + <g id="Canvas_1: windows"> + <title>windows</title> + <g id="Graphic_38"> + <rect x="369.5" y="136.5" width="222.5" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_45"> + <rect x="471.5" y="214.00788" width="172" height="36" stroke="#a5a5a5" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_52"> + <text 
transform="translate(-43.33231 143.5)" fill="black"> + <tspan font-family="Helvetica" font-size="16" font-weight="700" fill="black" x="0" y="15">Tumbling count windows</tspan> + </text> + </g> + <g id="Graphic_53"> + <text transform="translate(-25.855746 209.49606)" fill="black"> + <tspan font-family="Helvetica" font-size="16" font-weight="700" fill="black" x="0" y="15">Sliding count windows</tspan> + </text> + </g> + <g id="Graphic_54"> + <text transform="translate(34.581754 343.49606)" fill="black"> + <tspan font-family="Helvetica" font-size="16" font-weight="700" fill="black" x="0" y="15">Global window</tspan> + </text> + </g> + <g id="Graphic_55"> + <rect x="205.5" y="270.99606" width="108.99213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_56"> + <rect x="369.5" y="270.99606" width="56" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_57"> + <rect x="471.5" y="270.99606" width="30" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_58"> + <rect x="560.9449" y="270.99606" width="83.05512" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_169"> + <path d="M 374.9764 85.99606 C 374.9764 80.75152 379.2279 76.5 384.47244 76.5 C 386.99097 76.5 389.4063 77.50047 391.18716 79.28132 C 392.968 81.06219 393.9685 83.47755 393.9685 85.99606 C 393.9685 91.2406 389.717 95.49212 384.47244 95.49212 C 379.2279 95.49212 374.9764 91.2406 374.9764 85.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_168"> + <path d="M 401.47244 85.99606 C 401.47244 80.75152 405.72397 76.5 410.9685 76.5 C 413.48703 76.5 415.90237 77.50047 417.6832 79.28132 C 419.4641 81.06219 420.46457 83.47755 420.46457 85.99606 C 420.46457 91.2406 416.21304 95.49212 410.9685 95.49212 C 405.72397 95.49212 401.47244 91.2406 401.47244 85.99606 Z" 
fill="#90f"/> + </g> + <g id="Graphic_167"> + <path d="M 209 85.99606 C 209 80.75152 213.25153 76.5 218.49606 76.5 C 221.01457 76.5 223.42993 77.50047 225.2108 79.28132 C 226.99165 81.06219 227.99213 83.47755 227.99213 85.99606 C 227.99213 91.2406 223.7406 95.49212 218.49606 95.49212 C 213.25153 95.49212 209 91.2406 209 85.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_166"> + <path d="M 242 85.99606 C 242 80.75152 246.25153 76.5 251.49606 76.5 C 254.01457 76.5 256.42993 77.50047 258.2108 79.28132 C 259.99164 81.06219 260.99213 83.47755 260.99213 85.99606 C 260.99213 91.2406 256.7406 95.49212 251.49606 95.49212 C 246.25153 95.49212 242 91.2406 242 85.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_165"> + <path d="M 267 85.99606 C 267 80.75152 271.25153 76.5 276.49606 76.5 C 279.0146 76.5 281.42993 77.50047 283.2108 79.28132 C 284.99164 81.06219 285.99213 83.47755 285.99213 85.99606 C 285.99213 91.2406 281.7406 95.49212 276.49606 95.49212 C 271.25153 95.49212 267 91.2406 267 85.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_164"> + <path d="M 292 85.99606 C 292 80.75152 296.25153 76.5 301.49606 76.5 C 304.0146 76.5 306.42993 77.50047 308.2108 79.28132 C 309.99164 81.06219 310.99213 83.47755 310.99213 85.99606 C 310.99213 91.2406 306.7406 95.49212 301.49606 95.49212 C 296.25153 95.49212 292 91.2406 292 85.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_163"> + <path d="M 568.4803 85.99606 C 568.4803 80.75152 572.7319 76.5 577.9764 76.5 C 580.4949 76.5 582.9103 77.50047 584.6911 79.28132 C 586.472 81.06219 587.4724 83.47755 587.4724 85.99606 C 587.4724 91.2406 583.2209 95.49212 577.9764 95.49212 C 572.7319 95.49212 568.4803 91.2406 568.4803 85.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_162"> + <path d="M 594.9764 85.99606 C 594.9764 80.75152 599.2279 76.5 604.4724 76.5 C 606.991 76.5 609.4063 77.50047 611.1872 79.28132 C 612.96804 81.06219 613.9685 83.47755 613.9685 85.99606 C 613.9685 91.2406 609.717 95.49212 604.4724 95.49212 C 599.2279 95.49212 594.9764 91.2406 
594.9764 85.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_161"> + <path d="M 618.4803 85.99606 C 618.4803 80.75152 622.7319 76.5 627.9764 76.5 C 630.4949 76.5 632.9103 77.50047 634.6911 79.28132 C 636.472 81.06219 637.4724 83.47755 637.4724 85.99606 C 637.4724 91.2406 633.2209 95.49212 627.9764 95.49212 C 622.7319 95.49212 618.4803 91.2406 618.4803 85.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_160"> + <path d="M 477 85.99606 C 477 80.75152 481.2515 76.5 486.49606 76.5 C 489.0146 76.5 491.42993 77.50047 493.2108 79.28132 C 494.99164 81.06219 495.9921 83.47755 495.9921 85.99606 C 495.9921 91.2406 491.7406 95.49212 486.49606 95.49212 C 481.2515 95.49212 477 91.2406 477 85.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_159"> + <path d="M 374.9764 20 C 374.9764 14.755462 379.2279 10.503937 384.47244 10.503937 C 386.99097 10.503937 389.4063 11.50441 391.18716 13.285263 C 392.968 15.066132 393.9685 17.481491 393.9685 20 C 393.9685 25.244538 389.717 29.496063 384.47244 29.496063 C 379.2279 29.496063 374.9764 25.244537 374.9764 20 Z" fill="#90f"/> + </g> + <g id="Graphic_158"> + <path d="M 401.47244 20 C 401.47244 14.755462 405.72397 10.503937 410.9685 10.503937 C 413.48703 10.503937 415.90237 11.50441 417.6832 13.285263 C 419.4641 15.066132 420.46457 17.481491 420.46457 20 C 420.46457 25.244538 416.21304 29.496063 410.9685 29.496063 C 405.72397 29.496063 401.47244 25.244537 401.47244 20 Z" fill="#90f"/> + </g> + <g id="Graphic_157"> + <path d="M 209 20 C 209 14.755462 213.25153 10.503937 218.49606 10.503937 C 221.01457 10.503937 223.42993 11.50441 225.2108 13.285263 C 226.99165 15.066132 227.99213 17.481491 227.99213 20 C 227.99213 25.244538 223.7406 29.496063 218.49606 29.496063 C 213.25153 29.496063 209 25.244537 209 20 Z" fill="#90f"/> + </g> + <g id="Graphic_156"> + <path d="M 242 20 C 242 14.755462 246.25153 10.503937 251.49606 10.503937 C 254.01457 10.503937 256.42993 11.50441 258.2108 13.285263 C 259.99164 15.066132 260.99213 17.481491 260.99213 20 C 260.99213 
25.244538 256.7406 29.496063 251.49606 29.496063 C 246.25153 29.496063 242 25.244537 242 20 Z" fill="#90f"/> + </g> + <g id="Graphic_155"> + <path d="M 267 20 C 267 14.755462 271.25153 10.503937 276.49606 10.503937 C 279.0146 10.503937 281.42993 11.50441 283.2108 13.285263 C 284.99164 15.066132 285.99213 17.481491 285.99213 20 C 285.99213 25.244538 281.7406 29.496063 276.49606 29.496063 C 271.25153 29.496063 267 25.244537 267 20 Z" fill="#90f"/> + </g> + <g id="Graphic_154"> + <path d="M 292 20 C 292 14.755462 296.25153 10.503937 301.49606 10.503937 C 304.0146 10.503937 306.42993 11.50441 308.2108 13.285263 C 309.99164 15.066132 310.99213 17.481491 310.99213 20 C 310.99213 25.244538 306.7406 29.496063 301.49606 29.496063 C 296.25153 29.496063 292 25.244537 292 20 Z" fill="#90f"/> + </g> + <g id="Graphic_153"> + <path d="M 568.4803 20 C 568.4803 14.755462 572.7319 10.503937 577.9764 10.503937 C 580.4949 10.503937 582.9103 11.50441 584.6911 13.285263 C 586.472 15.066132 587.4724 17.481491 587.4724 20 C 587.4724 25.244538 583.2209 29.496063 577.9764 29.496063 C 572.7319 29.496063 568.4803 25.244537 568.4803 20 Z" fill="#90f"/> + </g> + <g id="Graphic_152"> + <path d="M 594.9764 20 C 594.9764 14.755462 599.2279 10.503937 604.4724 10.503937 C 606.991 10.503937 609.4063 11.50441 611.1872 13.285263 C 612.96804 15.066132 613.9685 17.481491 613.9685 20 C 613.9685 25.244538 609.717 29.496063 604.4724 29.496063 C 599.2279 29.496063 594.9764 25.244537 594.9764 20 Z" fill="#90f"/> + </g> + <g id="Graphic_151"> + <path d="M 618.4803 20 C 618.4803 14.755462 622.7319 10.503937 627.9764 10.503937 C 630.4949 10.503937 632.9103 11.50441 634.6911 13.285263 C 636.472 15.066132 637.4724 17.481491 637.4724 20 C 637.4724 25.244538 633.2209 29.496063 627.9764 29.496063 C 622.7319 29.496063 618.4803 25.244537 618.4803 20 Z" fill="#90f"/> + </g> + <g id="Graphic_150"> + <path d="M 477 20 C 477 14.755462 481.2515 10.503937 486.49606 10.503937 C 489.0146 10.503937 491.42993 11.50441 493.2108 
13.285263 C 494.99164 15.066132 495.9921 17.481491 495.9921 20 C 495.9921 25.244538 491.7406 29.496063 486.49606 29.496063 C 481.2515 29.496063 477 25.244537 477 20 Z" fill="#90f"/> + </g> + <g id="Graphic_149"> + <rect x="264.73622" y="2.5" width="99.99213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_148"> + <rect x="164.5" y="2.5" width="99.99213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_147"> + <rect x="364.72835" y="2.5" width="99.99213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_146"> + <rect x="464.7205" y="2.5" width="99.99213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_145"> + <rect x="564.7126" y="2.5" width="99.99213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_144"> + <rect x="264.73622" y="67.99606" width="99.99213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_143"> + <rect x="164.5" y="67.99606" width="99.99213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_142"> + <rect x="364.72835" y="67.99606" width="99.99213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_141"> + <rect x="464.7205" y="67.99606" width="99.99213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_140"> + <rect x="564.7126" y="67.99606" width="99.99213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_139"> + <rect x="314.73622" y="76.5" width="99.99213" height="36" stroke="#a5a5a5" 
stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_138"> + <rect x="214.5" y="76.5" width="99.99213" height="36" stroke="#a5a5a5" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_137"> + <rect x="414.72835" y="76.5" width="99.99213" height="36" stroke="#a5a5a5" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_136"> + <rect x="514.7205" y="76.5" width="99.99213" height="36" stroke="#a5a5a5" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_135"> + <rect x="614.7126" y="76.5" width="99.99213" height="36" stroke="#a5a5a5" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_134"> + <text transform="translate(-32.68387 10)" fill="black"> + <tspan font-family="Helvetica" font-size="16" font-weight="700" fill="black" x="0" y="15">Tumbling time windows</tspan> + </text> + </g> + <g id="Graphic_133"> + <text transform="translate(-15.207309 75.99606)" fill="black"> + <tspan font-family="Helvetica" font-size="16" font-weight="700" fill="black" x="0" y="15">Sliding time windows</tspan> + </text> + </g> + <g id="Graphic_170"> + <rect x="206" y="136.5" width="108.99213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_171"> + <rect x="206" y="201.99606" width="108.49213" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_181"> + <path d="M 374.9764 353.99606 C 374.9764 348.75152 379.2279 344.5 384.47244 344.5 C 386.99097 344.5 389.4063 345.50047 391.18716 347.28132 C 392.968 349.0622 393.9685 351.47755 393.9685 353.99606 C 393.9685 359.2406 389.717 363.49212 384.47244 363.49212 C 379.2279 363.49212 374.9764 359.2406 374.9764 353.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_180"> + <path d="M 401.47244 353.99606 C 401.47244 348.75152 405.72397 344.5 
410.9685 344.5 C 413.48703 344.5 415.90237 345.50047 417.6832 347.28132 C 419.4641 349.0622 420.46457 351.47755 420.46457 353.99606 C 420.46457 359.2406 416.21304 363.49212 410.9685 363.49212 C 405.72397 363.49212 401.47244 359.2406 401.47244 353.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_179"> + <path d="M 209 353.99606 C 209 348.75152 213.25153 344.5 218.49606 344.5 C 221.01457 344.5 223.42993 345.50047 225.2108 347.28132 C 226.99165 349.0622 227.99213 351.47755 227.99213 353.99606 C 227.99213 359.2406 223.7406 363.49212 218.49606 363.49212 C 213.25153 363.49212 209 359.2406 209 353.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_178"> + <path d="M 242 353.99606 C 242 348.75152 246.25153 344.5 251.49606 344.5 C 254.01457 344.5 256.42993 345.50047 258.2108 347.28132 C 259.99164 349.0622 260.99213 351.47755 260.99213 353.99606 C 260.99213 359.2406 256.7406 363.49212 251.49606 363.49212 C 246.25153 363.49212 242 359.2406 242 353.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_177"> + <path d="M 267 353.99606 C 267 348.75152 271.25153 344.5 276.49606 344.5 C 279.0146 344.5 281.42993 345.50047 283.2108 347.28132 C 284.99164 349.0622 285.99213 351.47755 285.99213 353.99606 C 285.99213 359.2406 281.7406 363.49212 276.49606 363.49212 C 271.25153 363.49212 267 359.2406 267 353.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_176"> + <path d="M 292 353.99606 C 292 348.75152 296.25153 344.5 301.49606 344.5 C 304.0146 344.5 306.42993 345.50047 308.2108 347.28132 C 309.99164 349.0622 310.99213 351.47755 310.99213 353.99606 C 310.99213 359.2406 306.7406 363.49212 301.49606 363.49212 C 296.25153 363.49212 292 359.2406 292 353.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_175"> + <path d="M 568.4803 353.99606 C 568.4803 348.75152 572.7319 344.5 577.9764 344.5 C 580.4949 344.5 582.9103 345.50047 584.6911 347.28132 C 586.472 349.0622 587.4724 351.47755 587.4724 353.99606 C 587.4724 359.2406 583.2209 363.49212 577.9764 363.49212 C 572.7319 363.49212 568.4803 359.2406 568.4803 
353.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_174"> + <path d="M 594.9764 353.99606 C 594.9764 348.75152 599.2279 344.5 604.4724 344.5 C 606.991 344.5 609.4063 345.50047 611.1872 347.28132 C 612.96804 349.0622 613.9685 351.47755 613.9685 353.99606 C 613.9685 359.2406 609.717 363.49212 604.4724 363.49212 C 599.2279 363.49212 594.9764 359.2406 594.9764 353.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_173"> + <path d="M 618.4803 353.99606 C 618.4803 348.75152 622.7319 344.5 627.9764 344.5 C 630.4949 344.5 632.9103 345.50047 634.6911 347.28132 C 636.472 349.0622 637.4724 351.47755 637.4724 353.99606 C 637.4724 359.2406 633.2209 363.49212 627.9764 363.49212 C 622.7319 363.49212 618.4803 359.2406 618.4803 353.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_172"> + <path d="M 477 353.99606 C 477 348.75152 481.2515 344.5 486.49606 344.5 C 489.0146 344.5 491.42993 345.50047 493.2108 347.28132 C 494.99164 349.0622 495.9921 351.47755 495.9921 353.99606 C 495.9921 359.2406 491.7406 363.49212 486.49606 363.49212 C 481.2515 363.49212 477 359.2406 477 353.99606 Z" fill="#90f"/> + </g> + <g id="Graphic_182"> + <text transform="translate(13.660311 279.49606)" fill="black"> + <tspan font-family="Helvetica" font-size="16" font-weight="700" fill="black" x="0" y="15">Session windows</tspan> + </text> + </g> + <g id="Graphic_183"> + <rect x="368.99015" y="201.99606" width="223.00985" height="36" stroke="#666" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + <g id="Graphic_184"> + <rect x="263.97243" y="214.00788" width="162.02757" height="36" stroke="#a5a5a5" stroke-linecap="round" stroke-linejoin="round" stroke-width="1"/> + </g> + </g> + </g> +</svg> diff --git a/docs/tutorials/streaming_analytics.md b/docs/tutorials/streaming_analytics.md new file mode 100644 index 0000000..2e3bf13 --- /dev/null +++ b/docs/tutorials/streaming_analytics.md @@ -0,0 +1,475 @@ +--- +title: Streaming Analytics +nav-id: analytics +nav-pos: 4 +nav-title: Streaming Analytics 
+nav-parent_id: tutorials +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* This will be replaced by the TOC +{:toc} + +## Event Time and Watermarks + +### Introduction + +Flink explicitly supports three different notions of time: + +* _event time:_ the time when an event occurred, as recorded by the device producing (or storing) the event + +* _ingestion time:_ a timestamp recorded by Flink at the moment it ingests the event + +* _processing time:_ the time when a specific operator in your pipeline is processing the event + +For reproducible results, e.g., when computing the maximum price a stock reached during the first +hour of trading on a given day, you should use event time. In this way the result won't depend on +when the calculation is performed. This kind of real-time application is sometimes performed using +processing time, but then the results are determined by the events that happen to be processed +during that hour, rather than the events that occurred then. Computing analytics based on processing +time causes inconsistencies, and makes it difficult to re-analyze historic data or test new +implementations. + +### Working with Event Time + +By default, Flink will use processing time. 
To change this, you can set the Time Characteristic: + +{% highlight java %} +final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); +{% endhighlight %} + +If you want to use event time, you will also need to supply a Timestamp Extractor and Watermark +Generator that Flink will use to track the progress of event time. This will be covered in the +section below on [Working with Watermarks]({{ site.baseurl }}{% link +tutorials/streaming_analytics.md %}#working-with-watermarks), but first we should explain what +watermarks are. + +### Watermarks + +Let's work through a simple example that will show why watermarks are needed, and how they work. + +In this example you have a stream of timestamped events that arrive somewhat out of order, as shown +below. The numbers shown are timestamps that indicate when these events actually occurred. The first +event to arrive happened at time 4, and it is followed by an event that happened earlier, at time 2, +and so on: + +<div class="text-center" style="font-size: x-large; word-spacing: 0.5em; margin: 1em 0em;"> +··· 23 19 22 24 21 14 17 13 12 15 9 11 7 2 4 → +</div> + +Now imagine that you are trying to create a stream sorter. This is meant to be an application that +processes each event from a stream as it arrives, and emits a new stream containing the same events, +but ordered by their timestamps. + +Some observations: + +(1) The first element your stream sorter sees is the 4, but you can't just immediately release it as +the first element of the sorted stream. It may have arrived out of order, and an earlier event might +yet arrive. In fact, you have the benefit of some god-like knowledge of this stream's future, and +you can see that your stream sorter should wait at least until the 2 arrives before producing any +results. 
+ +*Some buffering, and some delay, is necessary.* + +(2) If you do this wrong, you could end up waiting forever. First the sorter saw an event from time +4, and then an event from time 2. Will an event with a timestamp less than 2 ever arrive? Maybe. +Maybe not. You could wait forever and never see a 1. + +*Eventually you have to be courageous and emit the 2 as the start of the sorted stream.* + +(3) What you need then is some sort of policy that defines when, for any given timestamped event, to +stop waiting for the arrival of earlier events. + +*This is precisely what watermarks do* — they define when to stop waiting for earlier events. + +Event time processing in Flink depends on *watermark generators* that insert special timestamped +elements into the stream, called *watermarks*. A watermark for time _t_ is an assertion that the +stream is (probably) now complete up through time _t_. + +When should this stream sorter stop waiting, and push out the 2 to start the sorted stream? When a +watermark arrives with a timestamp of 2, or greater. + +(4) You might imagine different policies for deciding how to generate watermarks. + +Each event arrives after some delay, and these delays vary, so some events are delayed more than +others. One simple approach is to assume that these delays are bounded by some maximum delay. Flink +refers to this strategy as *bounded-out-of-orderness* watermarking. It is easy to imagine more +complex approaches to watermarking, but for most applications a fixed delay works well enough. + +### Latency vs. Completeness + +Another way to think about watermarks is that they give you, the developer of a streaming +application, control over the tradeoff between latency and completeness. Unlike in batch processing, +where one has the luxury of being able to have complete knowledge of the input before producing any +results, with streaming you must eventually stop waiting to see more of the input, and produce some +sort of result. 
+ +You can either configure your watermarking aggressively, with a short bounded delay, and thereby +take the risk of producing results with rather incomplete knowledge of the input -- i.e., a possibly +wrong result, produced quickly. Or you can wait longer, and produce results that take advantage of +having more complete knowledge of the input stream(s). + +It is also possible to implement hybrid solutions that produce initial results quickly, and then +supply updates to those results as additional (late) data is processed. This is a good approach for +some applications. + +### Lateness + +Lateness is defined relative to the watermarks. A `Watermark(t)` asserts that the stream is complete +up through time _t_; any event following this watermark whose timestamp is ≤ _t_ is late. + +### Working with Watermarks + +In order to perform event-time-based event processing, Flink needs to know the time associated with +each event, and it also needs the stream to include watermarks. + +The Taxi data sources used in the hands-on exercises take care of these details for you. But in your +own applications you will have to take care of this yourself, which is usually done by implementing +a class that extracts the timestamps from the events, and generates watermarks on demand. The +easiest way to do this is by extending the `BoundedOutOfOrdernessTimestampExtractor`: + +{% highlight java %} +DataStream<Event> stream = ... 
+ +DataStream<Event> withTimestampsAndWatermarks = + stream.assignTimestampsAndWatermarks(new TimestampsAndWatermarks(Time.seconds(10))); + +public static class TimestampsAndWatermarks + extends BoundedOutOfOrdernessTimestampExtractor<Event> { + + public TimestampsAndWatermarks(Time t) { + super(t); + } + + @Override + public long extractTimestamp(Event event) { + return event.timestamp; + } +} +{% endhighlight %} + +Note that the constructor for `BoundedOutOfOrdernessTimestampExtractor` takes a parameter which +specifies the maximum expected out-of-orderness (10 seconds, in this example). + +{% top %} + +## Windows + +Flink features very expressive window semantics. + +In this section you will learn: + +* how windows are used to compute aggregates on unbounded streams, +* which types of windows Flink supports, and +* how to implement a DataStream program with a windowed aggregation + +### Introduction + +It is natural when doing stream processing to want to compute aggregated analytics on bounded subsets +of the streams in order to answer questions like these: + +* number of page views per minute +* number of sessions per user per week +* maximum temperature per sensor per minute + +Computing windowed analytics with Flink depends on two principal abstractions: _Window Assigners_ +that assign events to windows (creating new window objects as necessary), and _Window Functions_ +that are applied to the events assigned to a window. + +Flink's windowing API also has notions of _Triggers_, which determine when to call the window +function, and _Evictors_, which can remove elements collected in a window. + +In its basic form, you apply windowing to a keyed stream like this: + +{% highlight java %} +stream. 
+ .keyBy(<key selector>) + .window(<window assigner>) + .reduce|aggregate|process(<window function>) +{% endhighlight %} + +You can also use windowing with non-keyed streams, but keep in mind that in this case, the +processing will _not_ be done in parallel: + +{% highlight java %} +stream. + .windowAll(<window assigner>) + .reduce|aggregate|process(<window function>) +{% endhighlight %} + +### Window Assigners + +Flink has several built-in types of window assigners, which are illustrated below: + +<img src="{{ site.baseurl }}/fig/window-assigners.svg" alt="Window assigners" class="center" width="80%" /> + +Some examples of what these window assigners might be used for, and how to specify them: + +* Tumbling time windows + * _page views per minute_ + * `TumblingEventTimeWindows.of(Time.minutes(1))` +* Sliding time windows + * _page views per minute computed every 10 seconds_ + * `SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))` +* Session windows + * _page views per session, where sessions are defined by a gap of at least 30 minutes between sessions_ + * `EventTimeSessionWindows.withGap(Time.minutes(30))` + +Durations can be specified using one of `Time.milliseconds(n)`, `Time.seconds(n)`, `Time.minutes(n)`, `Time.hours(n)`, and `Time.days(n)`. + +The time-based window assigners (including session windows) come in both event time and processing +time flavors. There are significant tradeoffs between these two types of time windows. With +processing time windowing you have to accept these limitations: + +* can not correctly process historic data, +* can not correctly handle out-of-order data, +* results will be non-deterministic, + +but with the advantage of lower latency. + +When working with count-based windows, keep in mind that these windows will not fire until a batch +is complete. There's no option to time-out and process a partial window, though you could implement +that behavior yourself with a custom Trigger. 
+ +A global window assigner assigns every event (with the same key) to the same global window. This is +only useful if you are going to do your own custom windowing, with a custom Trigger. In many cases +where this might seem useful you will be better off using a `ProcessFunction` as described +[in another section]({{ site.baseurl }}{% link tutorials/event_driven.md %}#process-functions). + +### Window Functions + +You have three basic options for how to process the contents of your windows: + +1. as a batch, using a `ProcessWindowFunction` that will be passed an `Iterable` with the window's contents; +1. incrementally, with a `ReduceFunction` or an `AggregateFunction` that is called as each event is assigned to the window; +1. or with a combination of the two, wherein the pre-aggregated results of a `ReduceFunction` or an `AggregateFunction` are supplied to a `ProcessWindowFunction` when the window is triggered. + +Here are examples of approaches 1 and 3. Each implementation finds the peak value from each sensor +in 1 minute event time windows, and produces a stream of Tuples containing `(key, +end-of-window-timestamp, max_value)`. + +#### ProcessWindowFunction Example + +{% highlight java %} +DataStream<SensorReading> input = ... 
+ +input + .keyBy(x -> x.key) + .window(TumblingEventTimeWindows.of(Time.minutes(1))) + .process(new MyWastefulMax()); + +public static class MyWastefulMax extends ProcessWindowFunction< + SensorReading, // input type + Tuple3<String, Long, Integer>, // output type + String, // key type + TimeWindow> { // window type + + @Override + public void process( + String key, + Context context, + Iterable<SensorReading> events, + Collector<Tuple3<String, Long, Integer>> out) { + + int max = 0; + for (SensorReading event : events) { + max = Math.max(event.value, max); + } + out.collect(Tuple3.of(key, context.window().getEnd(), max)); + } +} +{% endhighlight %} + +A couple of things to note in this implementation: + +* All of the events assigned to the window have to be buffered in keyed Flink state until the window + is triggered. This is potentially quite expensive. +* Our `ProcessWindowFunction` is being passed a `Context` object which contains information about + the window. Its interface looks like this: + +{% highlight java %} +public abstract class Context implements java.io.Serializable { + public abstract W window(); + + public abstract long currentProcessingTime(); + public abstract long currentWatermark(); + + public abstract KeyedStateStore windowState(); + public abstract KeyedStateStore globalState(); +} +{% endhighlight %} + +`windowState` and `globalState` are places where you can store per-key, per-window, or global +per-key information for all windows of that key. This might be useful, for example, if you want to +record something about the +current window and use that when processing a subsequent window. + +#### Incremental Aggregation Example + +{% highlight java %} +DataStream<SensorReading> input = ... 
+ +input + .keyBy(x -> x.key) + .window(TumblingEventTimeWindows.of(Time.minutes(1))) + .reduce(new MyReducingMax(), new MyWindowFunction()); + +private static class MyReducingMax implements ReduceFunction<SensorReading> { + public SensorReading reduce(SensorReading r1, SensorReading r2) { + return r1.value() > r2.value() ? r1 : r2; + } +} + +private static class MyWindowFunction extends ProcessWindowFunction< + SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> { + + @Override + public void process( + String key, + Context context, + Iterable<SensorReading> maxReading, + Collector<Tuple3<String, Long, SensorReading>> out) { + + SensorReading max = maxReading.iterator().next(); + out.collect(Tuple3.of(key, context.window().getEnd(), max)); + } +} +{% endhighlight %} + +Notice that the `Iterable<SensorReading>` +will contain exactly one reading -- the pre-aggregated maximum computed by `MyReducingMax`. + +### Late Events + +By default, when using event time windows, late events are dropped. There are two optional parts of +the window API that give you more control over this. + +You can arrange for the events that would be dropped to be collected to an alternate output stream +instead, using a mechanism called +[Side Outputs]({{ site.baseurl }}{% link tutorials/event_driven.md %}#side-outputs). +Here is an example of what that might look like: + +{% highlight java %} +OutputTag<Event> lateTag = new OutputTag<Event>("late"){}; + +SingleOutputStreamOperator<Event> result = stream. + .keyBy(...) + .window(...) + .sideOutputLateData(lateTag) + .process(...); + +DataStream<Event> lateStream = result.getSideOutput(lateTag); +{% endhighlight %} + +You can also specify an interval of _allowed lateness_ during which the late events will continue to +be assigned to the appropriate window(s) (whose state will have been retained). By default each late +event will cause the window function to be called again (sometimes called a _late firing_). 
+ +By default the allowed lateness is 0. In other words, elements behind the watermark are dropped (or +sent to the side output). + +For example: + +{% highlight java %} +stream. + .keyBy(...) + .window(...) + .allowedLateness(Time.seconds(10)) + .process(...); +{% endhighlight %} + +When the allowed lateness is greater than zero, only those events that are so late that they would +be dropped are sent to the side output (if it has been configured). + +### Surprises + +Some aspects of Flink's windowing API may not behave in the way you would expect. Based on +frequently asked questions on the [flink-user mailing +list](https://flink.apache.org/community.html#mailing-lists) and elsewhere, here are some facts +about windows that may surprise you. + +#### Sliding Windows Make Copies + +Sliding window assigners can create lots of window objects, and will copy each event into every +relevant window. For example, if you have sliding windows every 15 minutes that are 24-hours in +length, each event will be copied into 4 * 24 = 96 windows. + +#### Time Windows are Aligned to the Epoch + +Just because you are using hour-long processing-time windows and start your application running at +12:05 does not mean that the first window will close at 1:05. The first window will be 55 minutes +long and close at 1:00. + +Note, however, that the tumbling and sliding window assigners take an optional offset parameter +that can be used to change the alignment of the windows. See +[Tumbling Windows]({{ site.baseurl }}{% link dev/stream/operators/windows.md %}#tumbling-windows) and +[Sliding Windows]({{ site.baseurl }}{% link dev/stream/operators/windows.md %}#sliding-windows) for details. 
+ +#### Windows Can Follow Windows + +For example, it works to do this: + +{% highlight java %} +stream + .keyBy(t -> t.key) + .timeWindow(<time specification>) + .reduce(<reduce function>) + .timeWindowAll(<same time specification>) + .reduce(<same reduce function>) +{% endhighlight %} + +You might expect Flink's runtime to be smart enough to do this parallel pre-aggregation for you +(provided you are using a ReduceFunction or AggregateFunction), but it's not. + +The reason why this works is that the events produced by a time window are assigned timestamps +based on the time at the end of the window. So, for example, all of the events produced +by an hour-long window will have timestamps marking the end of an hour. Any subsequent window +consuming those events should have a duration that is the same as, or a multiple of, the +previous window. + +#### No Results for Empty TimeWindows + +Windows are only created when events are assigned to them. So if there are no events in a given time +frame, no results will be reported. + +#### Late Events Can Cause Late Merges + +Session windows are based on an abstraction of windows that can _merge_. Each element is initially +assigned to a new window, after which windows are merged whenever the gap between them is small +enough. In this way, a late event can bridge the gap separating two previously separate sessions, +producing a late merge. + +{% top %} + +## Hands-on + +The hands-on exercise that goes with this section is the [Hourly Tips +Exercise](https://github.com/apache/flink-training/tree/{% if site.is_stable %}release-{{ site.version_title }}{% else %}master{% endif %}/hourly-tips). 
+ +{% top %} + +## Further Reading + +- [Timely Stream Processing]({{ site.baseurl }}{% link concepts/timely-stream-processing.md %}) +- [Windows]({{ site.baseurl }}{% link dev/stream/operators/windows.md %}) + +{% top %} diff --git a/docs/tutorials/streaming_analytics.zh.md b/docs/tutorials/streaming_analytics.zh.md new file mode 100644 index 0000000..b0beb91 --- /dev/null +++ b/docs/tutorials/streaming_analytics.zh.md @@ -0,0 +1,475 @@ +--- +title: Streaming Analytics +nav-id: analytics +nav-pos: 4 +nav-title: Streaming Analytics +nav-parent_id: tutorials +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* This will be replaced by the TOC +{:toc} + +## Event Time and Watermarks + +### Introduction + +Flink explicitly supports three different notions of time: + +* _event time:_ the time when an event occurred, as recorded by the device producing (or storing) the event + +* _ingestion time:_ a timestamp recorded by Flink at the moment it ingests the event + +* _processing time:_ the time when a specific operator in your pipeline is processing the event + +For reproducible results, e.g., when computing the maximum price a stock reached during the first +hour of trading on a given day, you should use event time. 
In this way the result won't depend on +when the calculation is performed. This kind of real-time application is sometimes performed using +processing time, but then the results are determined by the events that happen to be processed +during that hour, rather than the events that occurred then. Computing analytics based on processing +time causes inconsistencies, and makes it difficult to re-analyze historic data or test new +implementations. + +### Working with Event Time + +By default, Flink will use processing time. To change this, you can set the Time Characteristic: + +{% highlight java %} +final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); +{% endhighlight %} + +If you want to use event time, you will also need to supply a Timestamp Extractor and Watermark +Generator that Flink will use to track the progress of event time. This will be covered in the +section below on [Working with Watermarks]({{ site.baseurl }}{% link +tutorials/streaming_analytics.zh.md %}#working-with-watermarks), but first we should explain what +watermarks are. + +### Watermarks + +Let's work through a simple example that will show why watermarks are needed, and how they work. + +In this example you have a stream of timestamped events that arrive somewhat out of order, as shown +below. The numbers shown are timestamps that indicate when these events actually occurred. The first +event to arrive happened at time 4, and it is followed by an event that happened earlier, at time 2, +and so on: + +<div class="text-center" style="font-size: x-large; word-spacing: 0.5em; margin: 1em 0em;"> +··· 23 19 22 24 21 14 17 13 12 15 9 11 7 2 4 → +</div> + +Now imagine that you are trying to create a stream sorter. This is meant to be an application that +processes each event from a stream as it arrives, and emits a new stream containing the same events, +but ordered by their timestamps. 
+ +Some observations: + +(1) The first element your stream sorter sees is the 4, but you can't just immediately release it as +the first element of the sorted stream. It may have arrived out of order, and an earlier event might +yet arrive. In fact, you have the benefit of some god-like knowledge of this stream's future, and +you can see that your stream sorter should wait at least until the 2 arrives before producing any +results. + +*Some buffering, and some delay, is necessary.* + +(2) If you do this wrong, you could end up waiting forever. First the sorter saw an event from time +4, and then an event from time 2. Will an event with a timestamp less than 2 ever arrive? Maybe. +Maybe not. You could wait forever and never see a 1. + +*Eventually you have to be courageous and emit the 2 as the start of the sorted stream.* + +(3) What you need then is some sort of policy that defines when, for any given timestamped event, to +stop waiting for the arrival of earlier events. + +*This is precisely what watermarks do* — they define when to stop waiting for earlier events. + +Event time processing in Flink depends on *watermark generators* that insert special timestamped +elements into the stream, called *watermarks*. A watermark for time _t_ is an assertion that the +stream is (probably) now complete up through time _t_. + +When should this stream sorter stop waiting, and push out the 2 to start the sorted stream? When a +watermark arrives with a timestamp of 2, or greater. + +(4) You might imagine different policies for deciding how to generate watermarks. + +Each event arrives after some delay, and these delays vary, so some events are delayed more than +others. One simple approach is to assume that these delays are bounded by some maximum delay. Flink +refers to this strategy as *bounded-out-of-orderness* watermarking. It is easy to imagine more +complex approaches to watermarking, but for most applications a fixed delay works well enough. + +### Latency vs. 
Completeness + +Another way to think about watermarks is that they give you, the developer of a streaming +application, control over the tradeoff between latency and completeness. Unlike in batch processing, +where one has the luxury of being able to have complete knowledge of the input before producing any +results, with streaming you must eventually stop waiting to see more of the input, and produce some +sort of result. + +You can either configure your watermarking aggressively, with a short bounded delay, and thereby +take the risk of producing results with rather incomplete knowledge of the input -- i.e., a possibly +wrong result, produced quickly. Or you can wait longer, and produce results that take advantage of +having more complete knowledge of the input stream(s). + +It is also possible to implement hybrid solutions that produce initial results quickly, and then +supply updates to those results as additional (late) data is processed. This is a good approach for +some applications. + +### Lateness + +Lateness is defined relative to the watermarks. A `Watermark(t)` asserts that the stream is complete +up through time _t_; any event following this watermark whose timestamp is ≤ _t_ is late. + +### Working with Watermarks + +In order to perform event-time-based event processing, Flink needs to know the time associated with +each event, and it also needs the stream to include watermarks. + +The Taxi data sources used in the hands-on exercises take care of these details for you. But in your +own applications you will have to take care of this yourself, which is usually done by implementing +a class that extracts the timestamps from the events, and generates watermarks on demand. The +easiest way to do this is by extending the `BoundedOutOfOrdernessTimestampExtractor`: + +{% highlight java %} +DataStream<Event> stream = ... 
+ +DataStream<Event> withTimestampsAndWatermarks = + stream.assignTimestampsAndWatermarks(new TimestampsAndWatermarks(Time.seconds(10))); + +public static class TimestampsAndWatermarks + extends BoundedOutOfOrdernessTimestampExtractor<Event> { + + public TimestampsAndWatermarks(Time t) { + super(t); + } + + @Override + public long extractTimestamp(Event event) { + return event.timestamp; + } +} +{% endhighlight %} + +Note that the constructor for `BoundedOutOfOrdernessTimestampExtractor` takes a parameter which +specifies the maximum expected out-of-orderness (10 seconds, in this example). + +{% top %} + +## Windows + +Flink features very expressive window semantics. + +In this section you will learn: + +* how windows are used to compute aggregates on unbounded streams, +* which types of windows Flink supports, and +* how to implement a DataStream program with a windowed aggregation + +### Introduction + +It is natural when doing stream processing to want to compute aggregated analytics on bounded subsets +of the streams in order to answer questions like these: + +* number of page views per minute +* number of sessions per user per week +* maximum temperature per sensor per minute + +Computing windowed analytics with Flink depends on two principal abstractions: _Window Assigners_ +that assign events to windows (creating new window objects as necessary), and _Window Functions_ +that are applied to the events assigned to a window. + +Flink's windowing API also has notions of _Triggers_, which determine when to call the window +function, and _Evictors_, which can remove elements collected in a window. + +In its basic form, you apply windowing to a keyed stream like this: + +{% highlight java %} +stream. 
+ .keyBy(<key selector>) + .window(<window assigner>) + .reduce|aggregate|process(<window function>) +{% endhighlight %} + +You can also use windowing with non-keyed streams, but keep in mind that in this case, the +processing will _not_ be done in parallel: + +{% highlight java %} +stream. + .windowAll(<window assigner>) + .reduce|aggregate|process(<window function>) +{% endhighlight %} + +### Window Assigners + +Flink has several built-in types of window assigners, which are illustrated below: + +<img src="{{ site.baseurl }}/fig/window-assigners.svg" alt="Window assigners" class="center" width="80%" /> + +Some examples of what these window assigners might be used for, and how to specify them: + +* Tumbling time windows + * _page views per minute_ + * `TumblingEventTimeWindows.of(Time.minutes(1))` +* Sliding time windows + * _page views per minute computed every 10 seconds_ + * `SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))` +* Session windows + * _page views per session, where sessions are defined by a gap of at least 30 minutes between sessions_ + * `EventTimeSessionWindows.withGap(Time.minutes(30))` + +Durations can be specified using one of `Time.milliseconds(n)`, `Time.seconds(n)`, `Time.minutes(n)`, `Time.hours(n)`, and `Time.days(n)`. + +The time-based window assigners (including session windows) come in both event time and processing +time flavors. There are significant tradeoffs between these two types of time windows. With +processing time windowing you have to accept these limitations: + +* can not correctly process historic data, +* can not correctly handle out-of-order data, +* results will be non-deterministic, + +but with the advantage of lower latency. + +When working with count-based windows, keep in mind that these windows will not fire until a batch +is complete. There's no option to time-out and process a partial window, though you could implement +that behavior yourself with a custom Trigger. 
+ +A global window assigner assigns every event (with the same key) to the same global window. This is +only useful if you are going to do your own custom windowing, with a custom Trigger. In many cases +where this might seem useful you will be better off using a `ProcessFunction` as described +[in another section]({{ site.baseurl }}{% link tutorials/event_driven.zh.md %}#process-functions). + +### Window Functions + +You have three basic options for how to process the contents of your windows: + +1. as a batch, using a `ProcessWindowFunction` that will be passed an `Iterable` with the window's contents; +1. incrementally, with a `ReduceFunction` or an `AggregateFunction` that is called as each event is assigned to the window; +1. or with a combination of the two, wherein the pre-aggregated results of a `ReduceFunction` or an `AggregateFunction` are supplied to a `ProcessWindowFunction` when the window is triggered. + +Here are examples of approaches 1 and 3. Each implementation finds the peak value from each sensor +in 1 minute event time windows, and produces a stream of Tuples containing `(key, +end-of-window-timestamp, max_value)`. + +#### ProcessWindowFunction Example + +{% highlight java %} +DataStream<SensorReading> input = ... 
+ +input + .keyBy(x -> x.key) + .window(TumblingEventTimeWindows.of(Time.minutes(1))) + .process(new MyWastefulMax()); + +public static class MyWastefulMax extends ProcessWindowFunction< + SensorReading, // input type + Tuple3<String, Long, Integer>, // output type + String, // key type + TimeWindow> { // window type + + @Override + public void process( + String key, + Context context, + Iterable<SensorReading> events, + Collector<Tuple3<String, Long, Integer>> out) { + + int max = 0; + for (SensorReading event : events) { + max = Math.max(event.value, max); + } + out.collect(Tuple3.of(key, context.window().getEnd(), max)); + } +} +{% endhighlight %} + +A couple of things to note in this implementation: + +* All of the events assigned to the window have to be buffered in keyed Flink state until the window + is triggered. This is potentially quite expensive. +* Our `ProcessWindowFunction` is being passed a `Context` object which contains information about + the window. Its interface looks like this: + +{% highlight java %} +public abstract class Context implements java.io.Serializable { + public abstract W window(); + + public abstract long currentProcessingTime(); + public abstract long currentWatermark(); + + public abstract KeyedStateStore windowState(); + public abstract KeyedStateStore globalState(); +} +{% endhighlight %} + +`windowState` and `globalState` are places where you can store per-key, per-window, or global +per-key information for all windows of that key. This might be useful, for example, if you want to +record something about the +current window and use that when processing a subsequent window. + +#### Incremental Aggregation Example + +{% highlight java %} +DataStream<SensorReading> input = ... 
+ +input + .keyBy(x -> x.key) + .window(TumblingEventTimeWindows.of(Time.minutes(1))) + .reduce(new MyReducingMax(), new MyWindowFunction()); + +private static class MyReducingMax implements ReduceFunction<SensorReading> { + public SensorReading reduce(SensorReading r1, SensorReading r2) { + return r1.value() > r2.value() ? r1 : r2; + } +} + +private static class MyWindowFunction extends ProcessWindowFunction< + SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> { + + @Override + public void process( + String key, + Context context, + Iterable<SensorReading> maxReading, + Collector<Tuple3<String, Long, SensorReading>> out) { + + SensorReading max = maxReading.iterator().next(); + out.collect(Tuple3.of(key, context.window().getEnd(), max)); + } +} +{% endhighlight %} + +Notice that the `Iterable<SensorReading>` +will contain exactly one reading -- the pre-aggregated maximum computed by `MyReducingMax`. + +### Late Events + +By default, when using event time windows, late events are dropped. There are two optional parts of +the window API that give you more control over this. + +You can arrange for the events that would be dropped to be collected to an alternate output stream +instead, using a mechanism called +[Side Outputs]({{ site.baseurl }}{% link tutorials/event_driven.zh.md %}#side-outputs). +Here is an example of what that might look like: + +{% highlight java %} +OutputTag<Event> lateTag = new OutputTag<Event>("late"){}; + +SingleOutputStreamOperator<Event> result = stream. + .keyBy(...) + .window(...) + .sideOutputLateData(lateTag) + .process(...); + +DataStream<Event> lateStream = result.getSideOutput(lateTag); +{% endhighlight %} + +You can also specify an interval of _allowed lateness_ during which the late events will continue to +be assigned to the appropriate window(s) (whose state will have been retained). By default each late +event will cause the window function to be called again (sometimes called a _late firing_). 
+ +By default the allowed lateness is 0. In other words, elements behind the watermark are dropped (or +sent to the side output). + +For example: + +{% highlight java %} +stream. + .keyBy(...) + .window(...) + .allowedLateness(Time.seconds(10)) + .process(...); +{% endhighlight %} + +When the allowed lateness is greater than zero, only those events that are so late that they would +be dropped are sent to the side output (if it has been configured). + +### Surprises + +Some aspects of Flink's windowing API may not behave in the way you would expect. Based on +frequently asked questions on the [flink-user mailing +list](https://flink.apache.org/community.html#mailing-lists) and elsewhere, here are some facts +about windows that may surprise you. + +#### Sliding Windows Make Copies + +Sliding window assigners can create lots of window objects, and will copy each event into every +relevant window. For example, if you have sliding windows every 15 minutes that are 24-hours in +length, each event will be copied into 4 * 24 = 96 windows. + +#### Time Windows are Aligned to the Epoch + +Just because you are using hour-long processing-time windows and start your application running at +12:05 does not mean that the first window will close at 1:05. The first window will be 55 minutes +long and close at 1:00. + +Note, however, that the tumbling and sliding window assigners take an optional offset parameter +that can be used to change the alignment of the windows. See +[Tumbling Windows]({{ site.baseurl }}{% link dev/stream/operators/windows.zh.md %}#tumbling-windows) and +[Sliding Windows]({{ site.baseurl }}{% link dev/stream/operators/windows.zh.md %}#sliding-windows) for details. 
+ +#### Windows Can Follow Windows + +For example, it works to do this: + +{% highlight java %} +stream + .keyBy(t -> t.key) + .timeWindow(<time specification>) + .reduce(<reduce function>) + .timeWindowAll(<same time specification>) + .reduce(<same reduce function>) +{% endhighlight %} + +You might expect Flink's runtime to be smart enough to do this parallel pre-aggregation for you +(provided you are using a ReduceFunction or AggregateFunction), but it's not. + +The reason why this works is that the events produced by a time window are assigned timestamps +based on the time at the end of the window. So, for example, all of the events produced +by an hour-long window will have timestamps marking the end of an hour. Any subsequent window +consuming those events should have a duration that is the same as, or a multiple of, the +previous window. + +#### No Results for Empty TimeWindows + +Windows are only created when events are assigned to them. So if there are no events in a given time +frame, no results will be reported. + +#### Late Events Can Cause Late Merges + +Session windows are based on an abstraction of windows that can _merge_. Each element is initially +assigned to a new window, after which windows are merged whenever the gap between them is small +enough. In this way, a late event can bridge the gap separating two previously separate sessions, +producing a late merge. + +{% top %} + +## Hands-on + +The hands-on exercise that goes with this section is the [Hourly Tips +Exercise](https://github.com/apache/flink-training/tree/{% if site.is_stable %}release-{{ site.version_title }}{% else %}master{% endif %}/hourly-tips). + +{% top %} + +## Further Reading + +- [Timely Stream Processing]({{ site.baseurl }}{% link concepts/timely-stream-processing.zh.md %}) +- [Windows]({{ site.baseurl }}{% link dev/stream/operators/windows.zh.md %}) + +{% top %}