Thanks.

My point is rather that with repeated triggers, there is no such thing as a
single early pane. With a repeated processing-time trigger (as in my example,
every 10 seconds), there are potentially many early panes. So I assume there
is currently no good way to assert the output of each of those panes?
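The number of early panes can be seen with a small simulation. AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...) does not fire on a fixed clock. It fires a fixed delay after the first element of each pane, so how many early panes you get depends on when elements arrive. Below is a simplified, Beam-free model in plain Java. The class and method names are hypothetical. The model ignores the watermark, allowed lateness and the final orFinally(AfterWatermark.pastEndOfWindow()) firing. It also assumes that an element arriving exactly when a timer fires belongs to the next pane.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of
// Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay)).
// Each pane's timer starts when its first element arrives and fires `delay` later.
// Watermark, lateness and the final orFinally(...) firing are NOT modelled.
class RepeatedDelayTriggerSketch {

    // Given element arrival times (processing time, seconds, ascending),
    // returns the processing times at which early panes would fire.
    static List<Long> firingTimes(List<Long> arrivals, long delaySeconds) {
        List<Long> fires = new ArrayList<>();
        long pendingFire = -1; // -1 means no timer is set (the pane is empty)
        for (long t : arrivals) {
            if (pendingFire >= 0 && t >= pendingFire) {
                // The timer went off before this element arrived: that pane is emitted.
                fires.add(pendingFire);
                pendingFire = -1;
            }
            if (pendingFire < 0) {
                // The first element of a new pane starts the timer.
                pendingFire = t + delaySeconds;
            }
        }
        if (pendingFire >= 0) {
            // The last pane's timer eventually fires too.
            fires.add(pendingFire);
        }
        return fires;
    }

    public static void main(String[] args) {
        List<Long> arrivals = Arrays.asList(0L, 3L, 12L, 25L, 27L);
        System.out.println(firingTimes(arrivals, 10)); // [10, 22, 35]
    }
}
```

With arrivals at 0, 3, 12, 25 and 27 seconds and a 10-second delay, this model yields three early panes, firing at 10, 22 and 35 seconds. Each of those panes would need its own assertion.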


On Fri, 24 Aug 2018 at 23:05, Lukasz Cwik <[email protected]> wrote:

> Actually, it seems someone has already beaten you to this contribution with
> https://github.com/apache/beam/pull/5811; it has yet to be reviewed, though.
>
> On Fri, Aug 24, 2018 at 1:59 PM Lukasz Cwik <[email protected]> wrote:
>
>> It seems we are missing "inEarlyPane" in PAssert.
>>
>> With the current API, I believe you are limited to checking that the
>> outputs in each window line up with your triggering logic. That is weaker
>> than asserting that these elements are early, these are on time, and these
>> are late. For example:
>> PAssert.that(values).inWindow(earlyWindow1)
>>     .containsInAnyOrder(earlyWindow1Element1, earlyWindow1Element2, ...);
>> PAssert.that(values).inWindow(earlyWindow2)
>>     .containsInAnyOrder(earlyWindow2Element1, earlyWindow2Element2, ...);
>> PAssert.that(values).inOnTimePane(onTimeWindow)
>>     .containsInAnyOrder(onTimeWindowElement1, onTimeWindowElement2, ...);
>>
>> isn't as good as:
>>
>> PAssert.that(values).inEarlyPane(earlyWindow1)
>>     .containsInAnyOrder(earlyWindow1Element1, earlyWindow1Element2, ...);
>> PAssert.that(values).inEarlyPane(earlyWindow2)
>>     .containsInAnyOrder(earlyWindow2Element1, earlyWindow2Element2, ...);
>> PAssert.that(values).inOnTimePane(onTimeWindow)
>>     .containsInAnyOrder(onTimeWindowElement1, onTimeWindowElement2, ...);
>> ...
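Here is a Beam-free model of one window's triggered output in accumulating mode. All names are hypothetical, and this is not how PAssert is implemented. It illustrates the gap described above. A window-level check roughly sees every pane's contents flattened together, so with accumulating panes the same element appears several times. A timing-aware check can examine each early or on-time pane separately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Beam-free model of the panes emitted for a single window.
class PaneTimingSketch {

    // Stand-in for the early / on-time / late distinction recorded per pane.
    enum Timing { EARLY, ON_TIME, LATE }

    // One trigger firing: its timing and the elements it emitted.
    static final class Pane {
        final Timing timing;
        final List<String> elements;

        Pane(Timing timing, List<String> elements) {
            this.timing = timing;
            this.elements = elements;
        }
    }

    // Roughly what a window-level assertion sees: all panes' contents, flattened.
    static List<String> flattened(List<Pane> panes) {
        List<String> out = new ArrayList<>();
        for (Pane p : panes) {
            out.addAll(p.elements);
        }
        return out;
    }

    // What a timing-aware assertion could check: each pane with the given timing, kept separate.
    static List<List<String>> panesWithTiming(List<Pane> panes, Timing timing) {
        List<List<String>> out = new ArrayList<>();
        for (Pane p : panes) {
            if (p.timing == timing) {
                out.add(p.elements);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Accumulating mode: each pane repeats everything seen so far in the window.
        List<Pane> panes = Arrays.asList(
                new Pane(Timing.EARLY, Arrays.asList("a")),
                new Pane(Timing.EARLY, Arrays.asList("a", "b")),
                new Pane(Timing.ON_TIME, Arrays.asList("a", "b", "c")));
        System.out.println(flattened(panes));                     // [a, a, b, a, b, c]
        System.out.println(panesWithTiming(panes, Timing.EARLY)); // [[a], [a, b]]
    }
}
```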
>>
>> Feel free to contribute inEarlyPane to PAssert. Our contribution guide,
>> https://beam.apache.org/contribute/, is a good place to start.
>>
>> On Fri, Aug 24, 2018 at 12:34 PM Bart Aelterman <[email protected]>
>> wrote:
>>
>>> Thank you Lukasz! I accepted your answer.
>>>
>>> As a follow-up: if I add a repeated trigger that fires every 10 seconds,
>>> like this:
>>>
>>> .apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardSeconds(600)))
>>>         .triggering(Repeatedly.forever(AfterProcessingTime
>>>                 .pastFirstElementInPane()
>>>                 .plusDelayOf(Duration.standardSeconds(10))
>>>         ).orFinally(AfterWatermark.pastEndOfWindow()))
>>>         .accumulatingFiredPanes()
>>>         .withAllowedLateness(Duration.standardSeconds(60)))
>>>
>>> How can I now test the output of a specific pane? I've only found
>>> examples where PAssert uses inOnTimePane or inFinalPane.
>>>
>>> Thanks in advance!
>>>
>>> Bart
>>>
>>> On Fri, 24 Aug 2018 at 19:42, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> I replied on Stack Overflow and am cross-posting my answer here:
>>>>
>>>> 1) I believe you have a simple unit mismatch. The session gap of 600 is
>>>> specified in seconds via Duration.standardSeconds[1], but new
>>>> Instant(long)[2] takes milliseconds. The 600-second gap is therefore
>>>> larger than the 700 ms time interval in your test, so the sessions get
>>>> merged.
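The unit mix-up can be reproduced with the JDK's java.time. The thread itself uses Joda-Time, whose new Instant(long) likewise takes milliseconds since the epoch. This is a minimal sketch, and its class and method names are hypothetical. Each element starts out in its own window [t, t + gap). Two such windows overlap, and so merge, exactly when the timestamps are less than one gap apart.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of the seconds-vs-milliseconds mix-up, using java.time.
class SessionGapUnitsSketch {

    static final Duration GAP = Duration.ofSeconds(600);

    // The single-element windows [a, a + GAP) and [b, b + GAP) overlap
    // exactly when the two timestamps are less than GAP apart.
    static boolean windowsOverlap(Instant a, Instant b) {
        return Duration.between(a, b).abs().compareTo(GAP) < 0;
    }

    public static void main(String[] args) {
        // Timestamps built from millisecond values: 700 ms apart, well inside a 600 s gap.
        System.out.println(windowsOverlap(Instant.ofEpochMilli(0), Instant.ofEpochMilli(700)));   // true
        // The same numbers read as seconds: 700 s apart, which exceeds the gap.
        System.out.println(windowsOverlap(Instant.ofEpochSecond(0), Instant.ofEpochSecond(700))); // false
    }
}
```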
>>>>
>>>> 2) Sessions still use interval windows internally. You will need to
>>>> compute what each output window will be once all sessions have been
>>>> merged, given your trigger strategy. By default, Sessions assigns each
>>>> element an IntervalWindow(timestamp, gap duration)[3] and merges all
>>>> overlapping windows[4] into a larger window. For example, given the
>>>> windows (start time, end time) [10, 14], [12, 18] and [4, 14] for the same
>>>> session key, they would all be merged into a single [4, 18] window.
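The merge step can be sketched in plain Java. This sketch assumes half-open [start, end) windows, matching Beam's IntervalWindow (start inclusive, end exclusive). It uses a generic sort-and-sweep interval merge, not Beam's own code, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical, Beam-free sketch of merging overlapping half-open windows.
class MergeWindowsSketch {

    static final class Window {
        final long start; // inclusive
        final long end;   // exclusive

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Half-open intervals overlap only if each starts before the other ends.
        boolean intersects(Window o) {
            return start < o.end && o.start < end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Sort by start, then sweep, widening the last merged window while windows overlap.
    static List<Window> merge(List<Window> windows) {
        List<Window> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((Window w) -> w.start));
        List<Window> merged = new ArrayList<>();
        for (Window w : sorted) {
            Window last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && last.intersects(w)) {
                merged.set(merged.size() - 1, new Window(last.start, Math.max(last.end, w.end)));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Window> windows = Arrays.asList(new Window(10, 14), new Window(12, 18), new Window(4, 14));
        System.out.println(merge(windows)); // [[4, 18)]
    }
}
```

Running it on the three windows from the example prints [[4, 18)]. Windows that merely touch, such as [0, 10) and [10, 20), stay separate under the half-open convention.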
>>>>
>>>>
>>>>   [1]:
>>>> http://joda-time.sourceforge.net/apidocs/org/joda/time/Duration.html#standardSeconds(long)
>>>>   [2]:
>>>> http://joda-time.sourceforge.net/apidocs/org/joda/time/Instant.html#Instant(long)
>>>>   [3]:
>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L59
>>>>   [4]:
>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L64
>>>>
>>>> On Fri, Aug 24, 2018 at 12:57 AM Bart Aelterman <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>> Hi there,
>>>>>
>>>>> I am trying to write a unit test for a pipeline that uses session
>>>>> windows. However, I am not getting the results I expected. I posted
>>>>> the question on Stack Overflow:
>>>>> https://stackoverflow.com/questions/51994579/how-to-write-unit-tests-for-session-windows-in-a-beam-pipeline
>>>>>
>>>>> Sorry for cross-posting.
>>>>>
>>>>> Thanks a lot,
>>>>>
>>>>> Bart Aelterman
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Kind regards,
>>>
>>> Bart Aelterman
>>> Freelance data scientist
>>> http://www.bart-aelterman.com
>>>
>>>

-- 
Kind regards,

Bart Aelterman
Freelance data scientist
http://www.bart-aelterman.com
