Thank you Lukasz! I accepted your answer.
As a follow up: if I would add repeated triggers that fire every 10 seconds
like this:
.apply(Window.<KV<String,
String>>into(Sessions.withGapDuration(Duration.standardSeconds(600)))
.triggering(Repeatedly.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(10))
).orFinally(AfterWatermark.pastEndOfWindow()))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.standardSeconds(60)))
How could I now test the output of a given pane? I've only found examples
where the PAssert uses inOnTimePane or inFinalPane.
Thanks in advance!
Bart
Op vr 24 aug. 2018 om 19:42 schreef Lukasz Cwik <[email protected]>:
> Replied to the SO but cross posted the answer I put:
>
> 1) I believe you have a simple unit issue. The window gap duration of 600
> is being specified in seconds Duration.standardSeconds[1] yet new
> Instant(long)[2] uses milliseconds which means that the 600 second gap is
> larger then the time interval of 700 millis causing the sessions to be
> merged.
>
> 2) Sessions still use interval windows internally. You will need to
> compute what the output window would be after all sessions are merged based
> upon your trigger strategy. By default, a session window uses the
> IntervalWindow(timestamp, gap duration)[3], and merges all overlapping
> windows[4] to create a larger window. For example, if you had the windows
> (start time, end time), [10, 14], [12, 18], [4, 14] for the same session
> key, they would all be merged producing a single [4, 18] window.
>
>
> [1]:
> http://joda-time.sourceforge.net/apidocs/org/joda/time/Duration.html#standardSeconds(long)
> [2]:
> http://joda-time.sourceforge.net/apidocs/org/joda/time/Instant.html#Instant(long)
> [3]:
> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L59
> [4]:
> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L64
>
> On Fri, Aug 24, 2018 at 12:57 AM Bart Aelterman <[email protected]>
> wrote:
>
>>
>> Hi there,
>>
>> I am trying to implement a unit test for a pipeline that implements
>> session windows. However I am not getting the results I expected. I posted
>> the question on Stack Overflow:
>> https://stackoverflow.com/questions/51994579/how-to-write-unit-tests-for-session-windows-in-a-beam-pipeline
>>
>> Sorry for cross posting.
>>
>> Thanks a lot,
>>
>> Bart Aelterman
>>
>>
>>
--
Met vriendelijke groeten,
Bart Aelterman
Freelance data scientist
http://www.bart-aelterman.com