I think there could still be problems in some corner cases. The problem is that the elements the timestamp combiner considers 'late' are defined differently from what PaneInfo marks as late. So you can hit a corner case where PaneInfo is ON_TIME, but the timestamp is still shifted to the end of the window. This probably would not happen often, but it can happen. If that is fine for your use case, then this could work.
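
To make the two notions of 'late' concrete, here is a deliberately simplified, Beam-free model. It is an approximation, not Beam's code. The function names are hypothetical and times are epoch millis. The pane rule assumes that an end-of-window watermark hold keeps the output watermark at or below the window's max timestamp until the on-time pane fires.

```kotlin
// Simplified, hypothetical model of two different notions of "late".
// Times are epoch milliseconds. This is an approximation, not Beam's code.

enum class Timing { EARLY, ON_TIME, LATE }

// Timestamp-combiner view (see BEAM-2262): an element is "late" when its
// timestamp precedes the GroupByKey's output watermark at arrival time.
fun isLateForCombiner(elementTs: Long, outputWatermarkAtArrival: Long): Boolean =
    elementTs < outputWatermarkAtArrival

// Pane view (approximation): a pane is LATE once the output watermark has
// passed the window or a non-EARLY pane has already fired; EARLY while the
// input watermark has not yet passed the window; ON_TIME otherwise.
fun paneTiming(
    windowMaxTs: Long,
    inputWatermark: Long,
    outputWatermark: Long,
    onlyEarlyPanesSoFar: Boolean
): Timing = when {
    outputWatermark > windowMaxTs || !onlyEarlyPanesSoFar -> Timing.LATE
    inputWatermark <= windowMaxTs -> Timing.EARLY
    else -> Timing.ON_TIME
}
```

Take a window [07:30, 07:35) with max timestamp 07:34:59.999, and an element stamped 07:31 that arrives while the output watermark is at 07:32. In this model that element counts as late for the combiner. Yet the pane fired when the input watermark reaches 07:35 is still ON_TIME, which is the mismatch described above.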

Jan

On 1/13/21 3:59 PM, Raman Gupta wrote:
Hmm, I think I've found a simple solution... adding this to the beginning of my looping timer @ProcessElement function:

    // late elements don't need to affect our looping timer,
    // pass them through without modification
    // this is kind of a work-around for https://issues.apache.org/jira/browse/BEAM-2262
    // but I think makes sense in general for looping timers when
    // there is no need to trigger timers after the window is done
    if (paneInfo.timing == PaneInfo.Timing.LATE) {
      receiver.output(element)
      return
    }
At least all my unit tests are passing... is there any problem with this approach?

Thanks,
Raman


On Wed, Jan 13, 2021 at 9:42 AM Raman Gupta <[email protected]> wrote:

    (Replying to Reza) Yes, I am using TestStream for my unit test.
    Other replies below.

    On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský <[email protected]> wrote:

        Hi,

        yes, there is a possible source of non-determinism related to
        the timestamp combiner. Timestamp combiners combine only
        elements that are not 'late' ([1]), meaning elements whose
        timestamp does not precede the output watermark of the GBK.
        Looking at the pipeline code, I suppose that could be the cause.
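
Here is a rough model of that rule. It also reproduces the 07:30:00.000 vs 07:34:59.999 difference seen in this thread. It is an assumption-laden sketch rather than Beam's implementation: the function name is hypothetical and times are epoch millis. It assumes that when every element in a pane was late on arrival, the output timestamp falls back to the window's max timestamp, which matches the behavior observed here.

```kotlin
// Hypothetical model of how a GBK pane's output timestamp is chosen.
// Not Beam's implementation; times are epoch milliseconds.

enum class Combiner { EARLIEST, END_OF_WINDOW }

// Each element is (timestamp, output watermark when the element arrived).
fun paneOutputTimestamp(
    combiner: Combiner,
    windowMaxTs: Long,
    elements: List<Pair<Long, Long>>
): Long {
    // END_OF_WINDOW ignores element timestamps, so arrival order cannot matter.
    if (combiner == Combiner.END_OF_WINDOW) return windowMaxTs
    // EARLIEST only combines elements that were not "late" on arrival,
    // i.e. whose timestamp did not precede the output watermark.
    val notLate = elements.filter { (ts, watermark) -> ts >= watermark }.map { it.first }
    // Assumed fallback when every element was late: the window's max timestamp.
    return notLate.minOrNull() ?: windowMaxTs
}
```

Under this model, take an element stamped 07:30 in the window whose max timestamp is 07:34:59.999. With EARLIEST it gets 07:30:00.000 if it arrives before the watermark passes it, and 07:34:59.999 if it arrives after. END_OF_WINDOW returns 07:34:59.999 either way, which is why it is the deterministic choice.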


    Yes, the test stream in this test case does indeed send the
    element in question "late". Here is the setup:

    val base = Instant.EPOCH + 6.hours
    val xStream: TestStream<X> = TestStream.create(coder)
      .addElements(x["1"]) // this just initializes the looping timer
      // advance watermark past end of window that would normally process x2
      .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
      .addElements(x["2"]) // now we see the element
      .advanceWatermarkToInfinity()

    Here late element x["2"] has a timestamp of
    1970-01-01T07:30:00.000Z and the watermark at the time x["2"] is
    added is 1970-01-01T09:00:01.000Z.
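
The 07:34:59.999Z value is the max timestamp of the 5-minute fixed window [07:30, 07:35) that x["2"] falls into. That window ended well before the 09:00:01 watermark. Below is a small java.time sketch of the arithmetic. It assumes FixedWindows with no offset, and the helper names are hypothetical.

```kotlin
import java.time.Duration
import java.time.Instant

// Start of the fixed window containing ts, assuming zero offset:
// windows are [start, start + size) aligned to the epoch.
fun fixedWindowStart(ts: Instant, size: Duration): Instant {
    val ms = ts.toEpochMilli()
    return Instant.ofEpochMilli(ms - Math.floorMod(ms, size.toMillis()))
}

// A window's max timestamp is its exclusive end minus one millisecond.
fun windowMaxTimestamp(ts: Instant, size: Duration): Instant =
    fixedWindowStart(ts, size).plus(size).minusMillis(1)
```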

    So I get your point that the timestamp combiner is not used for
    late elements, but if late elements are singly emitted as in this
    pipeline, why do any timestamp modification at all? I would expect
    them to arrive with their original timestamp, not one changed
    from 1970-01-01T07:30:00.000Z to 1970-01-01T07:34:59.999Z (this is
    the part that seems non-deterministic). What is the logic / reason
    behind the pipeline setting this element's timestamp
    to 1970-01-01T07:34:59.999Z?

        You can make the pipeline deterministic by using
        TimestampCombiner.END_OF_WINDOW (default).


    It's definitely not ideal for this use case, but I'll consider it.

        If you *need* to use TimestampCombiner.EARLIEST, you can
        probably do that by tweaking the looping timer stateful DoFn
        and fixing timestamps there (using the timer output timestamp).


    I had already tried that but the pipeline throws an error that the
    timestamp emitted cannot be earlier than the current element
    timestamp.
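
That error appears to come from the DoFn runner's output-timestamp check. As far as we understand it, outputs may not be stamped earlier than the current input's timestamp minus the DoFn's allowed timestamp skew, which we assume is zero by default. The following hypothetical, Beam-free model of that check shows why re-stamping the 07:34:59.999 element back to 07:30 would be rejected.

```kotlin
// Hypothetical model of the DoFn output-timestamp check (not Beam's code).
// Times are epoch milliseconds; allowedSkewMs models the DoFn's allowed
// timestamp skew, which we assume defaults to zero.
fun checkOutputTimestamp(outputTs: Long, inputTs: Long, allowedSkewMs: Long = 0L) {
    require(outputTs >= inputTs - allowedSkewMs) {
        "Cannot output with timestamp $outputTs: it is earlier than the input " +
            "timestamp $inputTs minus the allowed skew ${allowedSkewMs}ms"
    }
}
```

Moving a timestamp later always passes this check. Moving it earlier only passes when the skew allowance covers the gap.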

    Thanks,
    Raman

          Jan

        [1] https://issues.apache.org/jira/browse/BEAM-2262

        On 1/12/21 5:26 PM, Raman Gupta wrote:
        Your reply made me realize I removed the condition from my
        local copy of the looping timer that brings the timer forward
        if it encounters an earlier element later in the stream:

        if (currentTimerValue == null ||
            currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {

        Restoring that condition fixes that issue.
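
The effect of that condition shows up in a hypothetical, Beam-free model of the looping timer's per-element logic. The model assumes the next firing is the element timestamp plus one loop interval. That may not match how your LoopingStatefulTimer computes nextTimerTimeBasedOnCurrentElement. Times are epoch millis.

```kotlin
// Hypothetical model of the looping timer's "bring the timer forward" check.
// Times are epoch milliseconds; intervalMs stands in for the loop period.
class LoopingTimerModel(private val intervalMs: Long) {
    // Stands in for the timer-value state; null means no timer is set yet.
    var currentTimerValue: Long? = null
        private set

    fun onElement(elementTs: Long) {
        // Assumption: next firing = element timestamp + one interval.
        val nextTimerTimeBasedOnCurrentElement = elementTs + intervalMs
        val current = currentTimerValue
        // Set the timer if none is running, or move it earlier when an element
        // with an earlier timestamp shows up later in the stream.
        if (current == null || current > nextTimerTimeBasedOnCurrentElement) {
            currentTimerValue = nextTimerTimeBasedOnCurrentElement
        }
    }
}
```

Without the `current > next` branch, an element that arrives late but carries an earlier timestamp would leave the timer at its later value. That is the symptom described above.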

        However, the reason I removed that condition in the first
        place was that it was making a unit test non-deterministic:
        sometimes the element timestamps arriving at the looping timer
        didn't match what I expected from the EARLIEST timestamp
        combiner I had defined, causing the timer to fire an
        additional time.

        The pipeline:

        input
          // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement:
          // https://issues.apache.org/jira/browse/BEAM-644
          .apply("XTimestamps", WithTimestamps
            .of<X> { it.enteredAt.asJoda() }
            .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
          )
          .apply("FixedTickWindows", Window.into<X>(FixedWindows.of(5.minutes.asJoda()))
            .triggering(
              AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                .withLateFirings(AfterPane.elementCountAtLeast(1))
            )
            .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
            .discardingFiredPanes()
            .withTimestampCombiner(TimestampCombiner.EARLIEST)
          )
          .apply("KeyByUser", WithKeys.of { it.userId })
          .apply("GroupByUser", GroupByKey.create())
          .apply("GlobalWindowsLoopingStatefulTimer", Window.into<KV<String, Iterable<X>>>(GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes()
            .withTimestampCombiner(TimestampCombiner.EARLIEST)
          )
          .apply("LoopingStatefulTimer",
            ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))

        The looping timer receives an @Timestamp value in the process
        function of:

        1970-01-01T07:34:59.999Z

        but the earliest timestamp of the (single) element in the
        elements iterable is:

        1970-01-01T07:30:00.000Z

        I would have thought given my timestamp combiners on my
        windows that the timestamp should have been 07:30:00.000Z. Is
        there something wrong in my pipeline that is causing this
        non-deterministic behavior?

        Thanks,
        Raman

        On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský <[email protected]> wrote:

            Hi Raman,

            can you share the details of the pipeline? How exactly are
            you using the looping timer? A timer as described in the
            linked blog post should be deterministic even when the order
            of the input elements is undefined. Does your logic depend
            on element ordering?

              Jan

            On 1/12/21 3:18 PM, Raman Gupta wrote:
            > Hello, I am building and testing a pipeline with the direct runner.
            > The pipeline includes a looping timer -
            > https://beam.apache.org/blog/looping-timers/.
            >
            > For now, I am using JdbcIO to obtain my input data, though when
            > put into production the pipeline will use PubSubIO.
            >
            > I am finding that the looping timer begins producing outputs at a
            > random event time, which makes some sense given the randomization
            > of inputs in the direct runner. However, this makes the results of
            > executing my pipeline with the direct runner completely
            > non-deterministic.
            >
            > So:
            >
            > 1) Is there a way to turn off this non-deterministic behavior, but
            > just for the GlobalWindow / LoopingTimer?
            >
            > 2) Perhaps alternatively, is there a way to "initialize" the looping
            > timer when the pipeline starts, rather than when it first sees an
            > element? Perhaps a side input?
            >
            > 3) Am I right in assuming that when I move this pipeline to pub/sub io
            > and operate it in streaming mode, this issue will go away?
            >
            > Thanks!
            > Raman
            >
