Thank you all! Your responses are very helpful.

On Wed, Apr 1, 2020 at 11:37 AM Robert Bradshaw <rober...@google.com> wrote:
>
> On Wed, Apr 1, 2020 at 12:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Amit,
>>
>> answers inline.
>> On 4/1/20 12:23 AM, amit kumar wrote:
>>
>> Thanks Ankur for your reply.
>>
>> By default the allowed lateness for a global window is zero, but we can
>> also set it to a non-zero value, which will then be used in the downstream
>> transforms where a GroupBy or a WindowInto with a trigger happens?
>> (using allowedTimestampSkew for unbounded sources / sources which have
>> timestamped elements).
>>
>> Setting allowedLateness for the global window has no semantic meaning,
>> because the global window will be triggered (using the default trigger)
>> only at the end of input. Allowed lateness plays no role in that for the
>> global window.
>>
>> allowedTimestampSkew is used for something different: it is used when you
>> reassign timestamps to elements which already have timestamps (e.g.
>> assigned by the source) and you want to move them into the past. The skew
>> says how far into the past you can go.
>>
>>
>> In both scenarios which I described earlier for *source transforms*, is
>> it possible that the pipeline will drop data if I do not specify
>> allowedTimestampSkew / allowedLateness at the source transforms (given I
>> have late-arriving data)? Can I just set allowed lateness in the
>> transform where I do groupBy or windowInto rather than at the source?
>>
>> AllowedLateness is a parameter of a stateful operation (e.g. GroupByKey),
>> not of the source. The source emits _watermarks_, which mark progress in
>> event time, but the data is then handled in the stateful operator. Each
>> operator can have its own allowedLateness (although the model ensures that
>> the lateness is by default inherited from one operator to the other).
>> Sources should simply assign elements to global windows (with no allowed
>> lateness, as allowed lateness has no meaning for global windows, as
>> mentioned above).
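The two rules Jan describes (allowed lateness is checked by the stateful operator against its input watermark; allowedTimestampSkew bounds how far back an element may be re-timestamped) can be illustrated with a small, self-contained Java model. This is a hypothetical sketch, not Beam's API: `LatenessModel`, its methods, and its constants are invented for illustration, timestamps are epoch milliseconds, and the bounds are assumed approximations of `BoundedWindow.TIMESTAMP_MIN_VALUE` / `TIMESTAMP_MAX_VALUE` and of the global window's max timestamp (assumed here to be one day before the maximum).

```java
import java.time.Duration;

/**
 * Hypothetical toy model (not Beam's API) of two rules from the thread:
 *  1. A stateful operator (e.g. GroupByKey) treats data for a window as droppable
 *     once its input watermark is past window.maxTimestamp() + allowedLateness.
 *  2. Re-timestamping may move an element into the past by at most
 *     allowedTimestampSkew; moving it into the future is always allowed.
 * Timestamps are epoch milliseconds; the bounds below are assumptions.
 */
final class LatenessModel {
    // Assumed approximations of Beam's minimum and maximum timestamps.
    static final long MIN_TIMESTAMP = Long.MIN_VALUE / 1000;
    static final long MAX_TIMESTAMP = Long.MAX_VALUE / 1000;
    // Assumed: the global window ends one day before MAX_TIMESTAMP.
    static final long GLOBAL_WINDOW_MAX = MAX_TIMESTAMP - Duration.ofDays(1).toMillis();

    private LatenessModel() {}

    /** True if data for a window ending at windowMax is droppable at this watermark. */
    static boolean isDroppable(long windowMax, Duration allowedLateness, long inputWatermark) {
        // addExact throws instead of silently wrapping for absurdly large lateness values.
        long expiry = Math.addExact(windowMax, allowedLateness.toMillis());
        return expiry < inputWatermark;
    }

    /** Returns newTs, or throws if it is earlier than oldTs minus the allowed skew. */
    static long reassignTimestamp(long oldTs, long newTs, Duration allowedSkew) {
        long earliestAllowed = Math.subtractExact(oldTs, allowedSkew.toMillis());
        if (newTs < earliestAllowed) {
            throw new IllegalArgumentException("Timestamp " + newTs
                + " is earlier than input timestamp " + oldTs + " minus skew " + allowedSkew);
        }
        return newTs;
    }

    public static void main(String[] args) {
        long april1 = 1_585_699_200_000L; // 2020-04-01T00:00Z, a realistic event-time watermark
        // Global window: not droppable at a realistic watermark...
        System.out.println(isDroppable(GLOBAL_WINDOW_MAX, Duration.ofHours(1), april1));
        // ...but droppable once the watermark reaches the end of input.
        System.out.println(isDroppable(GLOBAL_WINDOW_MAX, Duration.ofHours(1), MAX_TIMESTAMP));
        // Moving a timestamp one second into the past needs at least one second of skew.
        System.out.println(reassignTimestamp(10_000L, 9_000L, Duration.ofSeconds(1)));
    }
}
```

For a fixed window ending at 59 999 ms and a watermark of 70 000 ms, `isDroppable` returns true with zero lateness and false with one minute of lateness; for the global window the answer only flips when the watermark reaches the end of input, which is why allowed lateness matters little there.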
>>
>>
>> In the case of TextIO.read, which reads from a bounded source, where I
>> assign timestamps to all elements in the second transform, would it also
>> be useful to set allowedTimestampSkew after assigning timestamps? I am
>> trying to understand how the elements will be available after assigning
>> timestamps (given all files are present on the file system): will they be
>> ordered by timestamp, and can some elements be read after the watermark
>> has progressed past an element's event time?
>>
>> When executing a batch pipeline, there is actually no watermark. Event time
>> moves discretely from -inf (computation not finished yet) to +inf
>> (computation finished). In the case you describe, you should not even need
>> to set allowedTimestampSkew, because elements output from TextIO should
>> (probably) be assigned the timestamp BoundedWindow.TIMESTAMP_MIN_VALUE (I'm
>> not sure if the model guarantees this, but it seems reasonable). You can
>> then reassign timestamps to the future as you wish. You don't have to worry
>> about allowed lateness either, because that only applies to streaming
>> pipelines, where event time moves more smoothly. By the definition of how
>> event time progresses in batch pipelines, there is no "late" (after
>> watermark) data in this case.
>>
>
> Clarification: sources should assign elements to their upstream window
> (similar to DoFns), generally with the appropriate timestamp (unless they
> are timestamp-aware). The upstream of a bounded source is typically
> Impulse, which is in the global window with MIN_TIMESTAMP, but it could be
> different. This better unifies the case of reading the elements from a set
> of filenames published to Pub/Sub, for example.
>
>>
>>
>> TextIO.Read
>>   |  Bounded source
>>   |  Global Window
>>   |  -infinity watermark
>> apply
>> WithTimestamps (based on a timestamp attribute in the file)
>>   |  timestamped elements (watermark starts from -infinity and follows
>>   |  the timestamp from the timestamp attribute)
>>   |  Global Window
>>
>>
>> Regards,
>> Amit
>>
>> On Tue, Mar 31, 2020 at 11:26 AM Ankur Goenka <goe...@google.com> wrote:
>>
>>> Hi Amit,
>>>
>>> As you don't have any GroupByKey or trigger in your pipeline, you don't
>>> need to set allowed lateness.
>>> For an unbounded source, the global window (with the default trigger)
>>> will never fire a trigger or emit GroupByKey output.
>>> In the code you linked, a trigger is used, which uses allowedLateness.
>>>
>>> Thanks,
>>> Ankur
>>>
>>> On Tue, Mar 31, 2020 at 11:20 AM amit kumar <akdata...@gmail.com> wrote:
>>>
>>>> Thanks Jan!
>>>> I have a question based on this about the global window and allowed
>>>> lateness with the default trigger, for the following scenarios:
>>>>
>>>> Case 1 -
>>>> TextIO.Read
>>>>   |  Bounded source
>>>>   |  Global Window
>>>>   |  -infinity watermark
>>>> apply
>>>> WithTimestamps (based on a timestamp attribute in the file)
>>>>   |  timestamped elements (watermark starts from -infinity and
>>>>   |  follows the timestamp from the timestamp attribute)
>>>>   |  Global Window
>>>>   |  (Will I never need to set allowedLateness in this case with the
>>>>   |  default trigger? Would there be any benefit, since the window is
>>>>   |  global and the watermark will pass the end of the window when
>>>>   |  everything is processed?)
>>>>
>>>>
>>>> Case 2 -
>>>> KinesisIO.read
>>>>   |  Unbounded Source
>>>>   |  Default Global Window
>>>>   |  watermark based on arrival time
>>>> apply
>>>> WithTimestamps (based on a timestamp attribute from the stream)
>>>>   |  timestamped elements (watermark follows the timestamp from the
>>>>   |  timestamp attribute)
>>>>   |  Global Window
>>>>   |  Watermark based on event timestamp
>>>>   |  Same question here: will there be any benefit of using
>>>>   |  allowedLateness, since the window is global?
>>>>
>>>> In the code example below, is allowedLateness used for a global window?
>>>>
>>>> https://github.com/apache/beam/blob/828b897a2439437d483b1bd7f2a04871f077bde0/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L307
>>>>
>>>> Regards,
>>>> Amit
>>>>
>>>> On Tue, Mar 31, 2020 at 2:34 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Amit,
>>>>>
>>>>> the window function applied by default is
>>>>> WindowingStrategy.globalDefault() [1]: the global window with zero
>>>>> allowed lateness.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Jan
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java#L105
>>>>>
>>>>> On 3/31/20 10:22 AM, amit kumar wrote:
>>>>> > Hi All,
>>>>> >
>>>>> > Is there a default WindowFn that gets applied to elements of an
>>>>> > unbounded source?
>>>>> >
>>>>> > For example, if I have a Kinesis input source, for which all elements
>>>>> > are timestamped with ArrivalTime, what will be the default windowing
>>>>> > applied to the output of the read transform?
>>>>> >
>>>>> > Is this runner-dependent?
>>>>> >
>>>>> > Regards,
>>>>> > Amit
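The default strategy Jan points to (global window, default trigger, zero allowed lateness) and Ankur's remark that such a GroupByKey never emits on unbounded input can be illustrated with another small Java model. Again this is a hypothetical sketch, not Beam's implementation or API: `GlobalWindowGroupByKeyModel` and its constants are invented for illustration, and the end-of-window value is an assumed approximation. It buffers values per key and emits a single pane only when the watermark passes the end of the global window, which happens once bounded input is finished (the watermark jumps to +inf) but never for unbounded input whose watermark tracks event time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical toy model (not Beam's implementation) of a GroupByKey in the
 * global window under the default windowing strategy: the default trigger
 * fires once, when the input watermark passes the end of the global window,
 * and with zero allowed lateness anything arriving afterwards is dropped.
 */
final class GlobalWindowGroupByKeyModel {
    // Assumed stand-in for the watermark that signals "input finished" (+inf).
    static final long END_OF_INPUT = Long.MAX_VALUE / 1000;
    // Assumed stand-in for the global window's max timestamp (one day earlier).
    static final long GLOBAL_WINDOW_MAX = END_OF_INPUT - 86_400_000L;

    private final Map<String, List<Integer>> buffer = new LinkedHashMap<>();
    private boolean fired = false;

    /** Buffers a value; returns false if the window already expired and the value is dropped. */
    boolean add(String key, int value) {
        if (fired) {
            return false;
        }
        buffer.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        return true;
    }

    /** Advances the input watermark; returns the single pane if the trigger fires now. */
    Map<String, List<Integer>> advanceWatermark(long watermark) {
        if (!fired && watermark > GLOBAL_WINDOW_MAX) {
            fired = true;
            return Collections.unmodifiableMap(new LinkedHashMap<>(buffer));
        }
        return Collections.emptyMap();
    }

    public static void main(String[] args) {
        // Bounded input: the watermark jumps from -inf to +inf, so one pane holds everything.
        GlobalWindowGroupByKeyModel bounded = new GlobalWindowGroupByKeyModel();
        bounded.add("a", 1);
        bounded.add("a", 2);
        bounded.add("b", 3);
        System.out.println(bounded.advanceWatermark(Long.MIN_VALUE / 1000));
        System.out.println(bounded.advanceWatermark(END_OF_INPUT));

        // Unbounded input: the watermark follows event time and never reaches +inf.
        GlobalWindowGroupByKeyModel unbounded = new GlobalWindowGroupByKeyModel();
        unbounded.add("a", 1);
        System.out.println(unbounded.advanceWatermark(1_585_699_200_000L));
    }
}
```

Pipelines that need periodic output from a global window on unbounded input instead attach a non-default trigger, as Ankur notes for the linked LeaderBoard code; in that setting allowed lateness does become meaningful.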