On Wed, Feb 21, 2024 at 12:39 PM Ifat Afek (Nokia) via user <user@beam.apache.org> wrote:

> Hi,
>
> We have a Beam-SQL pipeline on top of Flink that, once every 5 minutes, gets
> a bunch of events from Kafka and should execute an SQL command on a 1-hour
> window. Some of the events arrive late.
>
> I’m using KafkaIO.withTimestampPolicyFactory() to set one of the object’s
> fields as the timestamp.
>
> For the aggregation, it’s important that the window triggers *exactly
> once* with all the events, with an allowed lateness of 3 minutes. I defined
> the window as:
>
> final PCollection<Row> windowSelectFields = selectFields
>     .apply("Windowing", Window
>         .<Row>into(FixedWindows.of(Duration.standardHours(1)))
>         .triggering(Repeatedly.forever(
>             AfterProcessingTime.pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardMinutes(3))))
>         .withAllowedLateness(Duration.standardMinutes(3))
>         .accumulatingFiredPanes());
>
> When I test it with a smaller window and a small number of events, I see
> that the first 3 out of 10 events are discarded. From the log, it looks
> like the trigger is executed *1 second ahead of time*. I suspect that, as
> a result, its shouldFire() method returns false, since 3 minutes have not
> passed yet.

Processing-time triggers are based on the local clock on a worker, and clocks can skew between workers (they can even skew between different processes on the same worker).

> 2024-02-21 16:27:08,079 DEBUG
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
> [] - Setting timer: 1:1708533008079 at 1708533008079 with output time
> 1708533008079.
>
> (that is *4:30:08.079 PM*)
>
> And later on:
>
> 2024-02-21 16:30:07,944 DEBUG
> org.apache.beam.sdk.util.WindowTracing [] -
> ReduceFnRunner: Received timer key:Row:
> call_direction:-1729318488
> ; window:[2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z);
> data:TimerData{timerId=1:1708533008079, timerFamilyId=,
> namespace=Window([2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z)),
> timestamp=2024-02-21T16:30:08.079Z,
> outputTimestamp=2024-02-21T16:30:08.079Z, domain=PROCESSING_TIME,
> deleted=false} with inputWatermark:2024-02-21T16:18:04.000Z;
> outputWatermark:2024-02-21T16:18:04.000Z
>
> Is my understanding correct?
> Did I define the window and timestamps correctly?
>
> Any help would be appreciated.
>
> Thanks,
> Ifat
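
To put numbers on the two log lines quoted above, here is a small plain-Java sketch (JDK only, no Beam). It assumes both log timestamps are in UTC and were read from the same clock; under that assumption the timer was scheduled exactly 3 minutes ahead and was delivered roughly 135 ms before its target time. `wouldFire` is a hypothetical toy check ("fire only once the local clock has reached the target"), not Beam's actual AfterProcessingTime implementation; it only illustrates how a small clock difference makes such a check answer "not yet".

```java
import java.time.Duration;
import java.time.Instant;

final class TimerSkewSketch {

    // Values copied from the quoted DEBUG log lines (assumed to be UTC).
    static final long TIMER_EPOCH_MILLIS = 1708533008079L;
    static final Instant TIMER_SET_LOGGED_AT = Instant.parse("2024-02-21T16:27:08.079Z");
    static final Instant TIMER_RECEIVED_LOGGED_AT = Instant.parse("2024-02-21T16:30:07.944Z");

    /**
     * Hypothetical toy check, NOT Beam code: returns true only when the
     * caller's local clock reading has reached the target firing time.
     */
    static boolean wouldFire(Instant localNow, Instant target) {
        return !localNow.isBefore(target);
    }

    public static void main(String[] args) {
        Instant target = Instant.ofEpochMilli(TIMER_EPOCH_MILLIS);
        Duration scheduledDelay = Duration.between(TIMER_SET_LOGGED_AT, target);
        Duration early = Duration.between(TIMER_RECEIVED_LOGGED_AT, target);

        System.out.println("timer target:    " + target);             // 2024-02-21T16:30:08.079Z
        System.out.println("scheduled delay: " + scheduledDelay);     // PT3M
        System.out.println("delivered early: " + early.toMillis() + " ms"); // 135 ms

        // A clock reading the "Received timer" time concludes the delay has not elapsed.
        System.out.println("wouldFire at receive time: "
                + wouldFire(TIMER_RECEIVED_LOGGED_AT, target));       // false
        // The same check passes once that clock has advanced by the observed gap.
        System.out.println("wouldFire after the gap:   "
                + wouldFire(TIMER_RECEIVED_LOGGED_AT.plus(early), target)); // true
    }
}
```

The point of the toy model is that a check comparing "now" against a target computed from a different clock reading can fail by a few milliseconds even though the timer itself was scheduled correctly.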