Since you control watermark and processing-time advancement with TestStream, your tests will be deterministic, but what you're testing for is only one of the possible outputs that could have been produced. You can validate that some scenarios work with non-deterministic triggers, but it's not practical to validate all possible outputs.
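
To make the session-merging rule from the quoted answer further down concrete, here is a minimal plain-Java sketch. It does not use Beam at all: the class SessionMergeSketch, its Interval type, and the helpers sessionFor and merge are hypothetical names made up for illustration, intervals are assumed to be half-open [start, end), and two windows are assumed to merge only when they strictly overlap. The timestamps 0 ms and 700 ms in the second case are also an assumption, chosen to match the "600 second gap versus 700 millis" mismatch described in the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Plain-Java illustration only; it does not use or reproduce Beam's classes. */
final class SessionMergeSketch {

    /** Half-open interval [start, end) in milliseconds (hypothetical helper type). */
    static final class Interval {
        final long start;
        final long end;

        Interval(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Proto-window for a single element: [timestamp, timestamp + gap). */
    static Interval sessionFor(long timestampMillis, long gapMillis) {
        return new Interval(timestampMillis, timestampMillis + gapMillis);
    }

    /** Merges every run of overlapping intervals into one spanning interval. */
    static List<Interval> merge(List<Interval> windows) {
        List<Interval> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((Interval w) -> w.start));

        List<Interval> merged = new ArrayList<>();
        for (Interval w : sorted) {
            int last = merged.size() - 1;
            if (last >= 0 && w.start < merged.get(last).end) {
                // Overlaps the previous merged window: extend it.
                Interval prev = merged.get(last);
                merged.set(last, new Interval(prev.start, Math.max(prev.end, w.end)));
            } else {
                // Disjoint from everything seen so far: start a new session.
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // The example from the thread: [10, 14], [12, 18], [4, 14].
        System.out.println(merge(Arrays.asList(
            new Interval(10, 14), new Interval(12, 18), new Interval(4, 14))));
        // prints [[4, 18)]

        // Unit mismatch: a 600 s gap with elements at 0 ms and 700 ms (assumed values).
        long gapMillis = 600 * 1000L;
        System.out.println(merge(Arrays.asList(
            sessionFor(0, gapMillis), sessionFor(700, gapMillis))));
        // prints [[0, 600700)]
    }
}
```

The second case shows why the original test saw a single session: with the gap expressed in seconds and the timestamps in milliseconds, both proto-windows overlap almost entirely and collapse into one.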
On Sat, Aug 25, 2018 at 12:54 AM Bart Aelterman <[email protected]> wrote:

> Thanks.
>
> My point is more that with repeated triggers, there is no such thing as
> one early pane. With a repeated trigger based on processing time (as I did
> in my example, every 10 seconds) there are potentially a lot of early
> panes. So I assume there is currently no good way to assert the output of
> those different panes?
>
> On Fri, Aug 24, 2018 at 23:05, Lukasz Cwik <[email protected]> wrote:
>
>> Actually, it seems like someone has already beaten you to adding the
>> contribution in https://github.com/apache/beam/pull/5811; it is yet
>> to be reviewed, though.
>>
>> On Fri, Aug 24, 2018 at 1:59 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> It seems like we are missing "inEarlyPane" within PAssert.
>>>
>>> With the current API I believe you'll be limited to making sure the
>>> number of windowed outputs corresponds with how your triggering logic
>>> lines up, which isn't as strong as saying these things are early, these
>>> things are on time and these things are late. E.g.
>>> PAssert.that(values).inWindow(earlyWindow1).containsInAnyOrder(earlyWindow1Element1,
>>> earlyWindow1Element2, ...);
>>> PAssert.that(values).inWindow(earlyWindow2).containsInAnyOrder(earlyWindow2Element1,
>>> earlyWindow2Element2, ...);
>>> PAssert.that(values).inOnTimePane(ontimeWindow).containsInAnyOrder(ontimeWindowElement1,
>>> ontimeWindowElement2, ...);
>>>
>>> isn't as good as:
>>>
>>> PAssert.that(values).inEarlyPane(earlyWindow1).containsInAnyOrder(earlyWindow1Element1,
>>> earlyWindow1Element2, ...);
>>> PAssert.that(values).inEarlyPane(earlyWindow2).containsInAnyOrder(earlyWindow2Element1,
>>> earlyWindow2Element2, ...);
>>> PAssert.that(values).inOnTimePane(ontimeWindow).containsInAnyOrder(ontimeWindowElement1,
>>> ontimeWindowElement2, ...);
>>> ...
>>>
>>> Feel free to add a contribution which adds inEarlyPane within PAssert.
>>> Our contribution guide is a good place to start
>>> https://beam.apache.org/contribute/ to make this happen.
>>>
>>> On Fri, Aug 24, 2018 at 12:34 PM Bart Aelterman <
>>> [email protected]> wrote:
>>>
>>>> Thank you Lukasz! I accepted your answer.
>>>>
>>>> As a follow-up: if I added repeated triggers that fire every 10
>>>> seconds like this:
>>>>
>>>> .apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardSeconds(600)))
>>>>     .triggering(Repeatedly.forever(AfterProcessingTime
>>>>             .pastFirstElementInPane()
>>>>             .plusDelayOf(Duration.standardSeconds(10))
>>>>         ).orFinally(AfterWatermark.pastEndOfWindow()))
>>>>     .accumulatingFiredPanes()
>>>>     .withAllowedLateness(Duration.standardSeconds(60)))
>>>>
>>>> How could I now test the output of a given pane? I've only found
>>>> examples where the PAssert uses inOnTimePane or inFinalPane.
>>>>
>>>> Thanks in advance!
>>>>
>>>> Bart
>>>>
>>>> On Fri, Aug 24, 2018 at 19:42, Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Replied on SO, but cross-posting the answer I gave:
>>>>>
>>>>> 1) I believe you have a simple unit issue. The window gap duration of
>>>>> 600 is being specified in seconds via Duration.standardSeconds[1], yet
>>>>> new Instant(long)[2] uses milliseconds, which means that the 600 second
>>>>> gap is larger than the time interval of 700 millis, causing the
>>>>> sessions to be merged.
>>>>>
>>>>> 2) Sessions still use interval windows internally. You will need to
>>>>> compute what the output window would be after all sessions are merged
>>>>> based upon your trigger strategy. By default, a session window uses the
>>>>> IntervalWindow(timestamp, gap duration)[3], and merges all overlapping
>>>>> windows[4] to create a larger window. For example, if you had the windows
>>>>> (start time, end time), [10, 14], [12, 18], [4, 14] for the same session
>>>>> key, they would all be merged producing a single [4, 18] window.
>>>>>
>>>>> [1]:
>>>>> http://joda-time.sourceforge.net/apidocs/org/joda/time/Duration.html#standardSeconds(long)
>>>>> [2]:
>>>>> http://joda-time.sourceforge.net/apidocs/org/joda/time/Instant.html#Instant(long)
>>>>> [3]:
>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L59
>>>>> [4]:
>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L64
>>>>>
>>>>> On Fri, Aug 24, 2018 at 12:57 AM Bart Aelterman <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi there,
>>>>>>
>>>>>> I am trying to implement a unit test for a pipeline that implements
>>>>>> session windows. However, I am not getting the results I expected. I
>>>>>> posted the question on Stack Overflow:
>>>>>> https://stackoverflow.com/questions/51994579/how-to-write-unit-tests-for-session-windows-in-a-beam-pipeline
>>>>>>
>>>>>> Sorry for cross-posting.
>>>>>>
>>>>>> Thanks a lot,
>>>>>>
>>>>>> Bart Aelterman
>>>>
>>>> --
>>>> Kind regards,
>>>>
>>>> Bart Aelterman
>>>> Freelance data scientist
>>>> http://www.bart-aelterman.com
>
> --
> Kind regards,
>
> Bart Aelterman
> Freelance data scientist
> http://www.bart-aelterman.com
