Are you making use of TestStream for your unit test?
On Wed, 13 Jan 2021 at 00:27, Raman Gupta wrote:
> Your reply made me realize I removed the condition from my local copy of
> the looping timer that brings the timer forward if it encounters an earlier
> element later in the stream:
>
>     if (currentTimerValue == null ||
>         currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {
>
>
> Restoring that condition fixes that issue.
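The restored condition re-arms the timer only when no timer is set yet or when the current element implies an earlier time than the one already stored. Here is a minimal plain-Kotlin sketch of that rule. It is not Beam code: `shouldSetTimer` and `finalTimer` are hypothetical helpers, and the timer state is modeled as a nullable `Long` standing in for `currentTimerValue`.

```kotlin
// Hypothetical sketch of the "bring the timer forward" rule.
// currentTimerValue: stored timer time in millis, or null if no timer is set.
// candidateMillis: stands in for nextTimerTimeBasedOnCurrentElement.getMillis().
fun shouldSetTimer(currentTimerValue: Long?, candidateMillis: Long): Boolean =
    currentTimerValue == null || currentTimerValue > candidateMillis

// Applies the rule to candidates in arrival order and returns the final timer.
fun finalTimer(candidates: List<Long>): Long? {
    var timer: Long? = null
    for (candidate in candidates) {
        if (shouldSetTimer(timer, candidate)) timer = candidate
    }
    return timer
}

fun main() {
    // A later element arrives first, then an earlier one, then a later one again.
    println(finalTimer(listOf(600_000L, 300_000L, 900_000L))) // 300000
}
```

Because the earliest candidate always wins, the final timer value in this sketch does not depend on arrival order. That is the property that removing the condition loses.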
>
> However, the reason I removed that condition in the first place was
> because it was making a unit test non-deterministic: sometimes the
> timestamps of elements arriving at the looping timer didn't seem to match
> the timestamps implied by the EARLIEST timestamp combiner I defined,
> causing the timer to fire an additional time.
>
> The pipeline:
>
>     input
>       // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
>       // https://issues.apache.org/jira/browse/BEAM-644
>       .apply("XTimestamps", WithTimestamps
>         .of<X> { it.enteredAt.asJoda() }
>         .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
>       )
>       .apply("FixedTickWindows",
>         Window.into<X>(FixedWindows.of(5.minutes.asJoda()))
>           .triggering(
>             AfterWatermark.pastEndOfWindow()
>               .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>               .withLateFirings(AfterPane.elementCountAtLeast(1))
>           )
>           .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>           .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
>           .discardingFiredPanes()
>           .withTimestampCombiner(TimestampCombiner.EARLIEST)
>       )
>       .apply("KeyByUser", WithKeys.of { it.userId })
>       .apply("GroupByUser", GroupByKey.create())
>       .apply("GlobalWindowsLoopingStatefulTimer",
>         Window.into<KV<String, Iterable<X>>>(GlobalWindows())
>           .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>           .discardingFiredPanes()
>           .withTimestampCombiner(TimestampCombiner.EARLIEST)
>       )
>       .apply("LoopingStatefulTimer",
>         ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))
>
>
> In its process function, the looping timer receives an @Timestamp value of:
>
> 1970-01-01T07:34:59.999Z
>
> but the earliest timestamp of the (single) element in the elements
> iterable is:
>
> 1970-01-01T07:30:00.000Z
>
> Given the timestamp combiners on my windows, I would have expected the
> timestamp to be 07:30:00.000Z. Is there something wrong in my pipeline
> that is causing this non-deterministic behavior?
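The two timestamps above are related. 07:30:00.000Z is the start of the 5-minute `FixedWindows` window that contains it, and 07:34:59.999Z is the last millisecond of that same window. In Beam, `IntervalWindow.maxTimestamp()` is the window end minus 1 ms. So the observed @Timestamp matches the window's maximum timestamp, not the EARLIEST element timestamp. This thread does not establish why the pane carried that timestamp. The sketch below is plain Kotlin arithmetic, not Beam's API, and it assumes fixed windows with no offset. The helpers `fixedWindowStart` and `fixedWindowMaxTimestamp` are hypothetical.

```kotlin
import java.time.Instant

// Start of the (offset-free) fixed window of size sizeMillis that contains tsMillis.
fun fixedWindowStart(tsMillis: Long, sizeMillis: Long): Long =
    tsMillis - Math.floorMod(tsMillis, sizeMillis)

// Last millisecond still inside the half-open window [start, start + size).
fun fixedWindowMaxTimestamp(tsMillis: Long, sizeMillis: Long): Long =
    fixedWindowStart(tsMillis, sizeMillis) + sizeMillis - 1

fun main() {
    val size = 5 * 60 * 1000L
    val ts = Instant.parse("1970-01-01T07:30:00.000Z").toEpochMilli()
    println(Instant.ofEpochMilli(fixedWindowStart(ts, size)))        // 1970-01-01T07:30:00Z
    println(Instant.ofEpochMilli(fixedWindowMaxTimestamp(ts, size))) // 1970-01-01T07:34:59.999Z
}
```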
>
> Thanks,
> Raman
>
> On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský wrote:
>
>> Hi Raman,
>>
>> Can you share the details of the pipeline? How exactly are you using the
>> looping timer? The timer as described in the linked blog post should be
>> deterministic even when the order of the input elements is undefined.
>> Does your logic depend on element ordering?
>>
>> Jan
>>
>> On 1/12/21 3:18 PM, Raman Gupta wrote:
>> > Hello, I am building and testing a pipeline with the direct runner.
>> > The pipeline includes a looping timer -
>> > https://beam.apache.org/blog/looping-timers/.
>> >
>> > For now, I am using JdbcIO to obtain my input data, though when put
>> > into production the pipeline will use PubSubIO.
>> >
>> > I am finding that the looping timer begins producing outputs at a
>> > random event time, which makes some sense given the randomization of
>> > inputs in the direct runner. However, this makes the results of
>> > executing my pipeline with the direct runner completely non-deterministic.
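Here is one way to see why the output start point moves around. Suppose the looping timer is first armed relative to whichever element a key happens to see first, and is then re-armed every interval. In that case the whole tick sequence shifts with arrival order. The following is a simplified, hypothetical simulation in plain Kotlin, not Beam code and not the exact arithmetic from the linked blog post. `loopingTicks` and its parameters are illustrative assumptions.

```kotlin
// Hypothetical simulation: the first tick is one interval after the first-seen
// element's timestamp, and each later tick is one interval after the previous
// one, up to and including stopMillis.
fun loopingTicks(firstElementMillis: Long, intervalMillis: Long, stopMillis: Long): List<Long> {
    val ticks = mutableListOf<Long>()
    var next = firstElementMillis + intervalMillis
    while (next <= stopMillis) {
        ticks += next
        next += intervalMillis
    }
    return ticks
}

fun main() {
    val minute = 60_000L
    // Same data, two arrival orders: only the first-seen element differs.
    println(loopingTicks(0L, 5 * minute, 20 * minute).map { it / minute })         // [5, 10, 15, 20]
    println(loopingTicks(7 * minute, 5 * minute, 20 * minute).map { it / minute }) // [12, 17]
}
```

Anchoring the first tick to a fixed grid (for example, a window boundary) rather than to the first element seen would make the sequence independent of arrival order in this simplified model.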
>> >
>> > So:
>> >
>> > 1) Is there a way to turn off this non-deterministic behavior, but
>> > just for the GlobalWindow / LoopingTimer?
>> >
>> > 2) Perhaps alternatively, is there a way to "initialize" the looping
>> > timer when the pipeline starts, rather than when it first sees an
>> > element? Perhaps a side input?
>> >
>> > 3) Am I right in assuming that when I move this pipeline to PubSubIO
>> > and run it in streaming mode, this issue will go away?
>> >
>> > Thanks!
>> > Raman
>> >
>>
>