Hi Amit,

answers inline.

On 4/1/20 12:23 AM, amit kumar wrote:
> Thanks Ankur for your reply.
>
> By default the allowed lateness for a global window is zero, but we can also set it to a non-zero value, which will then be used in the downstream transforms where a group-by or a window-into with a trigger happens? (Using allowedTimestampSkew for unbounded sources / sources which have timestamped elements.)

Setting allowedLateness for the global window has no semantic meaning, because the global window is triggered (with the default trigger) only at the end of input. Allowed lateness plays no role there.

allowedTimestampSkew is used for something different: it applies when you reassign timestamps to elements which already have timestamps (e.g. assigned by the source) and you want to move them into the past. The skew says how far into the past you can go.
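
To make the rule concrete, here is a plain-Java model of the check. It is only a sketch and not Beam code; the class and method names are made up for illustration. A reassigned timestamp is accepted when it is no earlier than the element's current timestamp minus the allowed skew. In Beam itself the skew is configured with WithTimestamps.withAllowedTimestampSkew(...) or by overriding DoFn.getAllowedTimestampSkew().

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of the timestamp-skew rule; not part of Beam. */
final class SkewModel {
  private SkewModel() {}

  /**
   * Returns true if an element currently stamped {@code current} may be
   * re-stamped with {@code reassigned}, given the allowed backward skew.
   * Moving a timestamp forward is always accepted in this model.
   */
  static boolean isAllowed(Instant current, Instant reassigned, Duration allowedSkew) {
    // The earliest timestamp the element may be moved back to.
    Instant earliestAllowed = current.minus(allowedSkew);
    return !reassigned.isBefore(earliestAllowed);
  }
}
```

With zero skew (the default), any move into the past is rejected, while moves into the future pass.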


> In both scenarios which I described earlier for *source transforms*, is it possible that the pipeline will drop data if I do not specify allowedTimestampSkew / allowedLateness at the source transforms (given I have late-arriving data)? Can I just set allowed lateness in the transform where I do groupBy or windowInto rather than at the source?

AllowedLateness is a parameter of a stateful operation (e.g. GroupByKey), not of the source. The source emits _watermarks_, which mark progress in event time, but the data is then handled in the stateful operator. Each operator can have its own allowedLateness (although the model ensures that the lateness is by default inherited from one operator to the next). Sources should simply assign elements to the global window (with no allowed lateness, since allowed lateness has no meaning for the global window, as mentioned above).
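
As a rough illustration of why lateness belongs to the windowed, stateful operator, here is another plain-Java model. Again this is not Beam code: the names are hypothetical, and the expiry rule is an assumption about how runners decide to drop late data. An element is dropped only when the end of its window plus the allowed lateness is already behind the operator's input watermark. For the global window the end is effectively +inf, so nothing is dropped, whatever the allowed lateness is.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of late-data dropping; not part of Beam. */
final class LatenessModel {
  private LatenessModel() {}

  /** Stand-in for the end of the global window (effectively +inf). */
  static final Instant GLOBAL_WINDOW_END = Instant.MAX;

  /**
   * Returns true if an element belonging to a window ending at
   * {@code windowEnd} would be dropped as late by the stateful operator.
   */
  static boolean isDropped(Instant windowEnd, Duration allowedLateness, Instant watermark) {
    // The global window never expires; this guard also avoids overflow in plus().
    if (windowEnd.equals(GLOBAL_WINDOW_END)) {
      return false;
    }
    // Assumed rule: the window is expired once the watermark has passed
    // its end plus the allowed lateness.
    return windowEnd.plus(allowedLateness).isBefore(watermark);
  }
}
```

The source never appears in this check: it only influences the watermark, while the window and the allowed lateness come from the operator's windowing strategy.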

> In the case of TextIO.read, which reads from a bounded source, where I assign timestamps to all elements in the second transform, will it be useful to set allowedTimestampSkew after assigning timestamps as well? I am trying to understand how the elements will be available after assigning timestamps (given all files are present on the file system): will they be ordered by timestamp, and can some elements be read after the watermark has progressed above an element's event time?

When executing a batch pipeline, there is actually no watermark. Event time moves discretely from -inf (computation not finished yet) to +inf (computation finished). In the case you describe, you should not even need to set allowedTimestampSkew, because elements output from TextIO should (probably) be assigned a timestamp of BoundedWindow.TIMESTAMP_MIN_VALUE (I'm not sure if the model guarantees this, but it seems reasonable). You can then reassign timestamps to the future as you wish. You don't have to worry about allowed lateness either, because that only applies to streaming pipelines, where event time moves more smoothly. By the definition of how event time progresses in batch pipelines, there is no "late" (after watermark) data in this case.


> TextIO.Read
>      |. Bounded source
>      |. Global Window
>      |.  -infinity watermark
> apply
> WithTimestamps (based on a timestamp attribute in the file)
>    |.   timestamped elements (watermark starts from -infinity and follows the timestamp from the timestamp attribute)
>    |.   Global Window
>
>
> Regards,
> Amit

On Tue, Mar 31, 2020 at 11:26 AM Ankur Goenka <goe...@google.com> wrote:

    Hi Amit,

    As you don't have any GroupByKey or trigger in your pipeline, you
    don't need to set allowed lateness.
    For an unbounded source, the global window with the default trigger
    never fires, so a GroupByKey never emits output.
    In the code you linked, a trigger is used which uses allowedLateness.

    Thanks,
    Ankur

        On Tue, Mar 31, 2020 at 11:20 AM amit kumar <akdata...@gmail.com> wrote:

        Thanks Jan!
        I have a question based on this on Global Window and allowed
        lateness, with default trigger for the following
         scenarios:

        Case 1-
        TextIO.Read.
             |. Bounded source
             |. Global Window
             |.  -infinity watermark
        apply
        WithTimestamps (Based on a timestamp attribute in file)
           |.   timestamped elements (watermark starts from -infinity
        and follows the timestamp from timestamp attribute)
           |.   Global Window
           |. (Will I never need to do allowedLateness in this case
        with default trigger? Will there be any benefit since the
        window is global and watermark will pass the end of window
        when everything is processed ? )


        Case 2 -
        KinesisIO.read
            | .Unbounded Source
            |. Default Global Window
            |. watermark based on arrival time
         apply
        WithTimestamps (Based on a timestamp attribute from the stream)
           |.   timestamped elements  ( watermark follows the
        timestamp from timestamp attribute)
           |.   Global Window
           |. Watermark based on event timestamp.
           | Same question here will there be any benefit of using
        allowedLateness since window is global ?

        In the code example below, is allowedLateness used for the global
        window?
        https://github.com/apache/beam/blob/828b897a2439437d483b1bd7f2a04871f077bde0/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L307

        Regards,
        Amit

        On Tue, Mar 31, 2020 at 2:34 AM Jan Lukavský <je...@seznam.cz> wrote:

            Hi Amit,

            the window function applied by default is
            WindowingStrategy.globalDefault() [1]: the global window
            with zero allowed lateness.

            Cheers,

              Jan

            [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java#L105

            On 3/31/20 10:22 AM, amit kumar wrote:
            > Hi All,
            >
            > Is there a default WindowFn that gets applied to elements of an
            > unbounded source?
            >
            > For example, if I have a Kinesis input source, for which all elements
            > are timestamped with ArrivalTime, what will be the default windowing
            > applied to the output of the read transform?
            >
            > Is this runner dependent?
            >
            > Regards,
            > Amit
