It seems like we are missing "inEarlyPane" within PAssert. With the current API, I believe you'll be limited to checking that the number of windowed outputs corresponds to how your triggering logic lines up, which isn't as strong as asserting that these things are early, these things are on time, and these things are late. E.g.:

PAssert.that(values).inWindow(earlyWindow1).containsInAnyOrder(earlyWindow1Element1, earlyWindow1Element2, ...);
PAssert.that(values).inWindow(earlyWindow2).containsInAnyOrder(earlyWindow2Element1, earlyWindow2Element2, ...);
PAssert.that(values).inOnTimePane(onTimeWindow).containsInAnyOrder(onTimeWindowElement1, onTimeWindowElement2, ...);

isn't as good as:

PAssert.that(values).inEarlyPane(earlyWindow1).containsInAnyOrder(earlyWindow1Element1, earlyWindow1Element2, ...);
PAssert.that(values).inEarlyPane(earlyWindow2).containsInAnyOrder(earlyWindow2Element1, earlyWindow2Element2, ...);
PAssert.that(values).inOnTimePane(onTimeWindow).containsInAnyOrder(onTimeWindowElement1, onTimeWindowElement2, ...);
...

Feel free to add a contribution which adds inEarlyPane within PAssert. Our contribution guide, https://beam.apache.org/contribute/, is a good place to start to make this happen.

On Fri, Aug 24, 2018 at 12:34 PM Bart Aelterman <[email protected]> wrote:

> Thank you Lukasz! I accepted your answer.
>
> As a follow up: if I would add repeated triggers that fire every 10
> seconds like this:
>
> .apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardSeconds(600)))
>     .triggering(Repeatedly.forever(AfterProcessingTime
>         .pastFirstElementInPane()
>         .plusDelayOf(Duration.standardSeconds(10))
>     ).orFinally(AfterWatermark.pastEndOfWindow()))
>     .accumulatingFiredPanes()
>     .withAllowedLateness(Duration.standardSeconds(60)))
>
> How could I now test the output of a given pane? I've only found examples
> where the PAssert uses inOnTimePane or inFinalPane.
>
> Thanks in advance!
>
> Bart
>
> On Fri, Aug 24, 2018 at 19:42, Lukasz Cwik <[email protected]> wrote:
>
>> Replied to the SO but cross posted the answer I put:
>>
>> 1) I believe you have a simple unit issue. The window gap duration of 600
>> is being specified in seconds Duration.standardSeconds[1] yet new
>> Instant(long)[2] uses milliseconds which means that the 600 second gap is
>> larger than the time interval of 700 millis causing the sessions to be
>> merged.
>>
>> 2) Sessions still use interval windows internally. You will need to
>> compute what the output window would be after all sessions are merged based
>> upon your trigger strategy.
>> By default, a session window uses the
>> IntervalWindow(timestamp, gap duration)[3], and merges all overlapping
>> windows[4] to create a larger window. For example, if you had the windows
>> (start time, end time), [10, 14], [12, 18], [4, 14] for the same session
>> key, they would all be merged producing a single [4, 18] window.
>>
>> [1]:
>> http://joda-time.sourceforge.net/apidocs/org/joda/time/Duration.html#standardSeconds(long)
>> [2]:
>> http://joda-time.sourceforge.net/apidocs/org/joda/time/Instant.html#Instant(long)
>> [3]:
>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L59
>> [4]:
>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L64
>>
>> On Fri, Aug 24, 2018 at 12:57 AM Bart Aelterman <[email protected]>
>> wrote:
>>
>>> Hi there,
>>>
>>> I am trying to implement a unit test for a pipeline that implements
>>> session windows. However I am not getting the results I expected. I posted
>>> the question on Stack Overflow:
>>> https://stackoverflow.com/questions/51994579/how-to-write-unit-tests-for-session-windows-in-a-beam-pipeline
>>>
>>> Sorry for cross posting.
>>>
>>> Thanks a lot,
>>>
>>> Bart Aelterman
>
> --
> Kind regards,
>
> Bart Aelterman
> Freelance data scientist
> http://www.bart-aelterman.com
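
For reference, the two points from the quoted reply (the seconds-versus-milliseconds mismatch and the merging of overlapping session windows) can be sketched in plain Java. This is only an illustration under stated assumptions, not Beam's actual implementation: the class SessionMergeSketch and its methods are hypothetical, java.time.Duration stands in for the Joda-Time Duration used in the thread, windows are treated as half-open [start, end) intervals, and the element timestamps 0 and 700 ms are made up to match the "700 millis" interval mentioned above.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SessionMergeSketch {

  // Merge overlapping intervals, each given as {start, end} and treated as
  // half-open [start, end). Hypothetical stand-in for session window merging.
  static List<long[]> merge(List<long[]> windows) {
    List<long[]> sorted = new ArrayList<>(windows);
    sorted.sort(Comparator.comparingLong((long[] w) -> w[0]));
    List<long[]> merged = new ArrayList<>();
    for (long[] w : sorted) {
      if (!merged.isEmpty() && w[0] < merged.get(merged.size() - 1)[1]) {
        // Overlaps the previous merged window: extend its end if needed.
        long[] last = merged.get(merged.size() - 1);
        last[1] = Math.max(last[1], w[1]);
      } else {
        // No overlap: start a new merged window (copy, so inputs stay untouched).
        merged.add(new long[] {w[0], w[1]});
      }
    }
    return merged;
  }

  // Give each timestamp a proto-window [t, t + gap) and merge the overlaps.
  static List<long[]> sessions(long gapMillis, long... timestampsMillis) {
    List<long[]> proto = new ArrayList<>();
    for (long t : timestampsMillis) {
      proto.add(new long[] {t, t + gapMillis});
    }
    return merge(proto);
  }

  static void print(String label, List<long[]> windows) {
    StringBuilder sb = new StringBuilder(label).append(":");
    for (long[] w : windows) {
      sb.append(' ').append(Arrays.toString(w));
    }
    System.out.println(sb);
  }

  public static void main(String[] args) {
    // The example from the reply: [10, 14], [12, 18], [4, 14].
    print("merged", merge(Arrays.asList(
        new long[] {10, 14}, new long[] {12, 18}, new long[] {4, 14})));

    // A gap of 600 seconds is 600000 ms, far larger than a 700 ms spacing.
    long gapInSeconds = Duration.ofSeconds(600).toMillis();
    print("gap = 600 s", sessions(gapInSeconds, 0L, 700L));

    // A gap of 600 ms keeps elements that are 700 ms apart in separate sessions.
    long gapInMillis = Duration.ofMillis(600).toMillis();
    print("gap = 600 ms", sessions(gapInMillis, 0L, 700L));
  }
}
```

With these inputs, the three example windows collapse into a single [4, 18] window, the 600 second gap puts both elements into one session, and the 600 millisecond gap keeps them in two. In a test, the merged window computed this way is the one to pass to PAssert's inWindow or inOnTimePane.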
