Re-hi,
I debugged the test for the event-time rowtime a bit. I tested
testBoundNonPartitionedEventTimeWindowWithRange from the SQLITCase class.
I would expect that once a watermark is emitted, 1) onTimer is called to
process the events that have arrived so far and 2) events that arrive
afterwards with an older timestamp are dropped. However, it seems that almost
the entire input can arrive in the processElement function before onTimer is
triggered.
Moreover, if you modify the input to add an out-of-order event (see the dataset
below, where I added an event with timestamp 1000 after watermark 14000), I
would expect this event to be dropped. However, in some runs it is not dropped:
it can happen that onTimer has not been triggered yet when this event arrives,
so the event is registered. Is this correct? Am I missing something?
@Test
def testBoundNonPartitionedEventTimeWindowWithRangeUnOrder(): Unit = {
  val data = Seq(
    Left((1500L, (1L, 15, "Hello"))),
    Left((1600L, (1L, 16, "Hello"))),
    Left((1000L, (1L, 1, "Hello"))),
    Left((2000L, (2L, 2, "Hello"))),
    Right(1000L),
    Left((2000L, (2L, 2, "Hello"))),
    Left((2000L, (2L, 3, "Hello"))),
    Left((3000L, (3L, 3, "Hello"))),
    Right(2000L),
    Left((4000L, (4L, 4, "Hello"))),
    Right(3000L),
    Left((5000L, (5L, 5, "Hello"))),
    Right(5000L),
    Left((6000L, (6L, 6, "Hello"))),
    Left((6500L, (6L, 65, "Hello"))),
    Right(7000L),
    Left((9000L, (6L, 9, "Hello"))),
    Left((9500L, (6L, 18, "Hello"))),
    Left((9000L, (6L, 9, "Hello"))),
    Right(10000L),
    Left((10000L, (7L, 7, "Hello World"))),
    Left((11000L, (7L, 17, "Hello World"))),
    Left((11000L, (7L, 77, "Hello World"))),
    Right(12000L),
    Left((14000L, (7L, 18, "Hello World"))),
    Right(14000L),
    Left((15000L, (8L, 8, "Hello World"))),
    // this event is out of order and should be dropped
    Left((1000L, (8L, 8, "Too late - Hello World"))),
    Right(17000L),
    Left((20000L, (20L, 20, "Hello World"))),
    Right(19000L))
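
To make the expected behaviour concrete, here is a minimal sketch of the drop
rule described above, independent of Flink. It assumes that records are
processed strictly in order and that an event counts as late when its timestamp
is less than or equal to the last watermark seen. LatenessSketch, Record, split
and example are names made up for this illustration; they are not part of the
test or of Flink, and the payload is simplified to a String.

```scala
object LatenessSketch {
  // Same shape as the test input: Left = (timestamp, payload), Right = watermark.
  type Record = Either[(Long, String), Long]

  /** Processes records in order and returns (accepted events, dropped events). */
  def split(input: Seq[Record]): (Seq[(Long, String)], Seq[(Long, String)]) = {
    var watermark = Long.MinValue // highest watermark seen so far
    val accepted = Seq.newBuilder[(Long, String)]
    val dropped = Seq.newBuilder[(Long, String)]
    input.foreach {
      case Right(wm) =>
        // Watermarks never move backwards in this model.
        watermark = math.max(watermark, wm)
      case Left(event) =>
        // Assumed rule: an event at or behind the watermark is late.
        if (event._1 <= watermark) dropped += event else accepted += event
    }
    (accepted.result(), dropped.result())
  }

  // Tail of the dataset above, with simplified payloads.
  val example: Seq[Record] = Seq(
    Left((14000L, "Hello World")),
    Right(14000L),
    Left((15000L, "Hello World")),
    Left((1000L, "Too late - Hello World")),
    Right(17000L))
}
```

Under these assumptions, LatenessSketch.split(LatenessSketch.example) accepts
the events at 14000 and 15000 and drops the one at 1000. The sketch says
nothing about when onTimer actually fires inside the operator.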
-----Original Message-----
From: Fabian Hueske [mailto:[email protected]]
Sent: Thursday, April 27, 2017 3:17 PM
To: [email protected]
Subject: Re: question about rowtime processfunction - are watermarks needed?
Hi Radu,
event-time processing requires watermarks. Operators use watermarks to compute
the current event-time.
The ProcessFunctions for over range windows use the TimerServices to group
elements by time.
In case of event-time, the timers are triggered by the event-time of the
operator which is derived from the received watermarks.
In case of processing-time, the timers are triggered based on the wallclock
time of the operator.
So by using event-time timers, we implicitly rely on the watermarks because the
timers are triggered based on the received watermarks.
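
As a rough illustration of that last point, the following is a toy model of
event-time timers. It is not Flink's TimerService: the class EventTimeTimers,
its onTimer callback and advanceWatermark are hypothetical names for this
sketch, and it ignores keys and state. It only shows that a registered timer
fires when the watermark reaches its timestamp, not when the element arrives.

```scala
import scala.collection.mutable

/** Toy model of event-time timers (an assumption-laden sketch, not Flink code). */
class EventTimeTimers(onTimer: Long => Unit) {
  // Pending timer timestamps, sorted ascending; duplicates collapse into one.
  private val pending = mutable.TreeSet.empty[Long]
  private var watermark = Long.MinValue

  def currentWatermark: Long = watermark

  /** Registers a timer; nothing fires until the watermark advances. */
  def registerEventTimeTimer(ts: Long): Unit = {
    pending += ts
  }

  /** Advances event time and fires every pending timer with ts <= watermark. */
  def advanceWatermark(wm: Long): Unit = {
    if (wm > watermark) watermark = wm
    while (pending.nonEmpty && pending.head <= watermark) {
      val ts = pending.head
      pending -= ts
      onTimer(ts)
    }
  }
}
```

For example, after registering timers for 1500, 1600 and 1000, no callback
runs until advanceWatermark is called: advanceWatermark(1000) fires only the
timer at 1000, and a later advanceWatermark(2000) fires 1500 and 1600 in
timestamp order.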
Best, Fabian
2017-04-27 10:51 GMT+02:00 Radu Tudoran <[email protected]>:
> Hi,
>
> I am looking at the implementation of RowTimeBoundedRangeOver (in the
> context of Stream SQL). I see that the logic is that progress is
> driven by the timestamps of the row events, i.e., when an event
> arrives we register a timer for it based on its timestamp
> (ctx.timerService.registerEventTimeTimer(triggeringTs)).
>
> In onTimer we remove (retract) data that has expired. However, we
> do not consider watermarks or any allowed lateness for the events,
> which makes me ask:
> Don't we need to work with watermarks when we deal with event time, and
> keep the events within the allowed delay/next watermark? Am I
> missing something? Or maybe we do not consider allowedLateness at
> this point for this version?
>
> Thanks
>
> Best regards,
>
>