Ah, my mistake. The capability matrix is, in fact, set up to categorize all
of these precisely as fixed/sliding/sessions/custom. LGTM after all.

On Thu, Jul 27, 2017 at 8:19 AM, Etienne Chauchot <echauc...@gmail.com>
wrote:

> Thanks Robert and Kenn for your comments.
>
> "Custom window merging" may be an improper term.
>
> Let me add a bit of context to explain: while working on the Nexmark port (
> https://github.com/apache/beam/pull/3114), and in particular on Query9, a
> use case arose. Even if it could have been implemented differently (I guess
> the original idea was to test extending WindowFn), Query 9 used a UDF that
> extends WindowFn and assigns elements to windows based on element values.
> These windows could then be merged based on a field in addition to a
> timestamp.
>
> Because it relies on the values of elements, it is a different case from
> "standard" windows.
>
> Consider a case in which a user creates a UDF such as the one recently
> added to WindowTest (https://github.com/apache/beam/pull/3286). See the
> test example below:
>
> private static class CustomWindow extends IntervalWindow {
>   private boolean isBig; ...
> }
>
> private static class CustomWindowFn<T> extends WindowFn<T, CustomWindow> {
>   ...
>   @Override
>   public Collection<CustomWindow> assignWindows(AssignContext c) throws Exception {
>     ...
>     String element = c.element();
>     // put big elements in windows of 30s and small ones in windows of 5s
>     if ("big".equals(element)) {
>       return Collections.singletonList(
>           new CustomWindow(
>               c.timestamp(), c.timestamp().plus(Duration.standardSeconds(30)), true));
>     } else {
>       return Collections.singletonList(
>           new CustomWindow(
>               c.timestamp(), c.timestamp().plus(Duration.standardSeconds(5)), false));
>     }
>   }
>
>   @Override
>   public void mergeWindows(MergeContext c) throws Exception {
>     List<CustomWindow> toBeMerged = new ArrayList<>();
>     CustomWindow bigWindow = null;
>     for (CustomWindow customWindow : c.windows()) {
>       if (customWindow.isBig) {
>         bigWindow = customWindow;
>         toBeMerged.add(customWindow);
>       } else if (bigWindow != null
>           && customWindow.start().isAfter(bigWindow.start())
>           && customWindow.end().isBefore(bigWindow.end())) {
>         toBeMerged.add(customWindow);
>       }
>     }
>     // in case bigWindow has not been seen yet
>     if (bigWindow != null) {
>       // merge small windows into big windows
>       c.merge(toBeMerged, bigWindow);
>     }
>   }
> }
>
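
To make the merge rule in the quoted example easier to follow, here is a minimal standalone sketch of the same selection logic. It does not use Beam at all: `Win`, `assign`, and `selectForMerge` are hypothetical names introduced only for illustration, and plain `long` millisecond values stand in for the timestamps used by the real `CustomWindow`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone illustration only; Win is a hypothetical stand-in for CustomWindow.
class CustomMergeSketch {

  static final class Win {
    final long start; // window start, in milliseconds
    final long end;   // window end, in milliseconds
    final boolean isBig;

    Win(long start, long end, boolean isBig) {
      this.start = start;
      this.end = end;
      this.isBig = isBig;
    }

    @Override
    public String toString() {
      return (isBig ? "big" : "small") + "[" + start + "," + end + ")";
    }
  }

  // Mirrors assignWindows: "big" elements get a 30s window, all others 5s.
  static Win assign(String element, long timestampMillis) {
    boolean big = "big".equals(element);
    long length = big ? 30_000L : 5_000L;
    return new Win(timestampMillis, timestampMillis + length, big);
  }

  // Mirrors the selection loop in mergeWindows: returns the windows that
  // would be merged into the big window, or an empty list if no big window
  // was seen.
  static List<Win> selectForMerge(List<Win> windows) {
    List<Win> toBeMerged = new ArrayList<>();
    Win bigWindow = null;
    for (Win w : windows) {
      if (w.isBig) {
        bigWindow = w;
        toBeMerged.add(w);
      } else if (bigWindow != null && w.start > bigWindow.start && w.end < bigWindow.end) {
        toBeMerged.add(w);
      }
    }
    return bigWindow == null ? new ArrayList<>() : toBeMerged;
  }

  public static void main(String[] args) {
    List<Win> windows = Arrays.asList(
        assign("big", 0L), assign("small", 10_000L), assign("small", 40_000L));
    // prints [big[0,30000), small[10000,15000)]
    System.out.println(selectForMerge(windows));
  }
}
```

Note that, as written, the selection depends on iteration order: a small window visited before the big window is not added, even if it lies inside it.
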
> Indeed, as Robert said, runners might have partial support for this use
> case. For example, Spark currently relies on IntervalWindow to implement
> the merge (
> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java#L97
> ), so the UDF windowFn.merge will not be called. Flink, however, relies on
> the actual windowFn.merge (
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java#L140
> ).
>
> That point is discussed in https://issues.apache.org/jira/browse/BEAM-1772
> for Flink and in https://issues.apache.org/jira/browse/BEAM-2499 for Spark.
>
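
To illustrate why the two approaches can disagree, here is a small standalone sketch. It assumes a hypothetical runner that merges windows purely on interval overlap and never consults the user's merge function; it is not taken from the Spark or Flink code. `Win`, `mergeByOverlap`, and `customRuleMerges` are made-up names, and `long` millisecond values stand in for real timestamps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Standalone illustration only; none of these names exist in Beam.
class OverlapVsCustomMerge {

  static final class Win {
    final long start;
    final long end;
    final boolean isBig;

    Win(long start, long end, boolean isBig) {
      this.start = start;
      this.end = end;
      this.isBig = isBig;
    }
  }

  // Overlap-only merging: sort by start and fold every window that
  // intersects the previous merged interval into it. The isBig flag is ignored.
  static List<long[]> mergeByOverlap(List<Win> windows) {
    List<Win> sorted = new ArrayList<>(windows);
    sorted.sort(Comparator.comparingLong((Win w) -> w.start));
    List<long[]> merged = new ArrayList<>();
    for (Win w : sorted) {
      long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
      if (last != null && w.start < last[1]) {
        last[1] = Math.max(last[1], w.end);
      } else {
        merged.add(new long[] {w.start, w.end});
      }
    }
    return merged;
  }

  // The element-dependent rule from the quoted example: a small window is
  // merged only when it lies strictly inside a big window.
  static boolean customRuleMerges(Win big, Win small) {
    return big.isBig && !small.isBig && small.start > big.start && small.end < big.end;
  }

  public static void main(String[] args) {
    Win big = new Win(0L, 30_000L, true);
    Win small = new Win(28_000L, 33_000L, false); // overlaps big but is not contained in it

    List<long[]> merged = mergeByOverlap(Arrays.asList(big, small));
    // Overlap-only merging fuses both windows into a single [0, 33000) window.
    System.out.println(merged.size() + " window, end=" + merged.get(0)[1]);
    // The custom rule leaves the small window alone: prints false.
    System.out.println(customRuleMerges(big, small));
  }
}
```

Under these assumptions the same input yields one window with overlap-only merging and two windows with the custom rule, which is the kind of difference the new ValidatesRunner tests are meant to surface.
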
> Amit, Aljoscha, if you want to add details, feel free to comment.
>
> I don't know if it is an addition to the model but I agree, it might
> deserve some discussion.
>
> Best!
>
> Etienne
>
>
>
> On 27/07/2017 at 08:39, Robert Bradshaw wrote:
>
>> On Wed, Jul 26, 2017 at 9:43 PM, Kenneth Knowles <k...@google.com.invalid>
>> wrote:
>>
>>> This is a bit of an improvised change to the Beam model, if these are
>>> really treated *that* specially. (notably, they are a subset of the
>>> WindowFns that we ship with our SDKs, so it really is a careful
>>> selection)
>>>
>>> It does make sense to have some special WindowFns with distinguished
>>> semantics, since a lot of use cases are covered without the generality of
>>> the Beam model. Having a compact proto repr for these makes sense, but the
>>> particular choice of functions I had viewed as a hack while the Fn API was
>>> maturing to support the real model.
>>
>> Compactness of the proto representation isn't important, but it is good to
>> often have a language-independent representation (and definition). Many
>> common UserFns should likely have a language-independent urn and format,
>> especially once we start looking at multi-language pipelines.
>>
>>> If this is a new special category in the model (capability matrix?) there
>>> should be a lot more input and deliberation as to what they are. TBH I
>>> think we will probably choose just the WindowFns that you did, but the
>>> standardization deserves attention and documentation.
>>
>> These are already called out individually in the compatibility matrix,
>> which probably makes sense as it allows a runner to declare "partial"
>> support for windowing.
>>
>>> On Wed, Jul 26, 2017 at 9:34 PM, Robert Bradshaw <
>>> rober...@google.com.invalid> wrote:
>>>
>>>> I think there may be a distinction between hard-coding support for the
>>>> "standard" WindowFns (e.g.
>>>> https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/standard_window_fns.proto)
>>>> and accepting WindowFns as a UDF. Different runners have offered different
>>>> levels of support for this in the past (for example, the Fn API doesn't
>>>> support WindowFns other than these standard ones unless they're implemented
>>>> in Java--and even then it'd probably be better for these to be executed in
>>>> the SDK context).
>>>>
>>>> On Wed, Jul 26, 2017 at 9:16 PM, Kenneth Knowles <k...@google.com.invalid>
>>>> wrote:
>>>>
>>>>> Hi Etienne,
>>>>>
>>>>> Every WindowFn is a UDF, so there is really no such thing as "custom"
>>>>> window merging. Is this the same as saying that a runner supports only
>>>>> merging for Sessions? Or just supports WindowFn that merges based on
>>>>> overlap?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Mon, Jul 24, 2017 at 10:15 AM, Etienne Chauchot <echauc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> There are now 2 new ValidatesRunner tests:
>>>>>> WindowTest.testMergingCustomWindows and
>>>>>> WindowTest.testMergingCustomWindowsKeyedCollection. The aim of these
>>>>>> tests is to verify that the runners can handle custom windowFn
>>>>>> (extensions of windowFn that, for example, could rely on elements in
>>>>>> addition to timestamps).
>>>>>>
>>>>>> As new runners are coming, I wanted to let you know that there is also a
>>>>>> new category tag UsesCustomWindowMerging that you can use to skip these
>>>>>> tests while running ValidatesRunner tests on runners that do not support
>>>>>> custom window merging yet.
>>>>>>
>>>>>> Besides, there is also an ongoing related PR (
>>>>>> https://github.com/apache/beam/pull/3592) to enhance the test utils
>>>>>> methods of WindowFnTestUtils.
>>>>>>
>>>>>> Etienne
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>
