Thanks. My point is more that with repeated triggers, there is no such thing as a single early pane. With a repeated trigger based on processing time (in my example, one that fires every 10 seconds), there are potentially many early panes. So I assume there is currently no good way to assert the output of each of those panes?
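To make the point about multiple early panes concrete, here is a small plain-JDK model (not Beam code, and not how a Beam runner works internally) of one session window under the quoted trigger, `Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...)).orFinally(AfterWatermark.pastEndOfWindow())` with `accumulatingFiredPanes()`. The names `EarlyPaneSimulation`, `Arrival`, `Pane` and `simulate` are hypothetical, and the `EARLY`/`ON_TIME` labels only loosely mirror Beam's `PaneInfo.Timing`. The model assumes a single clock for both processing time and the watermark, always emits an on-time pane, and ignores anything arriving after the watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model (not Beam code) of a single window under
// Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
//     .orFinally(AfterWatermark.pastEndOfWindow())
// with accumulating panes. Processing time and the watermark share one clock (seconds).
class EarlyPaneSimulation {
    enum Timing { EARLY, ON_TIME }

    record Arrival(long processingTimeSec, String value) {}

    record Pane(int index, Timing timing, List<String> contents) {}

    /** Arrivals must be sorted by processing time. */
    static List<Pane> simulate(List<Arrival> arrivals, long delaySec, long watermarkPassSec) {
        List<Pane> panes = new ArrayList<>();
        List<String> accumulated = new ArrayList<>(); // accumulating mode: never cleared
        Long fireAt = null; // timer armed by the first element of the current pane

        for (Arrival a : arrivals) {
            // Fire an early pane if its timer expired before this arrival and before the watermark.
            if (fireAt != null && fireAt <= a.processingTimeSec() && fireAt < watermarkPassSec) {
                panes.add(new Pane(panes.size(), Timing.EARLY, List.copyOf(accumulated)));
                fireAt = null;
            }
            if (a.processingTimeSec() >= watermarkPassSec) {
                break; // this model ignores later arrivals (orFinally has finished the trigger)
            }
            accumulated.add(a.value());
            if (fireAt == null) {
                fireAt = a.processingTimeSec() + delaySec; // first element of a new pane
            }
        }
        // A timer still pending before the watermark produces one more early pane.
        if (fireAt != null && fireAt < watermarkPassSec) {
            panes.add(new Pane(panes.size(), Timing.EARLY, List.copyOf(accumulated)));
        }
        // The watermark passing the end of the window emits the on-time pane.
        panes.add(new Pane(panes.size(), Timing.ON_TIME, List.copyOf(accumulated)));
        return panes;
    }

    public static void main(String[] args) {
        List<Arrival> arrivals = List.of(
                new Arrival(0, "a"), new Arrival(3, "b"), new Arrival(12, "c"),
                new Arrival(25, "d"), new Arrival(27, "e"));
        for (Pane p : simulate(arrivals, 10, 40)) {
            System.out.println(p);
        }
    }
}
```

With the sample arrivals in `main` (10-second delay, watermark passing at 40), the model emits three early panes, `[a, b]`, `[a, b, c]` and `[a, b, c, d, e]`, followed by an on-time pane `[a, b, c, d, e]`. Because the panes accumulate, each early pane is a superset of the previous one.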
On Fri, Aug 24, 2018 at 23:05, Lukasz Cwik wrote:

> Actually, it seems like someone has already beaten you to adding this
> contribution in https://github.com/apache/beam/pull/5811; it has yet
> to be reviewed, though.
>
> On Fri, Aug 24, 2018 at 1:59 PM Lukasz Cwik wrote:
>
>> It seems like we are missing "inEarlyPane" within PAssert.
>>
>> With the current API, I believe you are limited to checking that the
>> number of windowed outputs corresponds to how your triggering logic lines
>> up, which isn't as strong as asserting that these elements are early,
>> these are on time, and these are late. For example:
>>
>> PAssert.that(values).inWindow(earlyWindow1).containsInAnyOrder(earlyWindow1Element1, earlyWindow1Element2, ...);
>> PAssert.that(values).inWindow(earlyWindow2).containsInAnyOrder(earlyWindow2Element1, earlyWindow2Element2, ...);
>> PAssert.that(values).inOnTimePane(onTimeWindow).containsInAnyOrder(onTimeWindowElement1, onTimeWindowElement2, ...);
>>
>> isn't as good as:
>>
>> PAssert.that(values).inEarlyPane(earlyWindow1).containsInAnyOrder(earlyWindow1Element1, earlyWindow1Element2, ...);
>> PAssert.that(values).inEarlyPane(earlyWindow2).containsInAnyOrder(earlyWindow2Element1, earlyWindow2Element2, ...);
>> PAssert.that(values).inOnTimePane(onTimeWindow).containsInAnyOrder(onTimeWindowElement1, onTimeWindowElement2, ...);
>> ...
>>
>> Feel free to contribute a change that adds inEarlyPane to PAssert.
>> Our contribution guide, https://beam.apache.org/contribute/, is a good
>> place to start.
>>
>> On Fri, Aug 24, 2018 at 12:34 PM Bart Aelterman wrote:
>>
>>> Thank you Lukasz! I accepted your answer.
>>>
>>> As a follow-up: if I add repeated triggers that fire every 10
>>> seconds, like this:
>>>
>>>   .apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardSeconds(600)))
>>>       .triggering(Repeatedly.forever(AfterProcessingTime
>>>               .pastFirstElementInPane()
>>>               .plusDelayOf(Duration.standardSeconds(10)))
>>>           .orFinally(AfterWatermark.pastEndOfWindow()))
>>>       .accumulatingFiredPanes()
>>>       .withAllowedLateness(Duration.standardSeconds(60)))
>>>
>>> how can I test the output of a given pane? I've only found
>>> examples where PAssert uses inOnTimePane or inFinalPane.
>>>
>>> Thanks in advance!
>>>
>>> Bart
>>>
>>> On Fri, Aug 24, 2018 at 19:42, Lukasz Cwik wrote:
>>>
>>>> I replied on Stack Overflow, but I'm cross-posting the answer here:
>>>>
>>>> 1) I believe you have a simple units issue. The window gap duration of
>>>> 600 is specified in seconds via Duration.standardSeconds [1], yet
>>>> new Instant(long) [2] takes milliseconds. The 600-second gap is
>>>> therefore larger than the time interval of 700 milliseconds, which
>>>> causes the sessions to be merged.
>>>>
>>>> 2) Sessions still use interval windows internally. You will need to
>>>> compute what the output window would be after all sessions are merged,
>>>> based on your trigger strategy. By default, Sessions assigns each element
>>>> the window IntervalWindow(timestamp, gap duration) [3] and merges all
>>>> overlapping windows [4] into a larger window. For example, if you had the
>>>> windows (start time, end time) [10, 14], [12, 18] and [4, 14] for the
>>>> same session key, they would all be merged into a single [4, 18] window.
>>>>
>>>> [1]: http://joda-time.sourceforge.net/apidocs/org/joda/time/Duration.html#standardSeconds(long)
>>>> [2]: http://joda-time.sourceforge.net/apidocs/org/joda/time/Instant.html#Instant(long)
>>>> [3]: https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L59
>>>> [4]: https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L64
>>>>
>>>> On Fri, Aug 24, 2018 at 12:57 AM Bart Aelterman wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> I am trying to write a unit test for a pipeline that uses session
>>>>> windows. However, I am not getting the results I expected. I posted
>>>>> the question on Stack Overflow:
>>>>> https://stackoverflow.com/questions/51994579/how-to-write-unit-tests-for-session-windows-in-a-beam-pipeline
>>>>>
>>>>> Sorry for cross-posting.
>>>>>
>>>>> Thanks a lot,
>>>>>
>>>>> Bart Aelterman
>>>
>>> --
>>> Kind regards,
>>>
>>> Bart Aelterman
>>> Freelance data scientist
>>> http://www.bart-aelterman.com
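Both points in the 19:42 answer (the seconds-versus-milliseconds mix-up and the merging of overlapping session windows) can be sketched in plain JDK Java. This is not Beam code: `SessionMergeSketch`, `Interval`, `mergeOverlapping` and `sessionsFor` are hypothetical names. Each element gets a proto-window `[timestamp, timestamp + gap)`, overlapping proto-windows are merged, and all times are in milliseconds, which is what Joda-Time's `new Instant(long)` expects. As an assumption, only strictly overlapping windows are merged here; windows that merely touch at a boundary are left separate, so check Beam's `MergeOverlappingIntervalWindows` if that edge case matters. In the original Joda-Time based test, something like `new Instant(0L).plus(Duration.standardSeconds(700))` would express a 700-second offset.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical plain-JDK sketch (not Beam code) of how session windows form:
// each element gets a proto-window [timestamp, timestamp + gap), and overlapping
// proto-windows are merged. All times are epoch milliseconds.
class SessionMergeSketch {
    /** Stand-in for an interval window [start, end), in milliseconds. */
    record Interval(long start, long end) {}

    /** Sorts by start and collapses each run of overlapping intervals into one. */
    static List<Interval> mergeOverlapping(List<Interval> windows) {
        List<Interval> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong(Interval::start));
        List<Interval> merged = new ArrayList<>();
        for (Interval w : sorted) {
            int lastIdx = merged.size() - 1;
            // Assumption: only strictly overlapping windows merge; touching ones stay separate.
            if (lastIdx >= 0 && w.start() < merged.get(lastIdx).end()) {
                Interval last = merged.get(lastIdx);
                merged.set(lastIdx, new Interval(last.start(), Math.max(last.end(), w.end())));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    /** Assigns one proto-window per element timestamp, then merges them into sessions. */
    static List<Interval> sessionsFor(List<Long> timestampsMillis, Duration gap) {
        List<Interval> protoWindows = new ArrayList<>();
        for (long t : timestampsMillis) {
            protoWindows.add(new Interval(t, t + gap.toMillis()));
        }
        return mergeOverlapping(protoWindows);
    }

    public static void main(String[] args) {
        // Example windows from the answer: [10, 14], [12, 18], [4, 14] merge into [4, 18].
        System.out.println(mergeOverlapping(List.of(
                new Interval(10, 14), new Interval(12, 18), new Interval(4, 14))));

        Duration gap = Duration.ofSeconds(600);
        // 700 read as milliseconds: well inside the 600 s gap, so one session.
        System.out.println(sessionsFor(List.of(0L, 700L), gap).size());
        // 700 seconds expressed in milliseconds: beyond the gap, so two sessions.
        System.out.println(sessionsFor(List.of(0L, Duration.ofSeconds(700).toMillis()), gap).size());
    }
}
```

Running `main` prints the single merged window `[4, 18]`, then `1` for the millisecond timestamps and `2` once the 700-second offset is expressed in milliseconds.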
