On Wed, Feb 21, 2024 at 12:39 PM Ifat Afek (Nokia) via user <
user@beam.apache.org> wrote:

> Hi,
>
>
>
> We have a Beam SQL pipeline on top of Flink that receives a batch of
> events from Kafka every 5 minutes and should execute an SQL command on a
> 1-hour window. Some of the events arrive late.
>
> I’m using KafkaIO.withTimestampPolicyFactory() to set one of the object’s
> fields as the timestamp.
>
> For the aggregation, it’s important that the window triggers *exactly
> once* with all the events, with allowed lateness of 3 minutes. I defined
> the window as:
>
>
>
>         final PCollection<Row> windowSelectFields = selectFields
>                 .apply("Windowing", Window
>                         .<Row>into(FixedWindows.of(Duration.standardHours(1)))
>                         .triggering(Repeatedly.forever(
>                                 AfterProcessingTime.pastFirstElementInPane()
>                                         .plusDelayOf(Duration.standardMinutes(3))))
>                         .withAllowedLateness(Duration.standardMinutes(3))
>                         .accumulatingFiredPanes()
>                 );
>
>
>
> When tested on a smaller window with a small number of events, I see that
> the first 3 out of 10 events are being discarded. From the log, it looks
> like the trigger is executed *1 second ahead of time*. I suspect that as
> a result, its shouldFire() method returns false, since 3 minutes have not
> passed yet.
>

Processing-time triggers are based on the local clock on a worker, and
clocks can skew between workers (they can even skew between different
processes on the same worker).


>
>
> 2024-02-21 16:27:08,079 DEBUG
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
> [] - Setting timer: 1:1708533008079 at 1708533008079 with output time
> 1708533008079. (that is *4:30:08.079 PM*)
>
>
>
> And later on:
>
>
>
> 2024-02-21 *16:30:07,944* DEBUG
> org.apache.beam.sdk.util.WindowTracing [] -
> ReduceFnRunner: Received timer key:Row:
> call_direction:-1729318488
> ; window:[2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z);
> data:TimerData{timerId=1:1708533008079, timerFamilyId=,
> namespace=Window([2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z)),
> timestamp=*2024-02-21T16:30:08.079Z*,
> outputTimestamp=2024-02-21T16:30:08.079Z, domain=PROCESSING_TIME,
> deleted=false} with inputWatermark:2024-02-21T16:18:04.000Z;
> outputWatermark:2024-02-21T16:18:04.000Z
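
The two log entries quoted above can be compared with a short calculation.
This is only a sketch: it assumes the log line timestamps are in UTC (which
is consistent with the timer being set exactly 3 minutes after the first log
line), and the class and method names are made up for illustration. It
converts the timer's epoch-millisecond value to an instant and measures how
far ahead of it the "Received timer" line was logged.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Beam or Flink.
class TimerSkewCheck {
    // Timer target copied from the "Setting timer" log line (epoch milliseconds).
    static final long TIMER_EPOCH_MILLIS = 1708533008079L;

    // The instant at which the processing-time timer was scheduled to fire.
    public static Instant timerInstant() {
        return Instant.ofEpochMilli(TIMER_EPOCH_MILLIS);
    }

    // Gap between the "Received timer" log line and the timer target.
    // Assumption: the log timestamp 16:30:07,944 is in UTC.
    public static Duration firedEarlyBy() {
        Instant received = Instant.parse("2024-02-21T16:30:07.944Z");
        return Duration.between(received, timerInstant());
    }

    public static void main(String[] args) {
        System.out.println(timerInstant());                    // 2024-02-21T16:30:08.079Z
        System.out.println(firedEarlyBy().toMillis() + " ms"); // 135 ms
    }
}
```

Under that assumption, the log line that reports the timer was written about
135 ms before the timer's target time.
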
>
>
>
> Is my understanding correct?
>
> Did I define the window and timestamps correctly?
>
> Any help would be appreciated.
>
>
>
> Thanks,
>
> Ifat
>
>
>
