This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 50dcd81  [FLINK-22350][python][docs] Add documentation for 
user-defined window in Python DataStream API
50dcd81 is described below

commit 50dcd819b0a82b307f85b3b6dbb89a7d6600331e
Author: huangxingbo <hxbks...@gmail.com>
AuthorDate: Wed Apr 21 17:55:27 2021 +0800

    [FLINK-22350][python][docs] Add documentation for user-defined window in 
Python DataStream API
    
    This closes #15702.
---
 .../docs/dev/python/datastream/operators/_index.md |  23 +
 .../{operators.md => operators/overview.md}        |   6 +-
 .../dev/python/datastream/operators/windows.md     | 585 +++++++++++++++++++++
 .../docs/dev/python/datastream/operators/_index.md |  23 +
 .../{operators.md => operators/overview.md}        |   4 +-
 .../dev/python/datastream/operators/windows.md     | 585 +++++++++++++++++++++
 6 files changed, 1221 insertions(+), 5 deletions(-)

diff --git a/docs/content.zh/docs/dev/python/datastream/operators/_index.md 
b/docs/content.zh/docs/dev/python/datastream/operators/_index.md
new file mode 100644
index 0000000..574fa3e
--- /dev/null
+++ b/docs/content.zh/docs/dev/python/datastream/operators/_index.md
@@ -0,0 +1,23 @@
+---
+title: Operators
+bookCollapseSection: true
+weight: 21
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content.zh/docs/dev/python/datastream/operators.md 
b/docs/content.zh/docs/dev/python/datastream/operators/overview.md
similarity index 97%
rename from docs/content.zh/docs/dev/python/datastream/operators.md
rename to docs/content.zh/docs/dev/python/datastream/operators/overview.md
index 78eba1a..8e1ac66 100644
--- a/docs/content.zh/docs/dev/python/datastream/operators.md
+++ b/docs/content.zh/docs/dev/python/datastream/operators/overview.md
@@ -1,9 +1,9 @@
 ---
-title: "Operators"
-weight: 21
+title: "Overview"
+weight: 1
 type: docs
 aliases:
-  - /zh/dev/python/datastream-api-users-guide/operators.html
+    - /zh/dev/python/datastream-api-users-guide/operators.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content.zh/docs/dev/python/datastream/operators/windows.md 
b/docs/content.zh/docs/dev/python/datastream/operators/windows.md
new file mode 100644
index 0000000..a79d7c7
--- /dev/null
+++ b/docs/content.zh/docs/dev/python/datastream/operators/windows.md
@@ -0,0 +1,585 @@
+---
+title: "Windows"
+weight: 2
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Windows
+
+Windows are at the heart of processing infinite streams. Windows split the 
stream into "buckets" of finite size,
+over which we can apply computations. This document focuses on how windowing 
is performed in Flink and how the
+programmer can benefit to the maximum from its offered functionality.
+
+Currently, the widow operation is only supported in *keyed* streams
+
+**Keyed Windows**
+
+    stream
+           .key_by(...)
+           .window(...)              <-  required: "assigner"
+          [.trigger(...)]            <-  optional: "trigger" (else default 
trigger)
+          [.allowed_lateness(...)]   <-  optional: "lateness" (else zero)
+           .apply/process()          <-  required: "function"
+
+In the above, the commands in square brackets ([...]) are optional. This 
reveals that Flink allows you to customize your
+windowing logic in many different ways so that it best fits your needs.
+
+## Window Lifecycle
+
+In a nutshell, a window is **created** as soon as the first element that 
should belong to this window arrives, and the
+window is **completely removed** when the time (event or processing time) 
passes its end timestamp plus the user-specified
+`allowed lateness` (see [Allowed Lateness](#allowed-lateness)). Flink 
guarantees removal only for time-based
+windows and not for other types, *e.g.* global windows (see [Window 
Assigners](#window-assigners)). For example, with an
+event-time-based windowing strategy that creates non-overlapping (or tumbling) 
windows every 5 minutes and has an allowed
+lateness of 1 min, Flink will create a new window for the interval between 
`12:00` and `12:05` when the first element with
+a timestamp that falls into this interval arrives, and it will remove it when 
the watermark passes the `12:06`
+timestamp.
+
+In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and 
a function (`WindowFunction` or `ProcessWindowFunction`)
+(see [Window Functions](#window-functions)) attached to it. The function will 
contain the computation to
+be applied to the contents of the window, while the `Trigger` specifies the 
conditions under which the window is
+considered ready for the function to be applied. A triggering policy might be 
something like "when the number of elements
+in the window is more than 4", or "when the watermark passes the end of the 
window". A trigger can also decide to
+purge a window's contents any time between its creation and removal. Purging 
in this case only refers to the elements
+in the window, and *not* the window metadata. This means that new data can 
still be added to that window.
+
+In the following we go into more detail for each of the components above. We 
start with the required parts in the above
+snippet (see [Keyed Windows](#keyed-windows), [Window 
Assigner](#window-assigners), and [Window Function](#window-functions)) 
+before moving to the optional ones.
+
+## Keyed Windows
+
+The first thing to specify is whether your stream should be keyed or not. This 
has to be done before defining the window.
+Using the `key_by(...)` will split your infinite stream into logical keyed 
streams. If `key_by(...)` is not called, your
+stream is not keyed.
+
+In the case of keyed streams, any attribute of your incoming events can be 
used as a key
+(more details [here]({{< ref "docs/dev/datastream/fault-tolerance/state" 
>}}#keyed-datastream)). Having a keyed stream will
+allow your windowed computation to be performed in parallel by multiple tasks, 
as each logical keyed stream can be processed
+independently from the rest. All elements referring to the same key will be 
sent to the same parallel task.
+
+## Window Assigners
+
+After specifying your stream is keyed, the next step is to define a *window 
assigner*.
+The window assigner defines how elements are assigned to windows. This is done 
by specifying the `WindowAssigner`
+of your choice in the `window(...)` (for *keyed* streams) call.
+
+A `WindowAssigner` is responsible for assigning each incoming element to one 
or more windows.
+You can implement a custom window assigner by extending the `WindowAssigner` 
class.
+
+Time-based windows have a *start timestamp* (inclusive) and an *end timestamp* 
(exclusive)
+that together describe the size of the window. In code, Flink uses 
`TimeWindow` when working with
+time-based windows which has methods for querying the start- and end-timestamp 
and also an
+additional method `max_timestamp()` that returns the largest allowed timestamp 
for a given windows.
+
+In the following, we show how to custom a *tumbling windows* assigner. For 
details of Tumbling Windows, you can
+refer to the [the relevant documentation]({{< ref 
"docs/dev/datastream/operators/windows" >}}#tumbling-windows).
+
+```python
+from typing import Tuple, Collection
+
+from pyflink.common.serializer import TypeSerializer
+from pyflink.datastream import WindowAssigner, Trigger
+from pyflink.datastream.window import TimeWindow, TimeWindowSerializer
+
+class TumblingEventWindowAssigner(WindowAssigner[Tuple, TimeWindow]):
+
+    def __init__(self, size: int, offset: int, is_event_time: bool):
+        self._size = size
+        self._offset = offset
+        self._is_event_time = is_event_time
+
+    def assign_windows(self,
+                       element: Tuple,
+                       timestamp: int,
+                       context: WindowAssigner.WindowAssignerContext) -> 
Collection[TimeWindow]:
+        start = TimeWindow.get_window_start_with_offset(timestamp, 
self._offset, self._size)
+        return [TimeWindow(start, start + self._size)]
+
+    def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
+        return EventTimeTrigger()
+
+    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
+        return TimeWindowSerializer()
+
+    def is_event_time(self) -> bool:
+        return False
+```
+
+## Window Functions
+
+After defining the window assigner, we need to specify the computation that we 
want
+to perform on each of these windows. This is the responsibility of the *window 
function*, which is used to process the
+elements of each keyed window once the system determines that a window is 
ready for processing
+(see [triggers](#triggers) for how Flink determines when a window is ready).
+
+The window function can be `ProcessWindowFunction` or `WindowFunction`. They 
get an `Iterable` for all the elements contained in a
+window and additional meta information about the window to which the elements 
belong.
+
+In some places where a `ProcessWindowFunction` can be used you can also use a 
`WindowFunction`.
+This is an older version of ProcessWindowFunction that provides less 
contextual information and
+does not have some advances features, such as per-window keyed state. We will 
look at examples for each of these variants.
+
+### ProcessWindowFunction
+
+A ProcessWindowFunction gets an Iterable containing all the elements of the 
window, and a Context
+object with access to time and state information, which enables it to provide 
more flexibility than
+other window functions. This comes at the cost of performance and resource 
consumption, because
+elements cannot be incrementally aggregated but instead need to be buffered 
internally until the
+window is considered ready for processing.
+
+The signature of `ProcessWindowFunction` looks as follows:
+
+```python
+class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):
+    """
+    Base interface for functions that are evaluated over keyed (grouped) 
windows using a context
+    for retrieving extra information.
+    """
+
+    class Context(ABC, Generic[W2]):
+        """
+        The context holding window metadata.
+        """
+
+        @abstractmethod
+        def window(self) -> W2:
+            """
+            :return: The window that is being evaluated.
+            """
+            pass
+
+        @abstractmethod
+        def current_processing_time(self) -> int:
+            """
+            :return: The current processing time.
+            """
+            pass
+
+        @abstractmethod
+        def current_watermark(self) -> int:
+            """
+            :return: The current event-time watermark.
+            """
+            pass
+
+        @abstractmethod
+        def window_state(self) -> KeyedStateStore:
+            """
+            State accessor for per-key and per-window state.
+
+            .. note::
+                If you use per-window state you have to ensure that you clean 
it up by implementing
+                :func:`~ProcessWindowFunction.clear`.
+
+            :return: The :class:`KeyedStateStore` used to access per-key and 
per-window states.
+            """
+            pass
+
+        @abstractmethod
+        def global_state(self) -> KeyedStateStore:
+            """
+            State accessor for per-key global state.
+            """
+            pass
+
+    @abstractmethod
+    def process(self,
+                key: KEY,
+                content: 'ProcessWindowFunction.Context',
+                elements: Iterable[IN]) -> Iterable[OUT]:
+        """
+        Evaluates the window and outputs none or several elements.
+
+        :param key: The key for which this window is evaluated.
+        :param content: The context in which the window is being evaluated.
+        :param elements: The elements in the window being evaluated.
+        :return: The iterable object which produces the elements to emit.
+        """
+        pass
+
+    @abstractmethod
+    def clear(self, context: 'ProcessWindowFunction.Context') -> None:
+        """
+        Deletes any state in the :class:`Context` when the Window expires (the 
watermark passes its
+        max_timestamp + allowed_lateness).
+
+        :param context: The context to which the window is being evaluated.
+        """
+        pass
+```
+
+The `key` parameter is the key that is extracted
+via the `KeySelector` that was specified for the `key_by()` invocation. In 
case of tuple-index
+keys or string-field references this key type is always `Tuple` and you have 
to manually cast
+it to a tuple of the correct size to extract the key fields.
+
+A `ProcessWindowFunction` can be defined and used like this:
+
+```python
+from typing import Tuple, Iterable
+
+from pyflink.common.typeinfo import Types
+from pyflink.datastream.window import TimeWindow
+
+class SumWindowProcessFunction(ProcessWindowFunction[Tuple, Tuple, str, 
TimeWindow]):
+
+    def process(self,
+                key: str,
+                content: ProcessWindowFunction.Context,
+                elements: Iterable[Tuple]) -> Iterable[tuple]:
+        result = 0
+        for i in elements:
+            result += i[0]
+        return [(key, result)]
+
+    def clear(self, context: ProcessWindowFunction.Context) -> None:
+        pass
+
+data_stream = env.from_collection([
+    (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
+    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))  # type: DataStream
+data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
+    .window(TumblingEventWindowAssigner()) \
+    .process(SumWindowProcessFunction(), Types.TUPLE([Types.STRING(), 
Types.INT()]))
+```
+### WindowFunction
+
+In some places where a `ProcessWindowFunction` can be used you can also use a 
`WindowFunction`. This
+is an older version of `ProcessWindowFunction` that provides less contextual 
information and does
+not have some advances features, such as per-window keyed state. This 
interface will be deprecated
+at some point.
+
+The signature of a `WindowFunction` looks as follows:
+
+```python
+class WindowFunction(Function, Generic[IN, OUT, KEY, W]):
+    """
+    Base interface for functions that are evaluated over keyed (grouped) 
windows.
+    """
+
+    @abstractmethod
+    def apply(self, key: KEY, window: W, inputs: Iterable[IN]) -> 
Iterable[OUT]:
+        """
+        Evaluates the window and outputs none or several elements.
+
+        :param key: The key for which this window is evaluated.
+        :param window: The window that is being evaluated.
+        :param inputs: The elements in the window being evaluated.
+        """
+        pass
+```
+
+It can be used like this:
+
+```python
+from typing import Tuple, Iterable
+
+from pyflink.common.typeinfo import Types
+from pyflink.datastream.window import TimeWindow
+
+class SumWindowFunction(WindowFunction[Tuple, Tuple, str, TimeWindow]):
+
+    def apply(self, key: str, window: TimeWindow, inputs: Iterable[Tuple]):
+        result = 0
+        for i in inputs:
+            result += i[0]
+        return [(key, result)]
+
+data_stream = env.from_collection([
+    (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
+    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))  # type: DataStream
+data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
+    .window(TumblingEventWindowAssigner()) \
+    .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
+```
+
+## Triggers
+
+A Trigger determines when a window (as formed by the window assigner) is ready 
to be processed by the window function.
+Each WindowAssigner comes with a default Trigger. You can specify a custom 
trigger using trigger(...).
+
+The signature of `ProcessWindowFunction` looks as follows:
+
+```python
+class Trigger(ABC, Generic[T, W]):
+    """
+    A Trigger determines when a pane of a window should be evaluated to emit 
the results for that
+    part of the window.
+
+    A pane is the bucket of elements that have the same key (assigned by the 
KeySelector) and same
+    Window. An element can be in multiple panes if it was assigned to multiple 
windows by the
+    WindowAssigner. These panes all have their own instance of the Trigger.
+
+    Triggers must not maintain state internally since they can be re-created 
or reused for different
+    keys. All necessary state should be persisted using the state abstraction 
available on the
+    TriggerContext.
+
+    When used with a MergingWindowAssigner the Trigger must return true from 
:func:`can_merge` and
+    :func:`on_merge` most be properly implemented.
+    """
+
+    class TriggerContext(ABC):
+        """
+        A context object that is given to :class:`Trigger` methods to allow 
them to register timer
+        callbacks and deal with state.
+        """
+
+        @abstractmethod
+        def get_current_processing_time(self) -> int:
+            """
+            :return: The current processing time.
+            """
+            pass
+
+        @abstractmethod
+        def get_metric_group(self) -> MetricGroup:
+            """
+            Returns the metric group for this :class:`Trigger`. This is the 
same metric group that
+            would be returned from
+            
:func:`~pyflink.datasteam.functions.RuntimeContext.get_metric_group` in a user 
function.
+
+            :return: The metric group.
+            """
+            pass
+
+        @abstractmethod
+        def get_current_watermark(self) -> int:
+            """
+            :return: The current watermark time.
+            """
+            pass
+
+        @abstractmethod
+        def register_processing_time_timer(self, time: int) -> None:
+            """
+            Register a system time callback. When the current system time 
passes the specified time
+            :func:`~Trigger.on_processing_time` is called with the time 
specified here.
+
+            :param time: The time at which to invoke 
:func:`~Trigger.on_processing_time`.
+            """
+            pass
+
+        @abstractmethod
+        def register_event_time_timer(self, time: int) -> None:
+            """
+            Register an event-time callback. When the current watermark passes 
the specified time
+            :func:`~Trigger.on_event_time` is called with the time specified 
here.
+
+            :param time: The watermark at which to invoke 
:func:`~Trigger.on_event_time`.
+            """
+            pass
+
+        @abstractmethod
+        def delete_processing_time_timer(self, time: int) -> None:
+            """
+            Delete the processing time trigger for the given time.
+            """
+            pass
+
+        @abstractmethod
+        def delete_event_time_timer(self, time: int) -> None:
+            """
+            Delete the event-time trigger for the given time.
+            """
+            pass
+
+        @abstractmethod
+        def get_partitioned_state(self, state_descriptor: StateDescriptor) -> 
State:
+            """
+            Retrieves a :class:`State` object that can be used to interact 
with fault-tolerant state
+            that is scoped to the window and key of the current trigger 
invocation.
+
+            :param state_descriptor: The StateDescriptor that contains the 
name and type of the
+                                     state that is being accessed.
+            :return: The partitioned state object.
+            """
+            pass
+
+    class OnMergeContext(TriggerContext):
+        """
+        Extension of :class:`TriggerContext` that is given to 
:func:`~Trigger.on_merge`.
+        """
+
+        @abstractmethod
+        def merge_partitioned_state(self, state_descriptor: StateDescriptor) 
-> None:
+            pass
+
+    @abstractmethod
+    def on_element(self,
+                   element: T,
+                   timestamp: int,
+                   window: W,
+                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        """
+        Called for every element that gets added to a pane. The result of this 
will determine
+        whether the pane is evaluated to emit results.
+
+        :param element: The element that arrived.
+        :param timestamp: The timestamp of the element that arrived.
+        :param window: The window to which the element is being added.
+        :param ctx: A context object that can be used to register timer 
callbacks.
+        """
+        pass
+
+    @abstractmethod
+    def on_processing_time(self,
+                           time: int,
+                           window: W,
+                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        """
+        Called when a processing-time timer that was set using the trigger 
context fires.
+
+        :param time: The timestamp at which the timer fired.
+        :param window: The window for which the timer fired.
+        :param ctx: A context object that can be used to register timer 
callbacks.
+        """
+        pass
+
+    @abstractmethod
+    def on_event_time(self, time: int, window: W, ctx: 
'Trigger.TriggerContext') -> TriggerResult:
+        """
+        Called when an event-time timer that was set using the trigger context 
fires.
+
+        :param time: The timestamp at which the timer fired.
+        :param window: The window for which the timer fired.
+        :param ctx: A context object that can be used to register timer 
callbacks.
+        """
+        pass
+
+    def can_merge(self) -> bool:
+        """
+        .. note:: If this returns true you must properly implement 
:func:`~Trigger.on_merge`
+
+        :return: True if this trigger supports merging of trigger state and 
can therefore be used
+                 with a MergingWindowAssigner.
+        """
+        return False
+
+    @abstractmethod
+    def on_merge(self, window: W, ctx: 'Trigger.OnMergeContext') -> None:
+        """
+        Called when several windows have been merged into one window by the 
:class:`WindowAssigner`.
+
+        :param window: The new window that results from the merge.
+        :param ctx: A context object that can be used to register timer 
callbacks and access state.
+        """
+        pass
+
+    @abstractmethod
+    def clear(self, window: W, ctx: 'Trigger.TriggerContext') -> None:
+        """
+        Clears any state that the trigger might still hold for the given 
window. This is called when
+        a window is purged. Timers set using 
:func:`~TriggerContext.register_event_time_timer` and
+        :func:`~TriggerContext.register_processing_time_timer` should be 
deleted here as well as
+        state acquired using :func:`~TriggerContext.get_partitioned_state`.
+        """
+        pass
+```
+
+Two things to notice about the above methods are:
+
+1) The first three(*on_element*, *on_processing_time* and *on_event_time*) 
decide how to act on their invocation event by returning a `TriggerResult`.
+The action can be one of the following:
+
+* `CONTINUE`: do nothing,
+* `FIRE`: trigger the computation,
+* `PURGE`: clear the elements in the window, and
+* `FIRE_AND_PURGE`: trigger the computation and clear the elements in the 
window afterwards.
+
+2) Any of these methods can be used to register processing- or event-time 
timers for future actions.
+
+### Fire and Purge
+
+Once a trigger determines that a window is ready for processing, it fires, 
*i.e.*, it returns `FIRE` or `FIRE_AND_PURGE`. This is the signal for the 
window operator
+to emit the result of the current window. Given a window with a 
`ProcessWindowFunction`, all elements are passed to the `ProcessWindowFunction`.
+
+When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` 
keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
+By default, the pre-implemented triggers simply `FIRE` without purging the 
window state.
+
+You can implement a custom EventTimeTrigger as follows:
+
+```python
+from typing import Tuple
+from pyflink.datastream.window import TimeWindow
+
+class EventTimeTrigger(Trigger[Tuple, TimeWindow]):
+
+    def on_element(self,
+                   element: Tuple,
+                   timestamp: int,
+                   window: TimeWindow,
+                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        return TriggerResult.CONTINUE
+
+    def on_processing_time(self,
+                           time: int,
+                           window: TimeWindow,
+                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        return TriggerResult.CONTINUE
+
+    def on_event_time(self,
+                      time: int,
+                      window: TimeWindow,
+                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        if time >= window.max_timestamp():
+            return TriggerResult.FIRE_AND_PURGE
+        else:
+            return TriggerResult.CONTINUE
+
+    def on_merge(self,
+                 window: TimeWindow,
+                 ctx: 'Trigger.OnMergeContext') -> None:
+        pass
+
+    def clear(self,
+              window: TimeWindow,
+              ctx: 'Trigger.TriggerContext') -> None:
+        pass
+```
+
+## Allowed Lateness
+
+When working with *event-time* windowing, it can happen that elements arrive 
late, *i.e.* the watermark that Flink uses to
+keep track of the progress of event-time is already past the end timestamp of 
a window to which an element belongs. See
+[event time]({{< ref "docs/dev/datastream/event-time/generating_watermarks" 
>}}) and especially [late elements]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" >}}#late-elements) for a 
more thorough
+discussion of how Flink deals with event time.
+
+By default, late elements are dropped when the watermark is past the end of 
the window. However,
+Flink allows to specify a maximum *allowed lateness* for window operators. 
Allowed lateness
+specifies by how much time elements can be late before they are dropped, and 
its default value is 0.
+Elements that arrive after the watermark has passed the end of the window but 
before it passes the end of
+the window plus the allowed lateness, are still added to the window. Depending 
on the trigger used,
+a late but not dropped element may cause the window to fire again.
+
+In order to make this work, Flink keeps the state of windows until their 
allowed lateness expires. Once this happens, Flink removes the window and 
deletes its state, as
+also described in the [Window Lifecycle](#window-lifecycle) section.
+
+By default, the allowed lateness is set to `0`. That is, elements that arrive 
behind the watermark will be dropped.
+
+You can specify an allowed lateness like this:
+
+```python
+data_stream.key_by(<key selector>) \
+    .window(<window assigner>) \
+    .allowed_lateness(<time>) \
+    .<windowed transformation>(<window function>)
+```
diff --git a/docs/content/docs/dev/python/datastream/operators/_index.md 
b/docs/content/docs/dev/python/datastream/operators/_index.md
new file mode 100644
index 0000000..574fa3e
--- /dev/null
+++ b/docs/content/docs/dev/python/datastream/operators/_index.md
@@ -0,0 +1,23 @@
+---
+title: Operators
+bookCollapseSection: true
+weight: 21
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content/docs/dev/python/datastream/operators.md 
b/docs/content/docs/dev/python/datastream/operators/overview.md
similarity index 99%
rename from docs/content/docs/dev/python/datastream/operators.md
rename to docs/content/docs/dev/python/datastream/operators/overview.md
index bf49d52..30cd642 100644
--- a/docs/content/docs/dev/python/datastream/operators.md
+++ b/docs/content/docs/dev/python/datastream/operators/overview.md
@@ -1,6 +1,6 @@
 ---
-title: "Operators"
-weight: 21
+title: "Overview"
+weight: 1
 type: docs
 aliases:
   - /dev/python/datastream-api-users-guide/operators.html
diff --git a/docs/content/docs/dev/python/datastream/operators/windows.md 
b/docs/content/docs/dev/python/datastream/operators/windows.md
new file mode 100644
index 0000000..a79d7c7
--- /dev/null
+++ b/docs/content/docs/dev/python/datastream/operators/windows.md
@@ -0,0 +1,585 @@
+---
+title: "Windows"
+weight: 2
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Windows
+
+Windows are at the heart of processing infinite streams. Windows split the 
stream into "buckets" of finite size,
+over which we can apply computations. This document focuses on how windowing 
is performed in Flink and how the
+programmer can benefit to the maximum from its offered functionality.
+
+Currently, the widow operation is only supported in *keyed* streams
+
+**Keyed Windows**
+
+    stream
+           .key_by(...)
+           .window(...)              <-  required: "assigner"
+          [.trigger(...)]            <-  optional: "trigger" (else default 
trigger)
+          [.allowed_lateness(...)]   <-  optional: "lateness" (else zero)
+           .apply/process()          <-  required: "function"
+
+In the above, the commands in square brackets ([...]) are optional. This 
reveals that Flink allows you to customize your
+windowing logic in many different ways so that it best fits your needs.
+
+## Window Lifecycle
+
+In a nutshell, a window is **created** as soon as the first element that 
should belong to this window arrives, and the
+window is **completely removed** when the time (event or processing time) 
passes its end timestamp plus the user-specified
+`allowed lateness` (see [Allowed Lateness](#allowed-lateness)). Flink 
guarantees removal only for time-based
+windows and not for other types, *e.g.* global windows (see [Window 
Assigners](#window-assigners)). For example, with an
+event-time-based windowing strategy that creates non-overlapping (or tumbling) 
windows every 5 minutes and has an allowed
+lateness of 1 min, Flink will create a new window for the interval between 
`12:00` and `12:05` when the first element with
+a timestamp that falls into this interval arrives, and it will remove it when 
the watermark passes the `12:06`
+timestamp.
+
+In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and 
a function (`WindowFunction` or `ProcessWindowFunction`)
+(see [Window Functions](#window-functions)) attached to it. The function will 
contain the computation to
+be applied to the contents of the window, while the `Trigger` specifies the 
conditions under which the window is
+considered ready for the function to be applied. A triggering policy might be 
something like "when the number of elements
+in the window is more than 4", or "when the watermark passes the end of the 
window". A trigger can also decide to
+purge a window's contents any time between its creation and removal. Purging 
in this case only refers to the elements
+in the window, and *not* the window metadata. This means that new data can 
still be added to that window.
+
+In the following we go into more detail for each of the components above. We 
start with the required parts in the above
+snippet (see [Keyed Windows](#keyed-windows), [Window 
Assigner](#window-assigners), and [Window Function](#window-functions)) 
+before moving to the optional ones.
+
+## Keyed Windows
+
+The first thing to specify is whether your stream should be keyed or not. This 
has to be done before defining the window.
+Using the `key_by(...)` will split your infinite stream into logical keyed 
streams. If `key_by(...)` is not called, your
+stream is not keyed.
+
+In the case of keyed streams, any attribute of your incoming events can be 
used as a key
+(more details [here]({{< ref "docs/dev/datastream/fault-tolerance/state" 
>}}#keyed-datastream)). Having a keyed stream will
+allow your windowed computation to be performed in parallel by multiple tasks, 
as each logical keyed stream can be processed
+independently from the rest. All elements referring to the same key will be 
sent to the same parallel task.
+
+## Window Assigners
+
+After specifying your stream is keyed, the next step is to define a *window 
assigner*.
+The window assigner defines how elements are assigned to windows. This is done 
by specifying the `WindowAssigner`
+of your choice in the `window(...)` (for *keyed* streams) call.
+
+A `WindowAssigner` is responsible for assigning each incoming element to one 
or more windows.
+You can implement a custom window assigner by extending the `WindowAssigner` 
class.
+
+Time-based windows have a *start timestamp* (inclusive) and an *end timestamp* 
(exclusive)
+that together describe the size of the window. In code, Flink uses 
`TimeWindow` when working with
+time-based windows which has methods for querying the start- and end-timestamp 
and also an
+additional method `max_timestamp()` that returns the largest allowed timestamp 
for a given windows.
+
+In the following, we show how to custom a *tumbling windows* assigner. For 
details of Tumbling Windows, you can
+refer to the [the relevant documentation]({{< ref 
"docs/dev/datastream/operators/windows" >}}#tumbling-windows).
+
+```python
+from typing import Tuple, Collection
+
+from pyflink.common.serializer import TypeSerializer
+from pyflink.datastream import WindowAssigner, Trigger
+from pyflink.datastream.window import TimeWindow, TimeWindowSerializer
+
+class TumblingEventWindowAssigner(WindowAssigner[Tuple, TimeWindow]):
+
+    def __init__(self, size: int, offset: int, is_event_time: bool):
+        self._size = size
+        self._offset = offset
+        self._is_event_time = is_event_time
+
+    def assign_windows(self,
+                       element: Tuple,
+                       timestamp: int,
+                       context: WindowAssigner.WindowAssignerContext) -> 
Collection[TimeWindow]:
+        start = TimeWindow.get_window_start_with_offset(timestamp, 
self._offset, self._size)
+        return [TimeWindow(start, start + self._size)]
+
+    def get_default_trigger(self, env) -> Trigger[Tuple, TimeWindow]:
+        return EventTimeTrigger()
+
+    def get_window_serializer(self) -> TypeSerializer[TimeWindow]:
+        return TimeWindowSerializer()
+
+    def is_event_time(self) -> bool:
+        return False
+```
+
+## Window Functions
+
+After defining the window assigner, we need to specify the computation that we 
want
+to perform on each of these windows. This is the responsibility of the *window 
function*, which is used to process the
+elements of each keyed window once the system determines that a window is 
ready for processing
+(see [triggers](#triggers) for how Flink determines when a window is ready).
+
+The window function can be `ProcessWindowFunction` or `WindowFunction`. They 
get an `Iterable` for all the elements contained in a
+window and additional meta information about the window to which the elements 
belong.
+
+In some places where a `ProcessWindowFunction` can be used you can also use a 
`WindowFunction`.
+This is an older version of ProcessWindowFunction that provides less 
contextual information and
+does not have some advances features, such as per-window keyed state. We will 
look at examples for each of these variants.
+
+### ProcessWindowFunction
+
+A ProcessWindowFunction gets an Iterable containing all the elements of the 
window, and a Context
+object with access to time and state information, which enables it to provide 
more flexibility than
+other window functions. This comes at the cost of performance and resource 
consumption, because
+elements cannot be incrementally aggregated but instead need to be buffered 
internally until the
+window is considered ready for processing.
+
+The signature of `ProcessWindowFunction` looks as follows:
+
+```python
+class ProcessWindowFunction(Function, Generic[IN, OUT, KEY, W]):
+    """
+    Base interface for functions that are evaluated over keyed (grouped) 
windows using a context
+    for retrieving extra information.
+    """
+
+    class Context(ABC, Generic[W2]):
+        """
+        The context holding window metadata.
+        """
+
+        @abstractmethod
+        def window(self) -> W2:
+            """
+            :return: The window that is being evaluated.
+            """
+            pass
+
+        @abstractmethod
+        def current_processing_time(self) -> int:
+            """
+            :return: The current processing time.
+            """
+            pass
+
+        @abstractmethod
+        def current_watermark(self) -> int:
+            """
+            :return: The current event-time watermark.
+            """
+            pass
+
+        @abstractmethod
+        def window_state(self) -> KeyedStateStore:
+            """
+            State accessor for per-key and per-window state.
+
+            .. note::
+                If you use per-window state you have to ensure that you clean 
it up by implementing
+                :func:`~ProcessWindowFunction.clear`.
+
+            :return: The :class:`KeyedStateStore` used to access per-key and 
per-window states.
+            """
+            pass
+
+        @abstractmethod
+        def global_state(self) -> KeyedStateStore:
+            """
+            State accessor for per-key global state.
+            """
+            pass
+
+    @abstractmethod
+    def process(self,
+                key: KEY,
+                content: 'ProcessWindowFunction.Context',
+                elements: Iterable[IN]) -> Iterable[OUT]:
+        """
+        Evaluates the window and outputs none or several elements.
+
+        :param key: The key for which this window is evaluated.
+        :param content: The context in which the window is being evaluated.
+        :param elements: The elements in the window being evaluated.
+        :return: The iterable object which produces the elements to emit.
+        """
+        pass
+
+    @abstractmethod
+    def clear(self, context: 'ProcessWindowFunction.Context') -> None:
+        """
+        Deletes any state in the :class:`Context` when the Window expires (the 
watermark passes its
+        max_timestamp + allowed_lateness).
+
+        :param context: The context to which the window is being evaluated.
+        """
+        pass
+```
+
+The `key` parameter is the key that is extracted
+via the `KeySelector` that was specified for the `key_by()` invocation. In 
case of tuple-index
+keys or string-field references this key type is always `Tuple` and you have 
to manually cast
+it to a tuple of the correct size to extract the key fields.
+
+A `ProcessWindowFunction` can be defined and used like this:
+
+```python
+from typing import Tuple, Iterable
+
+from pyflink.common.typeinfo import Types
+from pyflink.datastream.window import TimeWindow
+
+class SumWindowProcessFunction(ProcessWindowFunction[Tuple, Tuple, str, 
TimeWindow]):
+
+    def process(self,
+                key: str,
+                content: ProcessWindowFunction.Context,
+                elements: Iterable[Tuple]) -> Iterable[tuple]:
+        result = 0
+        for i in elements:
+            result += i[0]
+        return [(key, result)]
+
+    def clear(self, context: ProcessWindowFunction.Context) -> None:
+        pass
+
+data_stream = env.from_collection([
+    (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
+    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))  # type: DataStream
+data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
+    .window(TumblingEventWindowAssigner()) \
+    .process(SumWindowProcessFunction(), Types.TUPLE([Types.STRING(), 
Types.INT()]))
+```
+### WindowFunction
+
+In some places where a `ProcessWindowFunction` can be used you can also use a 
`WindowFunction`. This
+is an older version of `ProcessWindowFunction` that provides less contextual 
information and does
+not have some advances features, such as per-window keyed state. This 
interface will be deprecated
+at some point.
+
+The signature of a `WindowFunction` looks as follows:
+
+```python
+class WindowFunction(Function, Generic[IN, OUT, KEY, W]):
+    """
+    Base interface for functions that are evaluated over keyed (grouped) 
windows.
+    """
+
+    @abstractmethod
+    def apply(self, key: KEY, window: W, inputs: Iterable[IN]) -> 
Iterable[OUT]:
+        """
+        Evaluates the window and outputs none or several elements.
+
+        :param key: The key for which this window is evaluated.
+        :param window: The window that is being evaluated.
+        :param inputs: The elements in the window being evaluated.
+        """
+        pass
+```
+
+It can be used like this:
+
+```python
+from typing import Tuple, Iterable
+
+from pyflink.common.typeinfo import Types
+from pyflink.datastream.window import TimeWindow
+
+class SumWindowFunction(WindowFunction[Tuple, Tuple, str, TimeWindow]):
+
+    def apply(self, key: str, window: TimeWindow, inputs: Iterable[Tuple]):
+        result = 0
+        for i in inputs:
+            result += i[0]
+        return [(key, result)]
+
+data_stream = env.from_collection([
+    (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
+    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))  # type: DataStream
+data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
+    .window(TumblingEventWindowAssigner()) \
+    .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
+```
+
+## Triggers
+
+A Trigger determines when a window (as formed by the window assigner) is ready 
to be processed by the window function.
+Each WindowAssigner comes with a default Trigger. You can specify a custom 
trigger using trigger(...).
+
+The signature of `ProcessWindowFunction` looks as follows:
+
+```python
+class Trigger(ABC, Generic[T, W]):
+    """
+    A Trigger determines when a pane of a window should be evaluated to emit 
the results for that
+    part of the window.
+
+    A pane is the bucket of elements that have the same key (assigned by the 
KeySelector) and same
+    Window. An element can be in multiple panes if it was assigned to multiple 
windows by the
+    WindowAssigner. These panes all have their own instance of the Trigger.
+
+    Triggers must not maintain state internally since they can be re-created 
or reused for different
+    keys. All necessary state should be persisted using the state abstraction 
available on the
+    TriggerContext.
+
+    When used with a MergingWindowAssigner the Trigger must return true from 
:func:`can_merge` and
+    :func:`on_merge` most be properly implemented.
+    """
+
+    class TriggerContext(ABC):
+        """
+        A context object that is given to :class:`Trigger` methods to allow 
them to register timer
+        callbacks and deal with state.
+        """
+
+        @abstractmethod
+        def get_current_processing_time(self) -> int:
+            """
+            :return: The current processing time.
+            """
+            pass
+
+        @abstractmethod
+        def get_metric_group(self) -> MetricGroup:
+            """
+            Returns the metric group for this :class:`Trigger`. This is the 
same metric group that
+            would be returned from
+            
:func:`~pyflink.datasteam.functions.RuntimeContext.get_metric_group` in a user 
function.
+
+            :return: The metric group.
+            """
+            pass
+
+        @abstractmethod
+        def get_current_watermark(self) -> int:
+            """
+            :return: The current watermark time.
+            """
+            pass
+
+        @abstractmethod
+        def register_processing_time_timer(self, time: int) -> None:
+            """
+            Register a system time callback. When the current system time 
passes the specified time
+            :func:`~Trigger.on_processing_time` is called with the time 
specified here.
+
+            :param time: The time at which to invoke 
:func:`~Trigger.on_processing_time`.
+            """
+            pass
+
+        @abstractmethod
+        def register_event_time_timer(self, time: int) -> None:
+            """
+            Register an event-time callback. When the current watermark passes 
the specified time
+            :func:`~Trigger.on_event_time` is called with the time specified 
here.
+
+            :param time: The watermark at which to invoke 
:func:`~Trigger.on_event_time`.
+            """
+            pass
+
+        @abstractmethod
+        def delete_processing_time_timer(self, time: int) -> None:
+            """
+            Delete the processing time trigger for the given time.
+            """
+            pass
+
+        @abstractmethod
+        def delete_event_time_timer(self, time: int) -> None:
+            """
+            Delete the event-time trigger for the given time.
+            """
+            pass
+
+        @abstractmethod
+        def get_partitioned_state(self, state_descriptor: StateDescriptor) -> 
State:
+            """
+            Retrieves a :class:`State` object that can be used to interact 
with fault-tolerant state
+            that is scoped to the window and key of the current trigger 
invocation.
+
+            :param state_descriptor: The StateDescriptor that contains the 
name and type of the
+                                     state that is being accessed.
+            :return: The partitioned state object.
+            """
+            pass
+
+    class OnMergeContext(TriggerContext):
+        """
+        Extension of :class:`TriggerContext` that is given to 
:func:`~Trigger.on_merge`.
+        """
+
+        @abstractmethod
+        def merge_partitioned_state(self, state_descriptor: StateDescriptor) 
-> None:
+            pass
+
+    @abstractmethod
+    def on_element(self,
+                   element: T,
+                   timestamp: int,
+                   window: W,
+                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        """
+        Called for every element that gets added to a pane. The result of this 
will determine
+        whether the pane is evaluated to emit results.
+
+        :param element: The element that arrived.
+        :param timestamp: The timestamp of the element that arrived.
+        :param window: The window to which the element is being added.
+        :param ctx: A context object that can be used to register timer 
callbacks.
+        """
+        pass
+
+    @abstractmethod
+    def on_processing_time(self,
+                           time: int,
+                           window: W,
+                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        """
+        Called when a processing-time timer that was set using the trigger 
context fires.
+
+        :param time: The timestamp at which the timer fired.
+        :param window: The window for which the timer fired.
+        :param ctx: A context object that can be used to register timer 
callbacks.
+        """
+        pass
+
+    @abstractmethod
+    def on_event_time(self, time: int, window: W, ctx: 
'Trigger.TriggerContext') -> TriggerResult:
+        """
+        Called when an event-time timer that was set using the trigger context 
fires.
+
+        :param time: The timestamp at which the timer fired.
+        :param window: The window for which the timer fired.
+        :param ctx: A context object that can be used to register timer 
callbacks.
+        """
+        pass
+
+    def can_merge(self) -> bool:
+        """
+        .. note:: If this returns true you must properly implement 
:func:`~Trigger.on_merge`
+
+        :return: True if this trigger supports merging of trigger state and 
can therefore be used
+                 with a MergingWindowAssigner.
+        """
+        return False
+
+    @abstractmethod
+    def on_merge(self, window: W, ctx: 'Trigger.OnMergeContext') -> None:
+        """
+        Called when several windows have been merged into one window by the 
:class:`WindowAssigner`.
+
+        :param window: The new window that results from the merge.
+        :param ctx: A context object that can be used to register timer 
callbacks and access state.
+        """
+        pass
+
+    @abstractmethod
+    def clear(self, window: W, ctx: 'Trigger.TriggerContext') -> None:
+        """
+        Clears any state that the trigger might still hold for the given 
window. This is called when
+        a window is purged. Timers set using 
:func:`~TriggerContext.register_event_time_timer` and
+        :func:`~TriggerContext.register_processing_time_timer` should be 
deleted here as well as
+        state acquired using :func:`~TriggerContext.get_partitioned_state`.
+        """
+        pass
+```
+
+Two things to notice about the above methods are:
+
+1) The first three(*on_element*, *on_processing_time* and *on_event_time*) 
decide how to act on their invocation event by returning a `TriggerResult`.
+The action can be one of the following:
+
+* `CONTINUE`: do nothing,
+* `FIRE`: trigger the computation,
+* `PURGE`: clear the elements in the window, and
+* `FIRE_AND_PURGE`: trigger the computation and clear the elements in the 
window afterwards.
+
+2) Any of these methods can be used to register processing- or event-time 
timers for future actions.
+
+### Fire and Purge
+
+Once a trigger determines that a window is ready for processing, it fires, 
*i.e.*, it returns `FIRE` or `FIRE_AND_PURGE`. This is the signal for the 
window operator
+to emit the result of the current window. Given a window with a 
`ProcessWindowFunction`, all elements are passed to the `ProcessWindowFunction`.
+
+When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` 
keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
+By default, the pre-implemented triggers simply `FIRE` without purging the 
window state.
+
+You can implement a custom EventTimeTrigger as follows:
+
+```python
+from typing import Tuple
+from pyflink.datastream.window import TimeWindow
+
+class EventTimeTrigger(Trigger[Tuple, TimeWindow]):
+
+    def on_element(self,
+                   element: Tuple,
+                   timestamp: int,
+                   window: TimeWindow,
+                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        return TriggerResult.CONTINUE
+
+    def on_processing_time(self,
+                           time: int,
+                           window: TimeWindow,
+                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        return TriggerResult.CONTINUE
+
+    def on_event_time(self,
+                      time: int,
+                      window: TimeWindow,
+                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
+        if time >= window.max_timestamp():
+            return TriggerResult.FIRE_AND_PURGE
+        else:
+            return TriggerResult.CONTINUE
+
+    def on_merge(self,
+                 window: TimeWindow,
+                 ctx: 'Trigger.OnMergeContext') -> None:
+        pass
+
+    def clear(self,
+              window: TimeWindow,
+              ctx: 'Trigger.TriggerContext') -> None:
+        pass
+```
+
+## Allowed Lateness
+
+When working with *event-time* windowing, it can happen that elements arrive 
late, *i.e.* the watermark that Flink uses to
+keep track of the progress of event-time is already past the end timestamp of 
a window to which an element belongs. See
+[event time]({{< ref "docs/dev/datastream/event-time/generating_watermarks" 
>}}) and especially [late elements]({{< ref 
"docs/dev/datastream/event-time/generating_watermarks" >}}#late-elements) for a 
more thorough
+discussion of how Flink deals with event time.
+
+By default, late elements are dropped when the watermark is past the end of 
the window. However,
+Flink allows to specify a maximum *allowed lateness* for window operators. 
Allowed lateness
+specifies by how much time elements can be late before they are dropped, and 
its default value is 0.
+Elements that arrive after the watermark has passed the end of the window but 
before it passes the end of
+the window plus the allowed lateness, are still added to the window. Depending 
on the trigger used,
+a late but not dropped element may cause the window to fire again.
+
+In order to make this work, Flink keeps the state of windows until their 
allowed lateness expires. Once this happens, Flink removes the window and 
deletes its state, as
+also described in the [Window Lifecycle](#window-lifecycle) section.
+
+By default, the allowed lateness is set to `0`. That is, elements that arrive 
behind the watermark will be dropped.
+
+You can specify an allowed lateness like this:
+
+```python
+data_stream.key_by(<key selector>) \
+    .window(<window assigner>) \
+    .allowed_lateness(<time>) \
+    .<windowed transformation>(<window function>)
+```

Reply via email to