Hi Amit,
answers inline.
On 4/1/20 12:23 AM, amit kumar wrote:
Thanks, Ankur, for your reply.
By default, the allowed lateness for a global window is zero, but we can
also set it to a non-zero value, which will then be used by downstream
transforms where a GroupByKey or a Window.into with a trigger is applied?
(Using allowedTimestampSkew for unbounded sources / sources whose
elements are already timestamped.)
Setting allowedLateness for the global window has no semantic meaning,
because the global window will be triggered (using the default trigger)
only at the end of input. Allowed lateness plays no role there.
allowedTimestampSkew is used for something different: it is used when
you reassign timestamps to elements which already have timestamps (e.g.
assigned by the source) and you want to move them into the past. The
skew says how far into the past you can go.
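To make the skew rule concrete, here is a minimal plain-Java model of it. It does not use Beam, and the class and method names are hypothetical. It encodes only the rule stated above: a reassigned timestamp may move forward freely but may move into the past by at most the allowed skew. In the Beam Java SDK the corresponding knob is, if I remember correctly, `WithTimestamps.withAllowedTimestampSkew(Duration)`, which is marked deprecated in newer releases.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Plain-Java model (not Beam code) of the timestamp-skew rule:
 * a new timestamp may not be earlier than (old timestamp - allowed skew).
 */
final class TimestampSkewModel {
  private TimestampSkewModel() {}

  /** Returns true if reassigning an element from oldTs to newTs is permitted. */
  static boolean isAllowed(Instant oldTs, Instant newTs, Duration allowedSkew) {
    // Moving forward is always fine; moving backward is fine up to allowedSkew.
    return !newTs.isBefore(oldTs.minus(allowedSkew));
  }

  public static void main(String[] args) {
    Instant arrival = Instant.parse("2020-03-31T10:00:00Z");
    Instant eventTime = Instant.parse("2020-03-31T09:55:00Z"); // 5 minutes earlier
    System.out.println(isAllowed(arrival, eventTime, Duration.ZERO));          // false
    System.out.println(isAllowed(arrival, eventTime, Duration.ofMinutes(10))); // true
    System.out.println(isAllowed(eventTime, arrival, Duration.ZERO));          // true: moving forward
  }
}
```

With zero skew, only forward moves pass. This is why re-stamping elements that already carry a later timestamp needs a skew, while re-stamping from a minimal timestamp does not.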
In both scenarios I described earlier, is it possible that the pipeline
will drop data in the *source transforms* if I do not specify
allowedTimestampSkew / allowedLateness there (given I have
late-arriving data)? Can I just set allowed lateness in the transform
where I do the GroupByKey or Window.into rather than at the source?
AllowedLateness is a parameter of a stateful operation (e.g. GroupByKey),
not of the source. The source emits _watermarks_, which mark progress in
event time, but the data is then handled in the stateful operator. Each
operator can have its own allowedLateness (although the model ensures
that the lateness is by default inherited from one operator to the
next). Sources should simply assign elements to the global window (with
no allowed lateness, as allowed lateness has no meaning for the global
window, as mentioned above).
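The sketch below is a simplified plain-Java model, not Beam code, of the decision a stateful operator such as GroupByKey makes. An element's window counts as expired once the watermark has moved past the window's max timestamp plus allowedLateness. The constants for +infinity and the end of the global window are hypothetical stand-ins. The only property that matters is that the global window ends just below +infinity, a point a finite streaming watermark never reaches.

```java
/**
 * Plain-Java model (not Beam code): an element is droppable once the
 * watermark is past window.maxTimestamp + allowedLateness. Times are millis.
 */
final class LatenessModel {
  private LatenessModel() {}

  // Hypothetical stand-ins: +infinity, and a global-window end just below it.
  static final long POSITIVE_INFINITY = Long.MAX_VALUE;
  static final long GLOBAL_WINDOW_MAX_TS = POSITIVE_INFINITY - 24L * 60 * 60 * 1000;

  /** Adds non-negative lateness, clamping at +infinity instead of overflowing. */
  static long plusLateness(long maxTs, long allowedLatenessMs) {
    try {
      return Math.addExact(maxTs, allowedLatenessMs);
    } catch (ArithmeticException overflow) {
      return POSITIVE_INFINITY;
    }
  }

  /** True if an element in a window ending at windowMaxTs is dropped at this watermark. */
  static boolean isDroppable(long windowMaxTs, long allowedLatenessMs, long watermark) {
    return watermark > plusLateness(windowMaxTs, allowedLatenessMs);
  }

  public static void main(String[] args) {
    long fiveMinutes = 5 * 60 * 1000L;
    long fixedWindowMaxTs = fiveMinutes - 1; // fixed window [0, 5 min)
    long watermark = 7 * 60 * 1000L;         // watermark at 7 min
    System.out.println(isDroppable(fixedWindowMaxTs, 0, watermark));           // true
    System.out.println(isDroppable(fixedWindowMaxTs, fiveMinutes, watermark)); // false
    System.out.println(isDroppable(GLOBAL_WINDOW_MAX_TS, 0, watermark));       // false
  }
}
```

For a fixed window, allowedLateness decides whether the element survives. For the global window the answer is "not droppable" at any finite watermark, whatever the lateness, which matches the point that lateness has no practical meaning there.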
In the case of TextIO.read, which reads from a bounded source, where I
assign timestamps to all elements in the second transform, would it be
useful here as well to set allowedTimestampSkew after assigning
timestamps? I am trying to understand how the elements will be
available after assigning timestamps (given all files are present on
the file system): will they be ordered by timestamp, and can some
elements be read after the watermark has progressed past an element's
event time?
When executing a batch pipeline, there is actually no watermark. Event
time moves discretely from -inf (computation not finished yet) to +inf
(computation finished). In the case you describe, you should not even
need to set allowedTimestampSkew, because elements output from TextIO
should (probably) be assigned a timestamp of
BoundedWindow.TIMESTAMP_MIN_VALUE (I'm not sure whether the model
guarantees this, but it seems reasonable). You can then reassign
timestamps into the future as you wish. You don't have to worry about
allowed lateness either, because it only applies to streaming
pipelines, where event time moves more smoothly. By the definition of
how event time progresses in batch pipelines, there is no "late"
(arriving after the watermark) data in this case.
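A toy plain-Java simulation of the batch point above, under stated assumptions. Elements are read in an order unrelated to their event times (e.g. several files read in arbitrary order). In the batch case the watermark is held at -infinity while input is still being read, so no element is ever behind it. For contrast, the streaming case uses a deliberately naive, hypothetical watermark estimator (the largest event time seen so far). Real sources hold the watermark back more carefully, so this only shows why "late" data is a streaming concern.

```java
/**
 * Toy model (not Beam code): counts elements whose event time is already
 * behind the watermark at the moment they are read.
 */
final class WatermarkModel {
  private WatermarkModel() {}

  static int countBehindWatermark(long[] eventTimesInReadOrder, boolean batch) {
    long watermark = Long.MIN_VALUE; // -infinity
    int behind = 0;
    for (long t : eventTimesInReadOrder) {
      if (t < watermark) {
        behind++;
      }
      if (!batch) {
        // Hypothetical naive streaming estimator: max event time seen so far.
        watermark = Math.max(watermark, t);
      }
    }
    // Batch mode: the watermark would jump to +infinity only after this loop,
    // once all input is read (not modeled further here).
    return behind;
  }

  public static void main(String[] args) {
    long[] readOrder = {100, 300, 200, 400, 150}; // read out of event-time order
    System.out.println(countBehindWatermark(readOrder, true));  // 0
    System.out.println(countBehindWatermark(readOrder, false)); // 2
  }
}
```

Reading order does not matter in the batch case, so elements need not be sorted by timestamp to avoid being treated as late.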
TextIO.Read
|. Bounded source
|. Global Window
|. -infinity watermark
apply
WithTimestamps (based on a timestamp attribute in the file)
|. timestamped elements (watermark starts from -infinity and
follows the timestamp from the timestamp attribute)
|. Global Window
Regards,
Amit
On Tue, Mar 31, 2020 at 11:26 AM Ankur Goenka <goe...@google.com
<mailto:goe...@google.com>> wrote:
Hi Amit,
As you don't have any GroupByKey or trigger in your pipeline, you
don't need to set allowed lateness.
For an unbounded source, the global window (with the default trigger)
will never fire or emit output from a GroupByKey.
In the code you linked, a trigger is used, and that trigger makes use
of allowedLateness.
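A minimal plain-Java model, not Beam code, of why the default trigger in the global window never fires for an unbounded source. In this simplified view the default trigger emits a pane once the watermark passes the end of the window. The end-of-window constant is a hypothetical stand-in just below +infinity. An unbounded source only ever reports finite watermarks, while a bounded (batch) input eventually jumps to +infinity.

```java
/**
 * Plain-Java model (not Beam code) of the default trigger in the global
 * window: one pane once the watermark passes the end of the window.
 */
final class GlobalWindowDefaultTriggerModel {
  private GlobalWindowDefaultTriggerModel() {}

  static final long POSITIVE_INFINITY = Long.MAX_VALUE;
  // Hypothetical stand-in: the global window ends just below +infinity.
  static final long END_OF_GLOBAL_WINDOW = POSITIVE_INFINITY - 1;

  /** Number of panes the default trigger emits for a sequence of watermark updates. */
  static int panesEmitted(long[] watermarkUpdates) {
    for (long watermark : watermarkUpdates) {
      if (watermark > END_OF_GLOBAL_WINDOW) {
        return 1; // fires once, when the input is exhausted
      }
    }
    return 0; // never fires
  }

  public static void main(String[] args) {
    long[] unbounded = {1_000L, 2_000L, 3_000L};          // finite watermarks, input never ends
    long[] bounded = {Long.MIN_VALUE, POSITIVE_INFINITY}; // batch: -inf, then +inf at the end
    System.out.println(panesEmitted(unbounded)); // 0
    System.out.println(panesEmitted(bounded));   // 1
  }
}
```

This is the reason a streaming pipeline that groups in the global window needs its own trigger, as the linked example does.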
Thanks,
Ankur
On Tue, Mar 31, 2020 at 11:20 AM amit kumar <akdata...@gmail.com
<mailto:akdata...@gmail.com>> wrote:
Thanks Jan!
I have a question based on this about the global window and allowed
lateness, with the default trigger, for the following
scenarios:
Case 1 -
TextIO.Read
|. Bounded source
|. Global Window
|. -infinity watermark
apply
WithTimestamps (based on a timestamp attribute in the file)
|. timestamped elements (watermark starts from -infinity
and follows the timestamp from the timestamp attribute)
|. Global Window
|. (Will I never need to set allowedLateness in this case
with the default trigger? Would there be any benefit, since the
window is global and the watermark will pass the end of the window
when everything is processed?)
Case 2 -
KinesisIO.read
|. Unbounded Source
|. Default Global Window
|. watermark based on arrival time
apply
WithTimestamps (based on a timestamp attribute from the stream)
|. timestamped elements (watermark follows the
timestamp from the timestamp attribute)
|. Global Window
|. Watermark based on event timestamp.
|. Same question here: would there be any benefit of using
allowedLateness, since the window is global?
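In Case 2, records first carry an arrival-time timestamp and are then moved back to their event time. The skew rule discussed earlier in this thread therefore implies the skew must cover the largest gap between arrival and event time. The plain-Java sketch below computes that gap from a sample. It assumes you can obtain (arrival, event) pairs. The `Rec` shape and class name are hypothetical, and a real stream has no known upper bound on the gap, so a sample only yields a lower bound.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

/**
 * Plain-Java sketch (not Beam code): smallest skew that would permit
 * re-stamping every sampled record from arrival time to event time.
 */
final class RequiredSkew {
  private RequiredSkew() {}

  /** Hypothetical record shape: when it reached the stream, and when it happened. */
  static final class Rec {
    final Instant arrival;
    final Instant event;

    Rec(Instant arrival, Instant event) {
      this.arrival = arrival;
      this.event = event;
    }
  }

  /** Returns max(arrival - event) over the sample, or zero if nothing moves backward. */
  static Duration minimumSkew(List<Rec> records) {
    Duration max = Duration.ZERO;
    for (Rec r : records) {
      Duration backwards = Duration.between(r.event, r.arrival); // arrival - event
      if (backwards.compareTo(max) > 0) {
        max = backwards;
      }
    }
    return max;
  }

  public static void main(String[] args) {
    List<Rec> sample = List.of(
        new Rec(Instant.parse("2020-03-31T10:00:00Z"), Instant.parse("2020-03-31T09:58:00Z")),
        new Rec(Instant.parse("2020-03-31T10:00:05Z"), Instant.parse("2020-03-31T09:50:05Z")),
        new Rec(Instant.parse("2020-03-31T10:00:09Z"), Instant.parse("2020-03-31T10:00:09Z")));
    System.out.println(minimumSkew(sample)); // PT10M
  }
}
```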
In the code example below, is allowedLateness used for a global
window?
https://github.com/apache/beam/blob/828b897a2439437d483b1bd7f2a04871f077bde0/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L307
Regards,
Amit
On Tue, Mar 31, 2020 at 2:34 AM Jan Lukavský <je...@seznam.cz
<mailto:je...@seznam.cz>> wrote:
Hi Amit,
the window function applied by default is
WindowingStrategy.globalDefault() [1]: the global window with
zero allowed lateness.
Cheers,
Jan
[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java#L105
On 3/31/20 10:22 AM, amit kumar wrote:
> Hi All,
>
> Is there a default WindowFn that gets applied to elements of an
> unbounded source?
>
> For example, if I have a Kinesis input source, for which all elements
> are timestamped with ArrivalTime, what will be the default windowing
> applied to the output of the read transform?
>
> Is this runner dependent?
>
> Regards,
> Amit