[ https://issues.apache.org/jira/browse/BEAM-4643?focusedWorklogId=152882&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152882 ]
ASF GitHub Bot logged work on BEAM-4643:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/18 19:34
            Start Date: 09/Oct/18 19:34
    Worklog Time Spent: 10m
      Work Description: akedin commented on issue #5811: [BEAM-4643] Allow to check early panes of a window
URL: https://github.com/apache/beam/pull/5811#issuecomment-428321951

   looking

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

            Worklog Id:     (was: 152882)
            Time Spent: 1h 40m  (was: 1.5h)
    Remaining Estimate: 22h 20m  (was: 22.5h)

> Allow to check early panes of a window
> --------------------------------------
>
>                 Key: BEAM-4643
>                 URL: https://issues.apache.org/jira/browse/BEAM-4643
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, testing
>    Affects Versions: 2.5.0
>            Reporter: Logan HAUSPIE
>            Assignee: Logan HAUSPIE
>            Priority: Minor
>   Original Estimate: 24h
>          Time Spent: 1h 40m
>  Remaining Estimate: 22h 20m
>
> What I would like to do is:
> {{PAssert.that(teamScores)}}
> {{  .inEarlyPanes(intervalWindow(05, 20))}}
> {{    .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2)) // Window triggered 2 times earlier (black, 1) + (black, 1)}}
> {{  .inOnTimePane(intervalWindow(05, 20))}}
> {{    .containsInAnyOrder(KV.of("black", 2)) // Then triggered again by reaching the watermark (no additional data)}}
> {{  .inFinalPane(intervalWindow(05, 20))}}
> {{    .containsInAnyOrder(KV.of("black", 10)); // And then fired by receiving late data (black, 8)}}
> NB: intervalWindow(05, 20) returns an IntervalWindow from 5 minutes to 20 minutes
>
> The workaround I found is to filter the PCollection to keep only the EARLY
> elements with this method:
> {{public static <T> PCollection<T> filter(PCollection<T> values, PaneInfo.Timing timing) {}}
> {{  PCollection<T> filtered = values}}
> {{    .apply("Wrap into ValueInSingleWindow for filtering",}}
> {{      ParDo.of(}}
> {{        new DoFn<T, ValueInSingleWindow<T>>() {}}
> {{          @ProcessElement}}
> {{          public void processElement(ProcessContext c, BoundedWindow window) {}}
> {{            c.outputWithTimestamp(ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()), c.timestamp());}}
> {{          }}}
> {{        }}}
> {{      )}}
> {{    )}}
> {{    .setCoder(}}
> {{      ValueInSingleWindow.Coder.of(}}
> {{        values.getCoder(), values.getWindowingStrategy().getWindowFn().windowCoder()}}
> {{      )}}
> {{    )}}
> {{    .apply(Filter.by(a -> a.getPane().getTiming() == timing))}}
> {{    .apply("Unwrap from ValueInSingleWindow for filtering",}}
> {{      ParDo.of(}}
> {{        new DoFn<ValueInSingleWindow<T>, T>() {}}
> {{          @ProcessElement}}
> {{          public void processElement(ProcessContext c, BoundedWindow window) {}}
> {{            c.outputWithTimestamp(c.element().getValue(), c.timestamp());}}
> {{          }}}
> {{        }}}
> {{      ));}}
> {{  return filtered;}}
> {{}}}
>
> And then check all panes of the window:
> {{PAssert.that(filter(teamScores, PaneInfo.Timing.EARLY))}}
> {{  .inWindow(intervalWindow(05, 20))}}
> {{    .containsInAnyOrder(KV.of("black", 1), KV.of("black", 2));}}
> {{PAssert.that(teamScores)}}
> {{  .inOnTimePane(intervalWindow(05, 20))}}
> {{    .containsInAnyOrder(KV.of("black", 2))}}
> {{  .inFinalPane(intervalWindow(05, 20))}}
> {{    .containsInAnyOrder(KV.of("black", 10));}}
>
> But it's a bit overkill.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
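
For readers following the scenario in the issue, the expected pane contents can be modelled in plain Java, without any Beam dependency. This is only an illustrative sketch: `PaneTimingSketch`, `FiredPane`, `scenario` and `valuesWithTiming` are hypothetical names, not Beam APIs, and the enum merely mirrors the `PaneInfo.Timing` values mentioned in the issue. It assumes the accumulating firings described in the comments: two early panes, one on-time pane, and one late pane.

```java
import java.util.ArrayList;
import java.util.List;

public class PaneTimingSketch {
    // Mirrors the PaneInfo.Timing values referenced in the issue (assumption: no Beam on the classpath).
    enum Timing { EARLY, ON_TIME, LATE }

    // Hypothetical stand-in for one pane firing of the window: a key/value output plus its timing.
    static final class FiredPane {
        final String key;
        final int value;
        final Timing timing;

        FiredPane(String key, int value, Timing timing) {
            this.key = key;
            this.value = value;
            this.timing = timing;
        }
    }

    // Firings described in the issue: (black, 1) and (black, 2) early,
    // (black, 2) at the watermark, (black, 10) after the late element (black, 8).
    static List<FiredPane> scenario() {
        List<FiredPane> panes = new ArrayList<>();
        panes.add(new FiredPane("black", 1, Timing.EARLY));
        panes.add(new FiredPane("black", 2, Timing.EARLY));
        panes.add(new FiredPane("black", 2, Timing.ON_TIME));
        panes.add(new FiredPane("black", 10, Timing.LATE));
        return panes;
    }

    // Keeps only the outputs whose pane has the requested timing,
    // the same predicate the workaround's Filter.by step applies.
    static List<String> valuesWithTiming(List<FiredPane> panes, Timing timing) {
        List<String> kept = new ArrayList<>();
        for (FiredPane pane : panes) {
            if (pane.timing == timing) {
                kept.add(pane.key + "=" + pane.value);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<FiredPane> panes = scenario();
        for (Timing timing : Timing.values()) {
            System.out.println(timing + " -> " + valuesWithTiming(panes, timing));
        }
    }
}
```

The model only shows which values each assertion in the issue is expected to see; it says nothing about how a runner actually schedules the firings.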