Thank you all!
Your responses are very helpful.

On Wed, Apr 1, 2020 at 11:37 AM Robert Bradshaw <rober...@google.com> wrote:

>
>
> On Wed, Apr 1, 2020 at 12:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Amit,
>>
>> answers inline.
>> On 4/1/20 12:23 AM, amit kumar wrote:
>>
>> Thanks Ankur for your reply.
>>
>> By default the allowed lateness for a global window is zero, but can we
>> also set it to a non-zero value, so that it is used in the downstream
>> transforms where a GroupByKey or a Window.into with a trigger happens
>> (using allowedTimestampSkew for unbounded sources / sources which have
>> timestamped elements)?
>>
>> Setting allowedLateness for the global window has no semantic meaning,
>> because the global window will be triggered (using the default trigger)
>> only at the end of input. Allowed lateness plays no role in that for the
>> global window.
>>
>> allowedTimestampSkew is used for something different: it applies when you
>> reassign timestamps to elements which already have timestamps (e.g.
>> assigned by the source) and you want to move them into the past. The skew
>> says how far into the past you can go.
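A toy model of the skew rule described above, in plain Java. It is not Beam's implementation: it uses java.time instead of the Joda-Time types the Beam Java SDK uses, and the names `TimestampSkewModel` and `isAllowed` are hypothetical. It only encodes the idea that re-stamping an element forward is always fine, while moving it back is limited by the skew.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical toy model (not Beam code) of allowedTimestampSkew: an element
 * may be re-stamped forward freely, but backwards by at most the allowed skew.
 */
final class TimestampSkewModel {

  /** True if moving an element from {@code current} to {@code proposed} respects the skew. */
  static boolean isAllowed(Instant current, Instant proposed, Duration allowedSkew) {
    Instant earliestAllowed = current.minus(allowedSkew);
    return !proposed.isBefore(earliestAllowed);
  }

  public static void main(String[] args) {
    Instant assignedBySource = Instant.parse("2020-03-31T10:00:00Z");
    Instant eventTime = Instant.parse("2020-03-31T09:55:00Z"); // 5 minutes earlier

    // Zero skew: moving 5 minutes into the past is rejected.
    System.out.println(isAllowed(assignedBySource, eventTime, Duration.ZERO)); // false
    // A 10-minute skew permits it.
    System.out.println(isAllowed(assignedBySource, eventTime, Duration.ofMinutes(10))); // true
    // Moving into the future needs no skew at all.
    System.out.println(
        isAllowed(assignedBySource, assignedBySource.plusSeconds(3600), Duration.ZERO)); // true
  }
}
```

In the Beam Java SDK the corresponding knobs are `DoFn#getAllowedTimestampSkew()` and `WithTimestamps#withAllowedTimestampSkew(Duration)`; the model above only mirrors the comparison, not how Beam enforces it.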
>>
>>
>> In both scenarios I described earlier for *source transforms*, is it
>> possible that the pipeline will drop data if I do not specify
>> allowedTimestampSkew/allowedLateness at the source transforms (given I
>> have late-arriving data)? Can I just set allowed lateness in the transform
>> where I do the groupBy or windowInto rather than at the source?
>>
>> Allowed lateness is a parameter of a stateful operation (e.g. GroupByKey),
>> not of the source. The source emits _watermarks_, which mark progress in
>> event time, but the data is then handled in the stateful operator. Each
>> operator can have its own allowedLateness (although the model ensures that
>> the lateness is by default inherited from one operator to the next).
>> Sources should simply assign elements to the global window (with no allowed
>> lateness since, as mentioned above, allowed lateness has no meaning for the
>> global window).
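The following plain-Java sketch models why lateness is a property of the stateful operator's window and why it does nothing for the global window. It assumes a simplified rule (data for a window becomes droppable once the input watermark is past the window's max timestamp plus the allowed lateness) and assumes the global window ends one day before Beam's maximum timestamp. The class and method names are hypothetical; this is not Beam's implementation.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical toy model (not Beam code) of when a stateful operator drops late data. */
final class LatenessModel {
  // Assumption: stand-in for Beam's BoundedWindow.TIMESTAMP_MAX_VALUE (max micros, in millis).
  static final Instant TIMESTAMP_MAX = Instant.ofEpochMilli(Long.MAX_VALUE / 1000);
  // Assumption: the global window ends one day before the maximum timestamp.
  static final Instant END_OF_GLOBAL_WINDOW = TIMESTAMP_MAX.minus(Duration.ofDays(1));

  /** Data for a window is droppable once the watermark passes window end + allowed lateness. */
  static boolean isDroppable(
      Instant windowMaxTimestamp, Duration allowedLateness, Instant inputWatermark) {
    return windowMaxTimestamp.plus(allowedLateness).isBefore(inputWatermark);
  }

  public static void main(String[] args) {
    Instant watermark = Instant.parse("2020-04-01T12:00:00Z");
    // A one-hour fixed window [11:00, 12:00) has max timestamp 11:59:59.999.
    Instant fixedWindowEnd = Instant.parse("2020-04-01T11:59:59.999Z");

    System.out.println(isDroppable(fixedWindowEnd, Duration.ZERO, watermark)); // true
    System.out.println(isDroppable(fixedWindowEnd, Duration.ofMinutes(5), watermark)); // false
    // The global window practically never ends, so allowed lateness makes no difference.
    System.out.println(isDroppable(END_OF_GLOBAL_WINDOW, Duration.ZERO, watermark)); // false
  }
}
```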
>>
>>
>> In the case of TextIO.read, which reads from a bounded source, if I assign
>> timestamps to all elements in the second transform, will it be useful to
>> set allowedTimestampSkew after assigning timestamps as well? I am trying to
>> understand how the elements will be available after assigning timestamps
>> (given all files are present on the file system): will they be ordered by
>> timestamp, and can some elements be read after the watermark has
>> progressed past an element's event time?
>>
>> When executing a batch pipeline, there is actually no watermark. Event time
>> moves discretely from -inf (computation not finished yet) to +inf
>> (computation finished). In the case you describe, you should not even need
>> to set allowedTimestampSkew, because elements output from TextIO should
>> (probably) be assigned the timestamp BoundedWindow.TIMESTAMP_MIN_VALUE (I'm
>> not sure if the model guarantees this, but it seems reasonable). You can
>> then reassign timestamps to the future as you wish. You don't have to worry
>> about allowed lateness either, because that only applies to streaming
>> pipelines, where event time moves more smoothly. By the definition of how
>> event time progresses in batch pipelines, there is no "late" (behind the
>> watermark) data in this case.
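To illustrate the point about batch event time, here is a hypothetical plain-Java simulation (not Beam code) that counts "late" elements for a read order that is not sorted by timestamp. In the batch model the watermark stays at -inf until all input is consumed, so nothing is late. For contrast, a naive streaming-style watermark that follows the largest timestamp seen so far (an assumed heuristic, much simpler than any real source's) flags the out-of-order elements.

```java
import java.time.Instant;
import java.util.List;

/** Hypothetical simulation (not Beam code) contrasting batch and streaming watermarks. */
final class EventTimeModel {

  /** Batch: the watermark is -inf for the whole read and jumps to +inf only afterwards. */
  static int countLateBatch(List<Instant> timestampsInReadOrder) {
    Instant watermark = Instant.MIN; // stand-in for -inf while input is being read
    int late = 0;
    for (Instant ts : timestampsInReadOrder) {
      if (ts.isBefore(watermark)) {
        late++; // never happens: nothing is before -inf
      }
    }
    return late;
  }

  /** Streaming contrast: a naive watermark that tracks the max timestamp seen so far. */
  static int countLateNaiveStreaming(List<Instant> timestampsInReadOrder) {
    Instant watermark = Instant.MIN;
    int late = 0;
    for (Instant ts : timestampsInReadOrder) {
      if (ts.isBefore(watermark)) {
        late++;
      } else {
        watermark = ts; // advance the watermark to the newest timestamp
      }
    }
    return late;
  }

  public static void main(String[] args) {
    List<Instant> readOrder = List.of(
        Instant.parse("2020-04-01T10:00:00Z"),
        Instant.parse("2020-04-01T09:00:00Z"),
        Instant.parse("2020-04-01T11:00:00Z"),
        Instant.parse("2020-04-01T10:30:00Z"));
    System.out.println(countLateBatch(readOrder)); // 0
    System.out.println(countLateNaiveStreaming(readOrder)); // 2
  }
}
```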
>>
>
> Clarification: sources should assign elements to their upstream window
> (similar to DoFns), generally with the appropriate timestamp (unless they
> are timestamp-aware). The upstream of a bounded source is typically
> Impulse, which is in the global window with MIN_TIMESTAMP, but it could be
> different. This better unifies the case of reading the elements from a set
> of filenames published to Pub/Sub, for example.
>
>>
>>
>> TextIO.Read
>>      |  Bounded source
>>      |  Global window
>>      |  -infinity watermark
>> apply
>> WithTimestamps (based on a timestamp attribute in the file)
>>    |  timestamped elements (watermark starts from -infinity and follows
>>    |  the timestamp from the timestamp attribute)
>>    |  Global window
>>
>>
>> Regards,
>> Amit
>>
>> On Tue, Mar 31, 2020 at 11:26 AM Ankur Goenka <goe...@google.com> wrote:
>>
>>> Hi Amit,
>>>
>>> As you don't have any GroupByKey or trigger in your pipeline, you don't
>>> need to set allowed lateness.
>>> For an unbounded source, the global window with the default trigger will
>>> never fire, so a GroupByKey will never emit output.
>>> In the code you linked, a non-default trigger is used together with
>>> allowedLateness.
>>>
>>> Thanks,
>>> Ankur
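The point about the default trigger can be sketched with the same kind of toy model. Assumptions (not Beam code): the default trigger's on-time pane fires once the watermark is past the window's max timestamp; a bounded input's watermark jumps to +inf when the input is exhausted; an unbounded source's watermark follows event time and so, in practice, never approaches the end of the global window. All names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical toy model (not Beam code) of the default trigger on the global window. */
final class DefaultTriggerModel {
  // Assumption: stand-ins for Beam's TIMESTAMP_MAX_VALUE and the end of the global window.
  static final Instant TIMESTAMP_MAX = Instant.ofEpochMilli(Long.MAX_VALUE / 1000);
  static final Instant END_OF_GLOBAL_WINDOW = TIMESTAMP_MAX.minus(Duration.ofDays(1));

  /** The on-time pane fires once the watermark has passed the window's max timestamp. */
  static boolean onTimePaneFires(Instant windowMaxTimestamp, Instant inputWatermark) {
    return inputWatermark.isAfter(windowMaxTimestamp);
  }

  public static void main(String[] args) {
    // Bounded input: once everything is read, the watermark jumps to +inf.
    System.out.println(onTimePaneFires(END_OF_GLOBAL_WINDOW, TIMESTAMP_MAX)); // true
    // Unbounded input: the watermark follows event time, e.g. roughly "now".
    Instant streamingWatermark = Instant.parse("2020-03-31T11:26:00Z");
    System.out.println(onTimePaneFires(END_OF_GLOBAL_WINDOW, streamingWatermark)); // false
  }
}
```

Under these assumptions, a GroupByKey in the global window over an unbounded source needs a non-default trigger (as in the LeaderBoard example) to produce any output at all.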
>>>
>>> On Tue, Mar 31, 2020 at 11:20 AM amit kumar <akdata...@gmail.com> wrote:
>>>
>>>> Thanks Jan!
>>>> I have a question based on this, about the global window and allowed
>>>> lateness with the default trigger, for the following scenarios:
>>>>
>>>> Case 1 -
>>>> TextIO.Read
>>>>      |  Bounded source
>>>>      |  Global window
>>>>      |  -infinity watermark
>>>> apply
>>>> WithTimestamps (based on a timestamp attribute in the file)
>>>>    |  timestamped elements (watermark starts from -infinity and
>>>>    |  follows the timestamp from the timestamp attribute)
>>>>    |  Global window
>>>>    |  (Is it true that I never need to set allowedLateness in this case
>>>>    |  with the default trigger? Would there be any benefit, since the
>>>>    |  window is global and the watermark will pass the end of the window
>>>>    |  only when everything is processed?)
>>>>
>>>>
>>>> Case 2 -
>>>> KinesisIO.read
>>>>     |  Unbounded source
>>>>     |  Default global window
>>>>     |  watermark based on arrival time
>>>> apply
>>>> WithTimestamps (based on a timestamp attribute from the stream)
>>>>    |  timestamped elements (watermark follows the timestamp from the
>>>>    |  timestamp attribute)
>>>>    |  Global window
>>>>    |  Watermark based on event timestamp.
>>>>    |  Same question here: would there be any benefit to using
>>>>    |  allowedLateness, since the window is global?
>>>>
>>>> In the code example below, is allowedLateness being used for a global window?
>>>>
>>>> https://github.com/apache/beam/blob/828b897a2439437d483b1bd7f2a04871f077bde0/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L307
>>>>
>>>> Regards,
>>>> Amit
>>>>
>>>> On Tue, Mar 31, 2020 at 2:34 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Amit,
>>>>>
>>>>> the window function applied by default is
>>>>> WindowingStrategy.globalDefault(), [1] - global window with zero
>>>>> allowed
>>>>> lateness.
>>>>>
>>>>> Cheers,
>>>>>
>>>>>   Jan
>>>>>
>>>>> [1]
>>>>>
>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java#L105
>>>>>
>>>>> On 3/31/20 10:22 AM, amit kumar wrote:
>>>>> > Hi All,
>>>>> >
>>>>> > Is there a default WindowFn that gets applied to elements of an
>>>>> > unbounded source?
>>>>> >
>>>>> > For example, if I have a Kinesis input source, for which all elements
>>>>> > are timestamped with ArrivalTime, what will be the default windowing
>>>>> > applied to the output of the read transform?
>>>>> >
>>>>> > Is this runner-dependent?
>>>>> >
>>>>> > Regards,
>>>>> > Amit
>>>>>
>>>>
