Actually, it seems someone has already beaten you to this contribution in
https://github.com/apache/beam/pull/5811, though it has yet to be
reviewed.

On Fri, Aug 24, 2018 at 1:59 PM Lukasz Cwik <[email protected]> wrote:

> It seems like we are missing "inEarlyPane" within PAssert.
>
> With the current API, I believe you are limited to checking that the number
> of windowed outputs corresponds to how your triggering logic lines up,
> which isn't as strong as asserting that these elements are early, these are
> on time, and these are late. For example:
> PAssert.that(values).inWindow(earlyWindow1)
>     .containsInAnyOrder(earlyWindow1Element1, earlyWindow1Element2, ...);
> PAssert.that(values).inWindow(earlyWindow2)
>     .containsInAnyOrder(earlyWindow2Element1, earlyWindow2Element2, ...);
> PAssert.that(values).inOnTimePane(onTimeWindow)
>     .containsInAnyOrder(onTimeWindowElement1, onTimeWindowElement2, ...);
>
> isn't as good as:
>
> PAssert.that(values).inEarlyPane(earlyWindow1)
>     .containsInAnyOrder(earlyWindow1Element1, earlyWindow1Element2, ...);
> PAssert.that(values).inEarlyPane(earlyWindow2)
>     .containsInAnyOrder(earlyWindow2Element1, earlyWindow2Element2, ...);
> PAssert.that(values).inOnTimePane(onTimeWindow)
>     .containsInAnyOrder(onTimeWindowElement1, onTimeWindowElement2, ...);
> ...
>
> Feel free to contribute a change that adds inEarlyPane to PAssert. Our
> contribution guide, https://beam.apache.org/contribute/, is a good place
> to start.
>
>
>
>
> On Fri, Aug 24, 2018 at 12:34 PM Bart Aelterman <[email protected]>
> wrote:
>
>> Thank you Lukasz! I accepted your answer.
>>
>> As a follow-up: suppose I add repeated triggers that fire every 10
>> seconds, like this:
>>
>> .apply(Window.<KV<String, String>>into(
>>             Sessions.withGapDuration(Duration.standardSeconds(600)))
>>         .triggering(Repeatedly.forever(AfterProcessingTime
>>                 .pastFirstElementInPane()
>>                 .plusDelayOf(Duration.standardSeconds(10))
>>         ).orFinally(AfterWatermark.pastEndOfWindow()))
>>         .accumulatingFiredPanes()
>>         .withAllowedLateness(Duration.standardSeconds(60)))
>>
>> How could I now test the output of a given pane? I've only found examples
>> where the PAssert uses inOnTimePane or inFinalPane.
>>
>> Thanks in advance!
>>
>> Bart
>>
>> On Fri, Aug 24, 2018 at 7:42 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> I replied on Stack Overflow, but I'm cross-posting my answer here:
>>>
>>> 1) I believe you have a simple unit issue. The window gap duration of
>>> 600 is specified in seconds via Duration.standardSeconds[1], yet new
>>> Instant(long)[2] takes milliseconds. The 600-second gap is therefore
>>> larger than the 700-millisecond time interval, causing the sessions to be
>>> merged.
>>>
>>> 2) Sessions still use interval windows internally. You will need to
>>> compute what the output window would be after all sessions are merged based
>>> upon your trigger strategy. By default, a session window assigns each
>>> element an IntervalWindow(timestamp, gap duration)[3] and merges all
>>> overlapping windows[4] to create a larger window. For example, if you had
>>> the windows (start time, end time) [10, 14], [12, 18], [4, 14] for the same
>>> session key, they would all be merged, producing a single [4, 18] window.
>>>
>>>
>>>   [1]:
>>> http://joda-time.sourceforge.net/apidocs/org/joda/time/Duration.html#standardSeconds(long)
>>>   [2]:
>>> http://joda-time.sourceforge.net/apidocs/org/joda/time/Instant.html#Instant(long)
>>>   [3]:
>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L59
>>>   [4]:
>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L64
>>>
>>> On Fri, Aug 24, 2018 at 12:57 AM Bart Aelterman <
>>> [email protected]> wrote:
>>>
>>>>
>>>> Hi there,
>>>>
>>>> I am trying to implement a unit test for a pipeline that implements
>>>> session windows. However I am not getting the results I expected. I posted
>>>> the question on Stack Overflow:
>>>> https://stackoverflow.com/questions/51994579/how-to-write-unit-tests-for-session-windows-in-a-beam-pipeline
>>>>
>>>> Sorry for cross posting.
>>>>
>>>> Thanks a lot,
>>>>
>>>> Bart Aelterman
>>>>
>>>>
>>>>
>>
>> --
>> Kind regards,
>>
>> Bart Aelterman
>> Freelance data scientist
>> http://www.bart-aelterman.com
>>
>>
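
The session-merging behaviour and the seconds-versus-milliseconds mix-up discussed in this thread can be illustrated with a small standalone sketch. It is plain Java and does not use Beam or Joda-Time; the class name SessionMergeSketch, its two methods, and the sample timestamps 0 and 700 are hypothetical and chosen only for illustration. It assumes half-open windows [start, end) where windows that merely touch are not merged, which is my reading of how IntervalWindow overlap is checked, so treat it as an approximation rather than Beam's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SessionMergeSketch {

    /**
     * Merges overlapping [start, end) windows for a single key.
     * Assumption: windows that only touch (end == next start) stay separate.
     */
    public static List<long[]> mergeOverlapping(List<long[]> windows) {
        List<long[]> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((long[] w) -> w[0]));
        List<long[]> merged = new ArrayList<>();
        for (long[] w : sorted) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w[0] < last[1]) {
                // Overlaps the previous window: extend its end if needed.
                last[1] = Math.max(last[1], w[1]);
            } else {
                // Starts a new session; copy so the input is not mutated.
                merged.add(new long[] {w[0], w[1]});
            }
        }
        return merged;
    }

    /** Gives each timestamp a window [t, t + gap), then merges overlaps. */
    public static List<long[]> sessionsFor(long[] timestampsMillis, long gapMillis) {
        List<long[]> proto = new ArrayList<>();
        for (long t : timestampsMillis) {
            proto.add(new long[] {t, t + gapMillis});
        }
        return mergeOverlapping(proto);
    }

    public static void main(String[] args) {
        // The example from the thread: three windows collapse into [4, 18].
        List<long[]> example = mergeOverlapping(Arrays.asList(
                new long[] {10, 14}, new long[] {12, 18}, new long[] {4, 14}));
        System.out.println(Arrays.toString(example.get(0)));

        long[] timestamps = {0L, 700L}; // hypothetical element times in millis

        // Gap of 600 seconds expressed in millis: both elements share a session.
        System.out.println(sessionsFor(timestamps, 600_000L).size());

        // Gap of 600 millis: the elements fall into two separate sessions.
        System.out.println(sessionsFor(timestamps, 600L).size());
    }
}
```

Computing the expected merged window this way before writing the assertion may make it easier to pick the window to pass to PAssert, since the window asserted on has to match the merged session rather than any single element's initial window.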
