Hi,
yes, there is a possible source of non-determinism, and it is related to
the timestamp combiner. Timestamp combiners combine only elements that
are not 'late' ([1]), meaning that their timestamp does not precede the
output watermark of the GBK. Looking at the pipeline code, I suppose that
could be the cause. You can make the pipeline deterministic by using
TimestampCombiner.END_OF_WINDOW (the default). If you *need* to use
TimestampCombiner.EARLIEST, you can probably do that by tweaking the
looping-timer stateful DoFn and fixing the timestamps there (using the
timer output timestamp).
Jan
[1] https://issues.apache.org/jira/browse/BEAM-2262
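To make the effect concrete, here is a simplified model of what happens at the GBK. It is plain Kotlin, not Beam code, and it assumes (based on BEAM-2262 and my understanding of Beam's watermark-hold logic) that elements whose timestamp precedes the output watermark are ignored by the EARLIEST combiner, and that a pane containing only such late elements is emitted at the window's end-of-window timestamp instead. The function name `earliestPaneTimestamp` and its parameters are hypothetical, for illustration only.

```kotlin
import java.time.Instant

// Simplified model (NOT Beam code) of TimestampCombiner.EARLIEST at a GBK.
// Assumption: late elements (timestamp before the output watermark) are
// ignored by the combiner; if no on-time element remains, the pane is
// emitted at the window's max timestamp (end of window).
fun earliestPaneTimestamp(
    elementTimestamps: List<Instant>,
    outputWatermark: Instant,
    windowMaxTimestamp: Instant
): Instant {
    // Keep only elements that are not late relative to the output watermark.
    val onTime = elementTimestamps.filter { !it.isBefore(outputWatermark) }
    // EARLIEST picks the minimum on-time timestamp; otherwise fall back to end of window.
    return onTime.minOrNull() ?: windowMaxTimestamp
}

fun main() {
    val element = Instant.parse("1970-01-01T07:30:00.000Z")
    val windowMax = Instant.parse("1970-01-01T07:34:59.999Z")

    // Watermark still behind the element: EARLIEST yields the element's timestamp.
    println(earliestPaneTimestamp(listOf(element), Instant.parse("1970-01-01T07:00:00Z"), windowMax))
    // prints 1970-01-01T07:30:00Z

    // Watermark already past the element: the element is late, so the pane gets the end of window.
    println(earliestPaneTimestamp(listOf(element), Instant.parse("1970-01-01T07:31:00Z"), windowMax))
    // prints 1970-01-01T07:34:59.999Z
}
```

In this model, which branch is taken depends only on where the output watermark happens to be when the element is grouped, which is why the same input can produce different timestamps from run to run.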
On 1/12/21 5:26 PM, Raman Gupta wrote:
Your reply made me realize that I had removed, from my local copy of
the looping timer, the condition that brings the timer forward if it
encounters an earlier element later in the stream:
if (currentTimerValue == null ||
        currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {
Restoring that condition fixes that issue.
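The effect of that condition can be seen in a small pure-Kotlin model. It assumes, as in the blog post's looping timer, that each element proposes a timer at its own timestamp plus the loop interval, and it assumes the variant without the condition only sets the timer when none is set yet. The names `timerWithCondition`, `timerWithoutCondition`, and `finalTimer` are hypothetical, not Beam API, and the model ignores the timer re-arming itself in its callback.

```kotlin
// Hypothetical model of the looping-timer "set" logic (NOT Beam code).
// Timestamps are epoch millis; intervalMillis is the loop interval (5 minutes in the pipeline).

// With the restored condition: move the timer earlier if this element proposes an earlier time.
fun timerWithCondition(current: Long?, elementTs: Long, intervalMillis: Long): Long {
    val next = elementTs + intervalMillis
    return if (current == null || current > next) next else current
}

// Assumed variant without the condition: only the first element seen ever sets the timer.
fun timerWithoutCondition(current: Long?, elementTs: Long, intervalMillis: Long): Long =
    current ?: (elementTs + intervalMillis)

// Feed elements in the given arrival order and return the final timer value.
fun finalTimer(elements: List<Long>, intervalMillis: Long, step: (Long?, Long, Long) -> Long): Long? =
    elements.fold(null as Long?) { timer, ts -> step(timer, ts, intervalMillis) }

fun main() {
    val interval = 5 * 60 * 1000L
    val inOrder = listOf(0L, 60_000L)
    val reversed = inOrder.reversed()
    println(finalTimer(inOrder, interval, ::timerWithCondition))     // 300000
    println(finalTimer(reversed, interval, ::timerWithCondition))    // 300000
    println(finalTimer(inOrder, interval, ::timerWithoutCondition))  // 300000
    println(finalTimer(reversed, interval, ::timerWithoutCondition)) // 360000
}
```

With the condition, the timer always ends up at the earliest proposal regardless of arrival order; without it, the result depends on which element happens to arrive first, which the direct runner shuffles.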
However, I removed that condition in the first place because it was
making a unit test non-deterministic: sometimes the element timestamps
arriving at the looping timer didn't seem to match the timestamps I
expected from the EARLIEST timestamp combiner I had defined, causing
the timer to fire an additional time.
The pipeline:
input
    // withAllowedTimestampSkew is deprecated, but as of now, there is no
    // replacement: https://issues.apache.org/jira/browse/BEAM-644
    .apply("XTimestamps", WithTimestamps
        .of<X> { it.enteredAt.asJoda() }
        .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
    )
    .apply("FixedTickWindows",
        Window.into<X>(FixedWindows.of(5.minutes.asJoda()))
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                    .withLateFirings(AfterPane.elementCountAtLeast(1))
            )
            .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
            .discardingFiredPanes()
            .withTimestampCombiner(TimestampCombiner.EARLIEST)
    )
    .apply("KeyByUser", WithKeys.of { it.userId })
    .apply("GroupByUser", GroupByKey.create())
    .apply("GlobalWindowsLoopingStatefulTimer",
        Window.into<KV<String, Iterable<X>>>(GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes()
            .withTimestampCombiner(TimestampCombiner.EARLIEST)
    )
    .apply("LoopingStatefulTimer",
        ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))
In its process function, the looping timer receives an @Timestamp value of:
1970-01-01T07:34:59.999Z
but the timestamp of the (single) element in the elements iterable,
which should also be the earliest, is:
1970-01-01T07:30:00.000Z
Given the timestamp combiners on my windows, I would have expected the
timestamp to be 07:30:00.000Z. Is there something wrong in my pipeline
that is causing this non-deterministic behavior?
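One observation that may help: 07:34:59.999Z is exactly the last millisecond of the 5-minute fixed window [07:30:00, 07:35:00) that contains 07:30:00.000Z, i.e. the window's max timestamp, which is what END_OF_WINDOW assigns. The arithmetic below is a plain-Kotlin sketch assuming FixedWindows with zero offset (window start is the timestamp rounded down to a multiple of the size, and the max timestamp is the window end minus 1 ms); `fixedWindowMaxTimestamp` is a hypothetical helper, not Beam API.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical helper (NOT Beam API): max timestamp of the fixed window containing `ts`,
// assuming FixedWindows with zero offset. Beam's IntervalWindow.maxTimestamp() is end - 1 ms.
fun fixedWindowMaxTimestamp(ts: Instant, size: Duration): Instant {
    val millis = ts.toEpochMilli()
    val sizeMillis = size.toMillis()
    // Round down to the window start (floorMod also handles pre-epoch timestamps).
    val start = millis - Math.floorMod(millis, sizeMillis)
    return Instant.ofEpochMilli(start + sizeMillis - 1)
}

fun main() {
    val elementTs = Instant.parse("1970-01-01T07:30:00.000Z")
    println(fixedWindowMaxTimestamp(elementTs, Duration.ofMinutes(5))) // 1970-01-01T07:34:59.999Z
}
```

So the @Timestamp seen by the DoFn looks like an end-of-window timestamp for the element's window rather than the element's own (earliest) timestamp.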
Thanks,
Raman
On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Raman,
can you share the details of the pipeline? How exactly are you using the
looping timer? A timer as described in the linked blog post should be
deterministic even when the order of the input elements is undefined.
Does your logic depend on element ordering?
Jan
On 1/12/21 3:18 PM, Raman Gupta wrote:
> Hello, I am building and testing a pipeline with the direct runner.
> The pipeline includes a looping timer -
> https://beam.apache.org/blog/looping-timers/.
>
> For now, I am using JdbcIO to obtain my input data, though when put
> into production the pipeline will use PubSubIO.
>
> I am finding that the looping timer begins producing outputs at a
> random event time, which makes some sense given the randomization of
> inputs in the direct runner. However, this makes the results of
> executing my pipeline with the direct runner completely non-deterministic.
>
> So:
>
> 1) Is there a way to turn off this non-deterministic behavior, but
> just for the GlobalWindow / LoopingTimer?
>
> 2) Perhaps alternatively, is there a way to "initialize" the looping
> timer when the pipeline starts, rather than when it first sees an
> element? Perhaps a side input?
>
> 3) Am I right in assuming that when I move this pipeline to PubSubIO
> and operate it in streaming mode, this issue will go away?
>
> Thanks!
> Raman
>