Ah, my mistake. The capability matrix is, in fact, set up to categorize all of these precisely as fixed/sliding/sessions/custom. LGTM after all.
On Thu, Jul 27, 2017 at 8:19 AM, Etienne Chauchot <echauc...@gmail.com> wrote:

> Thanks Robert and Kenn for your comments.
>
> "Custom window merging" may be an improper term.
>
> Let me add a bit of context to explain: while coding the Nexmark port
> (https://github.com/apache/beam/pull/3114), and in particular Query 9, a
> use case arose. Even if it could have been implemented differently (I guess
> the original idea was to test extending WindowFn), Query 9 used a UDF that
> extends WindowFn and assigns elements to windows based on element values.
> These windows could then be merged based on a field in addition to a
> timestamp.
>
> The fact that it relies on the values of elements makes it a different case
> from "standard" windows.
>
> Consider a case in which a user creates a UDF such as the one recently
> added to WindowTest (https://github.com/apache/beam/pull/3286); see a
> test example below:
>
>   private static class CustomWindow extends IntervalWindow {
>     private boolean isBig;
>     ...
>   }
>
>   private static class CustomWindowFn<T> extends WindowFn<T, CustomWindow> {
>     ...
>     @Override
>     public Collection<CustomWindow> assignWindows(AssignContext c) throws Exception {
>       ...
>       String element = c.element();
>       // put big elements in windows of 30s and small ones in windows of 5s
>       if ("big".equals(element)) {
>         return Collections.singletonList(
>             new CustomWindow(
>                 c.timestamp(), c.timestamp().plus(Duration.standardSeconds(30)), true));
>       } else {
>         return Collections.singletonList(
>             new CustomWindow(
>                 c.timestamp(), c.timestamp().plus(Duration.standardSeconds(5)), false));
>       }
>     }
>
>     @Override
>     public void mergeWindows(MergeContext c) throws Exception {
>       List<CustomWindow> toBeMerged = new ArrayList<>();
>       CustomWindow bigWindow = null;
>       for (CustomWindow customWindow : c.windows()) {
>         if (customWindow.isBig) {
>           bigWindow = customWindow;
>           toBeMerged.add(customWindow);
>         } else if (bigWindow != null
>             && customWindow.start().isAfter(bigWindow.start())
>             && customWindow.end().isBefore(bigWindow.end())) {
>           toBeMerged.add(customWindow);
>         }
>       }
>       // in case bigWindow has not been seen yet
>       if (bigWindow != null) {
>         // merge small windows into big windows
>         c.merge(toBeMerged, bigWindow);
>       }
>     }
>     ...
>   }
>
> Indeed, as Robert said, runners might have partial support for this use
> case. For example, Spark currently relies on IntervalWindow to implement
> the merge
> (https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java#L97),
> so the UDF windowFn.merge will not be called. Flink, however, relies on the
> actual windowFn.merge
> (https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java#L140).
>
> That point is discussed in https://issues.apache.org/jira/browse/BEAM-1772
> for Flink and in https://issues.apache.org/jira/browse/BEAM-2499 for Spark.
>
> Amit, Aljoscha, if you want to add details, feel free to comment.
>
> I don't know if it is an addition to the model but, I agree, it might
> deserve some discussion.
>
> Best!
> Etienne
>
> On 27/07/2017 at 08:39, Robert Bradshaw wrote:
>
>> On Wed, Jul 26, 2017 at 9:43 PM, Kenneth Knowles <k...@google.com.invalid>
>> wrote:
>>
>>> This is a bit of an improvised change to the Beam model, if these are
>>> really treated *that* specially. (Notably, they are a subset of the
>>> WindowFns that we ship with our SDKs, so it really is a careful
>>> selection.)
>>>
>>> It does make sense to have some special WindowFns with distinguished
>>> semantics, since a lot of use cases are covered without the generality of
>>> the Beam model. Having a compact proto repr for these makes sense, but the
>>> particular choice of functions I had viewed as a hack while the Fn API was
>>> maturing to support the real model.
>>
>> Compactness of the proto representation isn't important, but it is often
>> good to have a language-independent representation (and definition). Many
>> common UserFns should likely have a language-independent urn and format,
>> especially once we start looking at multi-language pipelines.
>>
>>> If this is a new special category in the model (capability matrix?), there
>>> should be a lot more input and deliberation as to what they are. TBH I
>>> think we will probably choose just the WindowFns that you did, but the
>>> standardization deserves attention and documentation.
>>
>> These are already called out individually in the capability matrix,
>> which probably makes sense as it allows a runner to declare "partial"
>> support for windowing.
>>
>>> On Wed, Jul 26, 2017 at 9:34 PM, Robert Bradshaw
>>> <rober...@google.com.invalid> wrote:
>>>
>>>> I think there may be a distinction between hard-coding support for the
>>>> "standard" WindowFns (e.g.
>>>> https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/standard_window_fns.proto)
>>>> and accepting WindowFns as a UDF. Different runners have offered
>>>> different levels of support for this in the past (for example, the Fn API
>>>> doesn't support WindowFns other than these standard ones unless they're
>>>> implemented in Java--and even then it'd probably be better for these to
>>>> be executed in the SDK context).
>>>>
>>>> On Wed, Jul 26, 2017 at 9:16 PM, Kenneth Knowles <k...@google.com.invalid>
>>>> wrote:
>>>>
>>>>> Hi Etienne,
>>>>>
>>>>> Every WindowFn is a UDF, so there is really no such thing as "custom"
>>>>> window merging. Is this the same as saying that a runner supports only
>>>>> merging for Sessions? Or just supports WindowFns that merge based on
>>>>> overlap?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Mon, Jul 24, 2017 at 10:15 AM, Etienne Chauchot <echauc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> There are now 2 new ValidatesRunner tests:
>>>>>> WindowTest.testMergingCustomWindows and
>>>>>> WindowTest.testMergingCustomWindowsKeyedCollection. The aim of these
>>>>>> tests is to verify that the runners can handle custom WindowFns
>>>>>> (extensions of WindowFn that, for example, could rely on elements in
>>>>>> addition to timestamps).
>>>>>>
>>>>>> As new runners are coming, I wanted to let you know that there is also
>>>>>> a new category tag, UsesCustomWindowMerging, that you can use to skip
>>>>>> these tests while running ValidatesRunner tests on runners that do not
>>>>>> support custom window merging yet.
>>>>>>
>>>>>> Besides, there is also an ongoing related PR
>>>>>> (https://github.com/apache/beam/pull/3592) to enhance the test utility
>>>>>> methods of WindowFnTestUtils.
>>>>>>
>>>>>> Etienne
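
For readers following the thread, here is a minimal sketch of the distinction being discussed: a merge that consults a value-derived flag (isBig) versus a merge that looks only at interval overlap, as a Sessions-style merge would. It is plain Java with no Beam dependency, and every name in it (CustomMergeSketch, Win, assign, mergeSet) is a hypothetical stand-in rather than a Beam API. It assumes at most one big window per merge call and, unlike the quoted mergeWindows, it locates the big window in a first pass so the result does not depend on iteration order.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch; all names are hypothetical and nothing here is a Beam API.
final class CustomMergeSketch {

  // Stand-in for the CustomWindow in the thread: a [start, end) interval in
  // milliseconds plus the value-derived isBig flag.
  static final class Win {
    final long start;
    final long end;
    final boolean isBig;

    Win(long start, long end, boolean isBig) {
      this.start = start;
      this.end = end;
      this.isBig = isBig;
    }
  }

  // Mirrors the quoted assignWindows: "big" elements get 30 s windows,
  // every other element gets a 5 s window starting at its timestamp.
  static Win assign(String element, long timestampMillis) {
    boolean big = "big".equals(element);
    long sizeMillis = big ? 30_000L : 5_000L;
    return new Win(timestampMillis, timestampMillis + sizeMillis, big);
  }

  // Returns the windows that would be merged into the big window (the big
  // window itself included), or an empty list when there is no big window.
  // containmentOnly = true  -> small windows strictly inside the big one,
  //                            the rule used in the quoted mergeWindows.
  // containmentOnly = false -> any small window overlapping the big one,
  //                            i.e. a purely interval-based rule.
  static List<Win> mergeSet(List<Win> windows, boolean containmentOnly) {
    Win big = null;
    for (Win w : windows) {
      if (w.isBig) {
        big = w; // assumption: at most one big window; the last one wins
      }
    }
    List<Win> result = new ArrayList<>();
    if (big == null) {
      return result;
    }
    result.add(big);
    for (Win w : windows) {
      if (w.isBig) {
        continue;
      }
      boolean contained = w.start > big.start && w.end < big.end;
      boolean overlaps = w.start < big.end && big.start < w.end;
      if (containmentOnly ? contained : overlaps) {
        result.add(w);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Win> windows = new ArrayList<>();
    windows.add(assign("small", 10_000L)); // [10 s, 15 s): inside the big window
    windows.add(assign("big", 0L));        // [0 s, 30 s)
    windows.add(assign("small", 28_000L)); // [28 s, 33 s): overlaps, not contained
    windows.add(assign("small", 40_000L)); // [40 s, 45 s): disjoint
    System.out.println(mergeSet(windows, true).size());  // prints 2
    System.out.println(mergeSet(windows, false).size()); // prints 3
  }
}
```

The two calls disagree on the [28 s, 33 s) window, which is the kind of difference that shows up when a runner substitutes its own interval-based merge for the user's WindowFn.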