Re: Some events are discarded from a FixedWindow
I’m using a single task manager with 3 task slots. Can there be a skew in this case? And how am I expected to handle this situation? Is there a best practice to guarantee exactly-once execution of the events in the window, without events being dropped?

(BTW, just to correct my example: the pastFirstElementInPane() trigger should have a delay of 1 hour + 3 minutes.)

Thanks,
Ifat

From: Reuven Lax
Date: Wednesday, 21 February 2024 at 22:43
To: user@beam.apache.org
Cc: Ifat Afek (Nokia)
Subject: Re: Some events are discarded from a FixedWindow

> Processing-time triggers are based on the local clock on a worker, and
> clocks can skew between workers (they can even skew between different
> processes on the same worker).
Re: Some events are discarded from a FixedWindow
On Wed, Feb 21, 2024 at 12:39 PM Ifat Afek (Nokia) via user <user@beam.apache.org> wrote:

> When tested on a smaller window with a small number of events, I see that
> the first 3 out of 10 events are being discarded. From the log, it looks
> like the trigger is executed 1 second ahead of time. I suspect that as
> a result, its shouldFire() method returns false, since 3 minutes have not
> passed yet.

Processing-time triggers are based on the local clock on a worker, and clocks can skew between workers (they can even skew between different processes on the same worker).
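To make the point about skew concrete, here is a minimal sketch in plain Java (java.time only). It is not how Beam or Flink schedule timers; the class ClockSkewSketch, its methods, and the 135 ms offset are hypothetical and chosen purely for illustration. It only shows that two clocks that disagree slightly can give different answers to "has the 3-minute delay elapsed?".

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

class ClockSkewSketch {
    // Deadline computed from a local clock: "now" on that clock plus a delay.
    static Instant deadline(Clock settingClock, Duration delay) {
        return settingClock.instant().plus(delay);
    }

    // True if a process reading checkingClock would consider the deadline reached.
    static boolean reached(Clock checkingClock, Instant deadline) {
        return !checkingClock.instant().isBefore(deadline);
    }

    public static void main(String[] args) {
        // Reference clock frozen at the moment the timer is set.
        Clock reference = Clock.fixed(Instant.parse("2024-02-21T16:27:08.079Z"), ZoneOffset.UTC);
        Instant deadline = deadline(reference, Duration.ofMinutes(3));

        // The same clock, three minutes later.
        Clock threeMinutesLater = Clock.offset(reference, Duration.ofMinutes(3));
        // A second clock that runs 135 ms behind the first (hypothetical skew).
        Clock lagging = Clock.offset(threeMinutesLater, Duration.ofMillis(-135));

        System.out.println(reached(threeMinutesLater, deadline)); // prints true
        System.out.println(reached(lagging, deadline));           // prints false
    }
}
```

Any check that compares a deadline taken from one clock against "now" read from another can come out early or late by the amount of the skew.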
Some events are discarded from a FixedWindow
Hi,

We have a Beam-SQL pipeline on top of Flink, that once in 5 min gets a bunch of events from Kafka and should execute an SQL command on a 1-hour window. Some of the events arrive late.

I’m using KafkaIO.withTimestampPolicyFactory() to set one of the object’s fields as the timestamp.

For the aggregation, it’s important that the window triggers exactly once with all the events, with allowed lateness of 3 minutes. I defined the window as:

    final PCollection windowSelectFields = selectFields
        .apply("Windowing", Window
            .into(FixedWindows.of(Duration.standardHours(1)))
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(3)))
            )
            .withAllowedLateness(Duration.standardMinutes(3))
            .accumulatingFiredPanes()
        );

When tested on a smaller window with a small number of events, I see that the first 3 out of 10 events are being discarded. From the log, it looks like the trigger is executed 1 second ahead of time. I suspect that as a result, its shouldFire() method returns false, since 3 minutes have not passed yet.

    2024-02-21 16:27:08,079 DEBUG org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator [] - Setting timer: 1:1708533008079 at 1708533008079 with output time 1708533008079.

(that is 4:30:08.079 PM)

And later on:

    2024-02-21 16:30:07,944 DEBUG org.apache.beam.sdk.util.WindowTracing [] - ReduceFnRunner: Received timer key:Row:
    call_direction:-1729318488
    ; window:[2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z); data:TimerData{timerId=1:1708533008079, timerFamilyId=, namespace=Window([2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z)), timestamp=2024-02-21T16:30:08.079Z, outputTimestamp=2024-02-21T16:30:08.079Z, domain=PROCESSING_TIME, deleted=false} with inputWatermark:2024-02-21T16:18:04.000Z; outputWatermark:2024-02-21T16:18:04.000Z

Is my understanding correct?
Did I define the window and timestamps correctly?
Any help would be appreciated.

Thanks,
Ifat
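The timer arithmetic in the logs above can be checked with a few lines of plain Java. This is only a sketch: it assumes the log timestamps are in UTC (they line up with the Z timestamps in the TimerData), and the class and method names (TimerGap, timerTarget, earlyByMillis) are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;

class TimerGap {
    // Converts the epoch-millisecond value from the "Setting timer" log line to an Instant.
    static Instant timerTarget(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Positive result: the timer was received that many milliseconds before its target.
    static long earlyByMillis(Instant target, Instant received) {
        return Duration.between(received, target).toMillis();
    }

    public static void main(String[] args) {
        Instant target = timerTarget(1708533008079L);
        // Assumption: the log timestamp "2024-02-21 16:30:07,944" is in UTC.
        Instant received = Instant.parse("2024-02-21T16:30:07.944Z");

        System.out.println("timer target: " + target); // prints "timer target: 2024-02-21T16:30:08.079Z"
        System.out.println("received early by " + earlyByMillis(target, received) + " ms"); // prints "received early by 135 ms"
    }
}
```

Under that assumption, the "Received timer" line is logged 135 ms before the timer's target timestamp, which confirms the 4:30:08.079 PM conversion and suggests the gap is well under a second.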
Re: Request to join slack channel
Hello,

Can someone add me to the Slack channel as well?

Thanks,
Utkarsh

On Wed, Feb 21, 2024 at 11:10 AM George Dekermenjian wrote:

> Me too please - slack channel.
Re: Request to join slack channel
Me too please - slack channel.

On Wed, Feb 21, 2024 at 19:43 Geddy Schellevis wrote:

> Hey Valentyn,
>
> can you add me to slack channel as well?
> Thanks
Re: Request to join slack channel
Hey Valentyn,

can you add me to slack channel as well?
Thanks

On Wed, Feb 21, 2024 at 19:21, Valentyn Tymofieiev via user <user@beam.apache.org> wrote:

> Hi Daniel, I submitted an invitation for your email address as well.
Re: Request to join slack channel
Hi Daniel, I submitted an invitation for your email address as well.

On Tue, Feb 20, 2024 at 3:56 PM Daniel Chen wrote:

> Hey Valentyn, can you add me to slack channel as well?
>
> On Tue, Feb 20, 2024 at 3:23 PM Valentyn Tymofieiev via user <user@beam.apache.org> wrote:
>
>> Hi Lydian,
>>
>> According to https://infra.apache.org/slack.html, invitations by link
>> have been disabled. I submitted an invitation for your email address.
>>
>> Thanks,
>> Valentyn
>>
>> On Tue, Feb 20, 2024 at 9:33 AM Lydian Lee wrote:
>>
>>> Hi,
>>>
>>> Can I get the invitation to join slack channel? The ASF slack seems
>>> required invitation to be able to join. Thanks