[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=393658&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393658 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 26/Feb/20 17:58 Start Date: 26/Feb/20 17:58 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r384664775 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -938,4 +938,24 @@ private static void initializeUserState( @ProcessElement public void doNothing(ProcessContext context) {} } + + private static class Locker implements AutoCloseable { + +public static Locker locked(Lock lock) { + Locker locker = new Locker(lock); Review comment: I agree that these changes are not directly related to this PR, and we can revert them if necessary. If I recall correctly, they were part of debugging state handling related to [FLINK-12653](https://jira.apache.org/jira/browse/FLINK-12653); I mention this only to explain why they are there. Regarding performance, I don't think this change can have any measurable impact, because allocating a short-lived object in eden space is cheap in current JVMs. I agree that `try`-`finally` works just fine; the locker pattern is only a safeguard that keeps someone from forgetting to add the `finally` clause. But as I said, if these changes are undesirable, we can revert them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393658) Time Spent: 11.5h (was: 11h 20m) > @RequiresTimeSortedInput DoFn annotation > > > Key: BEAM-8550 > URL: https://issues.apache.org/jira/browse/BEAM-8550 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Jan Lukavský >Assignee: Jan Lukavský >Priority: Major > Fix For: 2.20.0 > > Time Spent: 11.5h > Remaining Estimate: 0h > > Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as > described in [design > document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing]. > First implementation will assume that: > - time is defined by timestamp in associated WindowedValue > - allowed lateness is explicitly zero and all late elements are dropped > (due to being out of order) > The above properties are considered temporary and will be resolved by > subsequent extensions (backwards compatible). -- This message was sent by Atlassian Jira (v8.3.4#803005)
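The locker pattern je-ik describes can be sketched roughly as follows. This is a hypothetical reconstruction: only the `Locker` name, `implements AutoCloseable`, and the `locked(Lock)` factory that calls `new Locker(lock)` appear in the diff excerpt; the constructor, the point where `lock()` is called, and `close()` are assumptions, as are the `LockerDemo` and `incrementUnderLock` names.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockerDemo {

  // Hypothetical reconstruction of the Locker helper: the factory acquires
  // the lock, and close() releases it.
  static final class Locker implements AutoCloseable {
    private final Lock lock;

    private Locker(Lock lock) {
      this.lock = lock;
    }

    static Locker locked(Lock lock) {
      Locker locker = new Locker(lock);
      lock.lock(); // acquire before handing the guard to the caller
      return locker;
    }

    @Override
    public void close() {
      lock.unlock(); // invoked automatically when the try-with-resources block exits
    }
  }

  static int counter = 0;

  static int incrementUnderLock(Lock lock) {
    // try-with-resources always calls close(), so the caller cannot
    // forget the equivalent of a finally { lock.unlock(); } clause.
    try (Locker ignored = Locker.locked(lock)) {
      return ++counter;
    }
  }

  public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();
    incrementUnderLock(lock);
    incrementUnderLock(lock);
    System.out.println(counter + " " + lock.isLocked()); // prints "2 false"
  }
}
```

The trade-off discussed in this thread is visible here: each call to `Locker.locked` allocates one small object, in exchange for making the release impossible to omit.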
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=393632&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393632 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 26/Feb/20 17:24 Start Date: 26/Feb/20 17:24 Worklog Time Spent: 10m Work Description: mxm commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r384643274 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -938,4 +938,24 @@ private static void initializeUserState( @ProcessElement public void doNothing(ProcessContext context) {} } + + private static class Locker implements AutoCloseable { + +public static Locker locked(Lock lock) { + Locker locker = new Locker(lock); Review comment: What does this change have to do with this PR? It creates a new locking object for every state request. Why is that necessary? The existing pattern was sufficient and fast:
```java
lock.lock();
try {
  // handle the state request
} finally {
  lock.unlock();
}
```
The change adds verbosity and touches a lot of code for no reason other than style. If anything, it introduces a regression. Such unrelated changes should not have been part of this PR. CC @tweise
Issue Time Tracking --- Worklog Id: (was: 393632) Time Spent: 11h 10m (was: 11h)
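One possible middle ground between the two positions is to keep the try-with-resources safeguard but create the guard once per lock and reuse it, so the state-request path allocates nothing. This is a hypothetical alternative, not what PR #8774 did; `ReusableLocker`, `ReusableLockerDemo`, and `handleStateRequest` are illustrative names.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ReusableLockerDemo {

  // Hypothetical guard created once per Lock and reused for every acquisition.
  // It holds no per-acquisition state, so sharing it across threads is safe.
  static final class ReusableLocker implements AutoCloseable {
    private final Lock lock;

    ReusableLocker(Lock lock) {
      this.lock = lock;
    }

    ReusableLocker lock() {
      lock.lock();
      return this; // same instance every time: no allocation per request
    }

    @Override
    public void close() {
      lock.unlock();
    }
  }

  private final ReentrantLock stateLock = new ReentrantLock();
  private final ReusableLocker locker = new ReusableLocker(stateLock);
  private long requests = 0;

  long handleStateRequest() {
    try (ReusableLocker ignored = locker.lock()) {
      return ++requests;
    }
  }

  boolean isLocked() {
    return stateLock.isLocked();
  }

  public static void main(String[] args) {
    ReusableLockerDemo demo = new ReusableLockerDemo();
    demo.handleStateRequest();
    long n = demo.handleStateRequest();
    System.out.println(n + " " + demo.isLocked()); // prints "2 false"
  }
}
```

Whether the per-request allocation in the original `Locker` is measurable at all is exactly what the reviewers disagree on; the JIT may well eliminate it via escape analysis, so this variant mainly removes the question rather than proving a speedup.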
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=393633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393633 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 26/Feb/20 17:24 Start Date: 26/Feb/20 17:24 Worklog Time Spent: 10m Work Description: mxm commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r384645838 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -938,4 +938,24 @@ private static void initializeUserState( @ProcessElement public void doNothing(ProcessContext context) {} } + + private static class Locker implements AutoCloseable { + +public static Locker locked(Lock lock) { + Locker locker = new Locker(lock); Review comment: Besides, there are a number of other unrelated changes. Please factor those out into individual commits or even separate PRs. Issue Time Tracking --- Worklog Id: (was: 393633) Time Spent: 11h 20m (was: 11h 10m)
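The issue description states that the first implementation takes time from the element's `WindowedValue` timestamp and uses zero allowed lateness, so out-of-order elements behind the watermark are dropped. A minimal plain-Java sketch of those semantics for a single key and window follows. It is not Beam code: `TimeSortedInputSketch`, the `long` timestamps, and the choice to release elements strictly behind the new watermark are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class TimeSortedInputSketch {

  // Stand-in for a timestamped WindowedValue (assumption for this sketch).
  static final class Element {
    final long timestamp;
    final String value;

    Element(long timestamp, String value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  private final List<Element> buffer = new ArrayList<>();
  final List<String> emitted = new ArrayList<>();
  final List<String> dropped = new ArrayList<>();
  private long inputWatermark = Long.MIN_VALUE;

  void processElement(long timestamp, String value) {
    // Zero allowed lateness: an element behind the watermark is dropped.
    if (timestamp < inputWatermark) {
      dropped.add(value);
    } else {
      buffer.add(new Element(timestamp, value));
    }
  }

  void advanceWatermark(long newWatermark) {
    inputWatermark = Math.max(inputWatermark, newWatermark);
    // No accepted element can be older than the watermark from now on, so
    // buffered elements strictly behind it can be released in timestamp order.
    buffer.sort(Comparator.comparingLong((Element e) -> e.timestamp));
    int released = 0;
    while (released < buffer.size() && buffer.get(released).timestamp < inputWatermark) {
      emitted.add(buffer.get(released).value);
      released++;
    }
    buffer.subList(0, released).clear();
  }

  public static void main(String[] args) {
    TimeSortedInputSketch fn = new TimeSortedInputSketch();
    fn.processElement(30, "c");
    fn.processElement(10, "a");
    fn.processElement(20, "b");
    fn.advanceWatermark(25);          // releases a, b
    fn.processElement(15, "late");    // behind watermark 25: dropped
    fn.processElement(40, "d");
    fn.advanceWatermark(Long.MAX_VALUE); // releases c, d
    System.out.println(fn.emitted + " " + fn.dropped); // prints "[a, b, c, d] [late]"
  }
}
```

Sorting the whole buffer on every watermark advance mirrors what a bag-backed buffer has to do; it is simple but reads every pending element each time.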
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=382839&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-382839 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 06/Feb/20 11:19 Start Date: 06/Feb/20 11:19 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r375777640 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ## @@ -132,16 +208,56 @@ public void onTimer( + "since window is too far behind inputWatermark:{}", timestamp, window, -cleanupTimer.currentInputWatermarkTime()); +stepContext.timerInternals().currentInputWatermarkTime()); } else { doFnRunner.onTimer(timerId, window, timestamp, timeDomain); } } } - @Override - public void finishBundle() { -doFnRunner.finishBundle(); + // this needs to be optimized (Sorted Map State) Review comment: Created https://issues.apache.org/jira/browse/BEAM-9256 Issue Time Tracking --- Worklog Id: (was: 382839) Time Spent: 11h (was: 10h 50m)
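The `// this needs to be optimized (Sorted Map State)` comment points at the cost of a bag-based sort buffer: every flush must read and sort the whole bag. The sketch below uses `java.util.TreeMap` as an in-memory stand-in for a sorted map state, so a flush only touches the range behind the watermark. It is illustrative only; `SortedBufferSketch` and its methods are hypothetical and do not reflect any state API defined under BEAM-9256.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class SortedBufferSketch {

  // Timestamp -> values carrying that timestamp. TreeMap keeps keys ordered,
  // standing in for a hypothetical sorted map state.
  private final TreeMap<Long, List<String>> buffer = new TreeMap<>();

  void add(long timestamp, String value) {
    buffer.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(value);
  }

  // Removes and returns, in timestamp order, all values strictly before the watermark.
  List<String> flushBefore(long watermark) {
    NavigableMap<Long, List<String>> ready = buffer.headMap(watermark, false);
    List<String> out = new ArrayList<>();
    for (Map.Entry<Long, List<String>> entry : ready.entrySet()) {
      out.addAll(entry.getValue());
    }
    ready.clear(); // headMap is a view backed by the map, so this removes the flushed range
    return out;
  }

  int pendingTimestamps() {
    return buffer.size();
  }

  public static void main(String[] args) {
    SortedBufferSketch b = new SortedBufferSketch();
    b.add(30, "c");
    b.add(10, "a");
    b.add(20, "b");
    System.out.println(b.flushBefore(25) + " pending=" + b.pendingTimestamps()); // prints "[a, b] pending=1"
  }
}
```

Elements that are already in order need no sort at flush time, and elements ahead of the watermark are never read; that is the kind of saving a sorted state primitive could offer over a plain bag.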
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=382757&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-382757 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 06/Feb/20 07:28 Start Date: 06/Feb/20 07:28 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774 Issue Time Tracking --- Worklog Id: (was: 382757) Time Spent: 10h 50m (was: 10h 40m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=380230&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380230 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 31/Jan/20 22:27 Start Date: 31/Jan/20 22:27 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-580938807 Run Java_Examples_Dataflow PreCommit Issue Time Tracking --- Worklog Id: (was: 380230) Time Spent: 10h 40m (was: 10.5h)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379398&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379398 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 30/Jan/20 14:37 Start Date: 30/Jan/20 14:37 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-580282308 Run Java_Examples_Dataflow PreCommit Issue Time Tracking --- Worklog Id: (was: 379398) Time Spent: 10.5h (was: 10h 20m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379384&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379384 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 30/Jan/20 13:56 Start Date: 30/Jan/20 13:56 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-580264736 Run Java Flink PortableValidatesRunner Streaming Issue Time Tracking --- Worklog Id: (was: 379384) Time Spent: 10h 10m (was: 10h)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379383&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379383 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 30/Jan/20 13:56 Start Date: 30/Jan/20 13:56 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-580264576 Run Flink ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 379383) Time Spent: 10h (was: 9h 50m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379382&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379382 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 30/Jan/20 13:56 Start Date: 30/Jan/20 13:56 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-580264521 Run Spark ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 379382) Time Spent: 9h 50m (was: 9h 40m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379385&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379385 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 30/Jan/20 13:56 Start Date: 30/Jan/20 13:56 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-580264776 Run Java Flink PortableValidatesRunner Batch Issue Time Tracking --- Worklog Id: (was: 379385) Time Spent: 10h 20m (was: 10h 10m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379381&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379381 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 30/Jan/20 13:56 Start Date: 30/Jan/20 13:56 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-580264445 Run Direct ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 379381) Time Spent: 9h 40m (was: 9.5h)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379380&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379380 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 30/Jan/20 13:55 Start Date: 30/Jan/20 13:55 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-580264360 Run Java_Examples_Dataflow PreCommit Issue Time Tracking --- Worklog Id: (was: 379380) Time Spent: 9.5h (was: 9h 20m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=378199&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378199 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 28/Jan/20 12:51 Start Date: 28/Jan/20 12:51 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-579230688 retest this please Issue Time Tracking --- Worklog Id: (was: 378199) Time Spent: 9h 20m (was: 9h 10m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=378046&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378046 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 28/Jan/20 05:40 Start Date: 28/Jan/20 05:40 Worklog Time Spent: 10m Work Description: JozoVilcek commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-579089156 lgtm Issue Time Tracking --- Worklog Id: (was: 378046) Time Spent: 9h 10m (was: 9h)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=372241&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-372241 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 15/Jan/20 10:24 Start Date: 15/Jan/20 10:24 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-574595417 Retest this please Issue Time Tracking --- Worklog Id: (was: 372241) Time Spent: 9h (was: 8h 50m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=371587&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371587 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 14/Jan/20 13:05 Start Date: 14/Jan/20 13:05 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r366325823 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ## @@ -85,45 +123,83 @@ public void startBundle() { doFnRunner.startBundle(); } + @Override + public void finishBundle() { +doFnRunner.finishBundle(); + } + @Override public void processElement(WindowedValue input) { // StatefulDoFnRunner always observes windows, so we need to explode for (WindowedValue value : input.explodeWindows()) { - BoundedWindow window = value.getWindows().iterator().next(); - if (isLate(window)) { // The element is too late for this window. 
-droppedDueToLateness.inc(); -WindowTracing.debug( -"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " -+ "since too far behind inputWatermark:{}", -input.getTimestamp(), -window, -cleanupTimer.currentInputWatermarkTime()); +reportDroppedElement(value, window); + } else if (requiresTimeSortedInput) { +processElementOrdered(window, value); } else { -cleanupTimer.setForWindow(value.getValue(), window); -doFnRunner.processElement(value); +processElementUnordered(window, value); + } +} + } + + private void processElementUnordered(BoundedWindow window, WindowedValue value) { +cleanupTimer.setForWindow(value.getValue(), window); +doFnRunner.processElement(value); + } + + private void processElementOrdered(BoundedWindow window, WindowedValue value) { + +StateInternals stateInternals = stepContext.stateInternals(); +TimerInternals timerInternals = stepContext.timerInternals(); + +if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) { + StateNamespace namespace = StateNamespaces.window(windowCoder, window); + BagState> sortBuffer = stateInternals.state(namespace, sortBufferTag); + ValueState minStampState = stateInternals.state(namespace, sortBufferMinStampTag); + sortBuffer.add(value); + Instant minStamp = + MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE); + if (value.getTimestamp().isBefore(minStamp)) { +minStamp = value.getTimestamp(); +minStampState.write(minStamp); +setupFlushTimerAndWatermarkHold(namespace, minStamp); } +} else { + reportDroppedElement(value, window); Review comment: Added a mention of the change in late-data dropping behavior to the javadoc.
Issue Time Tracking --- Worklog Id: (was: 371587) Time Spent: 8h 50m (was: 8h 40m)
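The ordered path in `processElementOrdered` buffers each element in a per-window bag, remembers the smallest buffered timestamp, and uses it to set a flush timer and watermark hold. The following is a minimal, Beam-free sketch of that idea, assuming timestamps are plain `long` millis, a single key and window, and that a flush emits every buffered element at or before the current input watermark in timestamp order; `SortBufferSketch` and its methods are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Beam-free model of the sort buffer in processElementOrdered.
 * Timestamps are epoch millis; a single key and window are assumed.
 */
class SortBufferSketch {

  /** A buffered value with its event timestamp. */
  static final class Stamped {
    final String value;
    final long timestamp;

    Stamped(String value, long timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  private final List<Stamped> sortBuffer = new ArrayList<>();
  // Long.MAX_VALUE stands in for BoundedWindow.TIMESTAMP_MAX_VALUE ("nothing buffered").
  private long minStamp = Long.MAX_VALUE;
  private long inputWatermark = Long.MIN_VALUE;
  private long droppedCount = 0;

  /** Buffers the element, or drops it if the watermark is already after its timestamp. */
  boolean add(String value, long timestamp) {
    if (inputWatermark > timestamp) {
      droppedCount++; // corresponds to reportDroppedElement(...)
      return false;
    }
    sortBuffer.add(new Stamped(value, timestamp));
    if (timestamp < minStamp) {
      // The runner would (re)set the flush timer and watermark hold here.
      minStamp = timestamp;
    }
    return true;
  }

  /** Advances the watermark and emits, in timestamp order, everything at or before it. */
  List<Stamped> advanceWatermark(long newWatermark) {
    inputWatermark = Math.max(inputWatermark, newWatermark);
    List<Stamped> ready = new ArrayList<>();
    List<Stamped> remaining = new ArrayList<>();
    for (Stamped s : sortBuffer) {
      if (s.timestamp <= inputWatermark) {
        ready.add(s);
      } else {
        remaining.add(s);
      }
    }
    ready.sort((a, b) -> Long.compare(a.timestamp, b.timestamp));
    sortBuffer.clear();
    sortBuffer.addAll(remaining);
    minStamp = Long.MAX_VALUE;
    for (Stamped s : remaining) {
      minStamp = Math.min(minStamp, s.timestamp);
    }
    return ready;
  }

  long minStamp() {
    return minStamp;
  }

  long droppedCount() {
    return droppedCount;
  }
}
```

In the runner the flush is driven by the timer set at `minStamp` rather than by an explicit `advanceWatermark` call; the sketch folds both into one method for brevity.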
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=371502&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371502 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 14/Jan/20 10:17 Start Date: 14/Jan/20 10:17 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r366254396 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ## @@ -85,45 +123,83 @@ public void startBundle() { doFnRunner.startBundle(); } + @Override + public void finishBundle() { +doFnRunner.finishBundle(); + } + @Override public void processElement(WindowedValue input) { // StatefulDoFnRunner always observes windows, so we need to explode for (WindowedValue value : input.explodeWindows()) { - BoundedWindow window = value.getWindows().iterator().next(); - if (isLate(window)) { // The element is too late for this window. 
-droppedDueToLateness.inc(); -WindowTracing.debug( -"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " -+ "since too far behind inputWatermark:{}", -input.getTimestamp(), -window, -cleanupTimer.currentInputWatermarkTime()); +reportDroppedElement(value, window); + } else if (requiresTimeSortedInput) { +processElementOrdered(window, value); } else { -cleanupTimer.setForWindow(value.getValue(), window); -doFnRunner.processElement(value); +processElementUnordered(window, value); + } +} + } + + private void processElementUnordered(BoundedWindow window, WindowedValue value) { +cleanupTimer.setForWindow(value.getValue(), window); +doFnRunner.processElement(value); + } + + private void processElementOrdered(BoundedWindow window, WindowedValue value) { + +StateInternals stateInternals = stepContext.stateInternals(); +TimerInternals timerInternals = stepContext.timerInternals(); + +if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) { + StateNamespace namespace = StateNamespaces.window(windowCoder, window); + BagState> sortBuffer = stateInternals.state(namespace, sortBufferTag); + ValueState minStampState = stateInternals.state(namespace, sortBufferMinStampTag); + sortBuffer.add(value); + Instant minStamp = + MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE); + if (value.getTimestamp().isBefore(minStamp)) { +minStamp = value.getTimestamp(); +minStampState.write(minStamp); +setupFlushTimerAndWatermarkHold(namespace, minStamp); } +} else { + reportDroppedElement(value, window); Review comment: The conversation there confirms that late data is intentionally dropped differently than I expected. I will describe this in the javadoc.
Issue Time Tracking --- Worklog Id: (was: 371502) Time Spent: 8h 40m (was: 8.5h)
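The two late-data checks in this diff are easy to conflate. `isLate(window)` drops an element only when its window's garbage-collection time is already behind the input watermark, whereas `processElementOrdered` additionally drops any element whose own timestamp is behind the watermark. A minimal sketch of both decisions, assuming plain `long` millis and hypothetical method names, shows the case where the ordered path drops data that the unordered path would still process:

```java
/**
 * Hypothetical comparison of the two drop decisions in the diff.
 * Times are epoch millis; names are illustrative, not Beam API.
 */
class DropDecisionSketch {

  /** Unordered path: drop only when the window's GC time is behind the watermark (isLate). */
  static boolean droppedUnordered(long gcTime, long inputWatermark) {
    return gcTime < inputWatermark;
  }

  /**
   * Ordered path: the isLate check still applies first, and processElementOrdered then
   * also drops an element whose own timestamp is behind the watermark.
   */
  static boolean droppedOrdered(long gcTime, long inputWatermark, long elementTimestamp) {
    return droppedUnordered(gcTime, inputWatermark) || inputWatermark > elementTimestamp;
  }
}
```

For example, with a window garbage-collection time of 1000, an input watermark of 500 and an element stamped 100, the unordered path processes the element while the ordered path drops it.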
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365881&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365881 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 15:36 Start Date: 03/Jan/20 15:36 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-570606989 Run Flink ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 365881) Time Spent: 8.5h (was: 8h 20m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365875&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365875 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 15:26 Start Date: 03/Jan/20 15:26 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r362848568 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ## @@ -85,45 +123,83 @@ public void startBundle() { doFnRunner.startBundle(); } + @Override + public void finishBundle() { +doFnRunner.finishBundle(); + } + @Override public void processElement(WindowedValue input) { // StatefulDoFnRunner always observes windows, so we need to explode for (WindowedValue value : input.explodeWindows()) { - BoundedWindow window = value.getWindows().iterator().next(); - if (isLate(window)) { // The element is too late for this window. 
-droppedDueToLateness.inc(); -WindowTracing.debug( -"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " -+ "since too far behind inputWatermark:{}", -input.getTimestamp(), -window, -cleanupTimer.currentInputWatermarkTime()); +reportDroppedElement(value, window); + } else if (requiresTimeSortedInput) { +processElementOrdered(window, value); } else { -cleanupTimer.setForWindow(value.getValue(), window); -doFnRunner.processElement(value); +processElementUnordered(window, value); + } +} + } + + private void processElementUnordered(BoundedWindow window, WindowedValue value) { +cleanupTimer.setForWindow(value.getValue(), window); +doFnRunner.processElement(value); + } + + private void processElementOrdered(BoundedWindow window, WindowedValue value) { + +StateInternals stateInternals = stepContext.stateInternals(); +TimerInternals timerInternals = stepContext.timerInternals(); + +if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) { + StateNamespace namespace = StateNamespaces.window(windowCoder, window); + BagState> sortBuffer = stateInternals.state(namespace, sortBufferTag); + ValueState minStampState = stateInternals.state(namespace, sortBufferMinStampTag); + sortBuffer.add(value); + Instant minStamp = + MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE); + if (value.getTimestamp().isBefore(minStamp)) { +minStamp = value.getTimestamp(); +minStampState.write(minStamp); +setupFlushTimerAndWatermarkHold(namespace, minStamp); } +} else { + reportDroppedElement(value, window); } } private boolean isLate(BoundedWindow window) { Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); -Instant inputWM = cleanupTimer.currentInputWatermarkTime(); +Instant inputWM = stepContext.timerInternals().currentInputWatermarkTime(); return gcTime.isBefore(inputWM); } + private void reportDroppedElement(WindowedValue value, BoundedWindow window) { +droppedDueToLateness.inc(); 
+WindowTracing.debug( +"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " ++ "since too far behind inputWatermark:{}", Review comment: Fixed.
Issue Time Tracking --- Worklog Id: (was: 365875) Time Spent: 8h 20m (was: 8h 10m)
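`isLate` compares the window's garbage-collection time, obtained from `LateDataUtils.garbageCollectionTime(window, windowingStrategy)`, with the input watermark. Below is a sketch of that computation, assuming it is the window's maximum timestamp plus the allowed lateness, saturated at the end of the global window so the addition cannot run past the end of time. `END_OF_GLOBAL_WINDOW` is a hypothetical placeholder, not Beam's actual constant.

```java
/** Hypothetical model of garbage-collection time and the isLate check, in epoch millis. */
class GcTimeSketch {
  // Placeholder for the end of the global window; the real value lives in Beam.
  static final long END_OF_GLOBAL_WINDOW = Long.MAX_VALUE / 2;

  /** Window max timestamp plus allowed lateness, saturated at END_OF_GLOBAL_WINDOW. */
  static long garbageCollectionTime(long windowMaxTimestamp, long allowedLatenessMillis) {
    // Compare before adding so the sum cannot overflow past the end of the global window.
    if (windowMaxTimestamp > END_OF_GLOBAL_WINDOW - allowedLatenessMillis) {
      return END_OF_GLOBAL_WINDOW;
    }
    return windowMaxTimestamp + allowedLatenessMillis;
  }

  /** An element is late for its window once the GC time is strictly before the watermark. */
  static boolean isLate(long windowMaxTimestamp, long allowedLatenessMillis, long inputWatermark) {
    return garbageCollectionTime(windowMaxTimestamp, allowedLatenessMillis) < inputWatermark;
  }
}
```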
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365874&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365874 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 15:25 Start Date: 03/Jan/20 15:25 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r362848269 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ## @@ -85,45 +123,83 @@ public void startBundle() { doFnRunner.startBundle(); } + @Override + public void finishBundle() { +doFnRunner.finishBundle(); + } + @Override public void processElement(WindowedValue input) { // StatefulDoFnRunner always observes windows, so we need to explode for (WindowedValue value : input.explodeWindows()) { - BoundedWindow window = value.getWindows().iterator().next(); - if (isLate(window)) { // The element is too late for this window. 
-droppedDueToLateness.inc(); -WindowTracing.debug( -"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " -+ "since too far behind inputWatermark:{}", -input.getTimestamp(), -window, -cleanupTimer.currentInputWatermarkTime()); +reportDroppedElement(value, window); + } else if (requiresTimeSortedInput) { +processElementOrdered(window, value); } else { -cleanupTimer.setForWindow(value.getValue(), window); -doFnRunner.processElement(value); +processElementUnordered(window, value); + } +} + } + + private void processElementUnordered(BoundedWindow window, WindowedValue value) { +cleanupTimer.setForWindow(value.getValue(), window); +doFnRunner.processElement(value); + } + + private void processElementOrdered(BoundedWindow window, WindowedValue value) { + +StateInternals stateInternals = stepContext.stateInternals(); +TimerInternals timerInternals = stepContext.timerInternals(); + +if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) { + StateNamespace namespace = StateNamespaces.window(windowCoder, window); + BagState> sortBuffer = stateInternals.state(namespace, sortBufferTag); + ValueState minStampState = stateInternals.state(namespace, sortBufferMinStampTag); + sortBuffer.add(value); + Instant minStamp = + MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE); + if (value.getTimestamp().isBefore(minStamp)) { +minStamp = value.getTimestamp(); +minStampState.write(minStamp); +setupFlushTimerAndWatermarkHold(namespace, minStamp); } +} else { + reportDroppedElement(value, window); Review comment: There will be no difference in dropped data between using and not using this annotation. Mailing list thread for reference: https://lists.apache.org/thread.html/c37dfb6c545fba7d794a13c507dccebb654bbd8b317dab748a6775dc%40%3Cdev.beam.apache.org%3E
Issue Time Tracking --- Worklog Id: (was: 365874) Time Spent: 8h 10m (was: 8h)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365873&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365873 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 15:23 Start Date: 03/Jan/20 15:23 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r362847453 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ## @@ -85,45 +123,83 @@ public void startBundle() { doFnRunner.startBundle(); } + @Override + public void finishBundle() { +doFnRunner.finishBundle(); + } + @Override public void processElement(WindowedValue input) { // StatefulDoFnRunner always observes windows, so we need to explode for (WindowedValue value : input.explodeWindows()) { - BoundedWindow window = value.getWindows().iterator().next(); - if (isLate(window)) { // The element is too late for this window. -droppedDueToLateness.inc(); -WindowTracing.debug( -"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " -+ "since too far behind inputWatermark:{}", -input.getTimestamp(), -window, -cleanupTimer.currentInputWatermarkTime()); +reportDroppedElement(value, window); + } else if (requiresTimeSortedInput) { +processElementOrdered(window, value); Review comment: Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 365873) Time Spent: 8h (was: 7h 50m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365822&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365822 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 12:02 Start Date: 03/Jan/20 12:02 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-570554091 @JozoVilcek I have resolved some of the issues mentioned. I have come across an issue in DirectRunner (it does not drop late data); once it is resolved, it might enable adding more tests verifying that the annotation does not cause more data to be dropped. Issue Time Tracking --- Worklog Id: (was: 365822) Time Spent: 7h 50m (was: 7h 40m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365794&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365794 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 10:34 Start Date: 03/Jan/20 10:34 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r362764966 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -2363,6 +2367,136 @@ public int hashCode() { return getClass().hashCode(); } } + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class +}) +public void testRequiresTimeSortedInput() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps))); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class +}) +public void testRequiresTimeSortedInputWithTestStream() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder stream = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +stream = stream.addElements(stamp); + } + testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity())); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class +}) 
+public void testRequiresTimeSortedInputWithLateData() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder input = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp))); +if (stamp == 100) { + // advance watermark when we have 100 remaining elements + // all the rest are going to be late elements + input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); +} + } + testTimeSortedInput( Review comment: That is not possible. Each runner might drop different elements, because that depends on how the watermark is propagated from the `TestStream` to the downstream operators.
Issue Time Tracking --- Worklog Id: (was: 365794) Time Spent: 7h 40m (was: 7.5h)
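The late-data test feeds the descending timestamps 1000, 999, ..., 1 and advances the `TestStream` watermark right after the element stamped 100. Because each runner propagates that watermark downstream at its own pace, the exact set of dropped elements is runner-dependent. The sketch below regenerates the same stamps and computes which ones would be behind the watermark in an idealized model where the watermark reaches the stateful `ParDo` before any later element. The helper names are hypothetical and the model is an assumption, not the behavior of any particular runner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/** Hypothetical helpers modelling the late-data test input. */
class LateDataScenarioSketch {

  /** Same descending stamps as the test: numElements, numElements - 1, ..., 1. */
  static List<Long> eventStamps(int numElements) {
    return LongStream.range(0, numElements)
        .mapToObj(i -> numElements - i)
        .collect(Collectors.toList());
  }

  /**
   * Idealized model (an assumption): the watermark jumps to watermarkAt right after the
   * element with that stamp and reaches the stateful ParDo before any later element.
   * Returns the stamps that would then be behind the watermark.
   */
  static List<Long> idealizedLateStamps(List<Long> stamps, long watermarkAt) {
    long watermark = Long.MIN_VALUE;
    List<Long> late = new ArrayList<>();
    for (long stamp : stamps) {
      if (stamp < watermark) {
        late.add(stamp);
      }
      if (stamp == watermarkAt) {
        watermark = stamp;
      }
    }
    return late;
  }
}
```

Under this model the stamps 99 down to 1 (99 elements) end up behind the watermark; a runner whose watermark arrives later would see fewer of them as late.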
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365793&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365793 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 10:33 Start Date: 03/Jan/20 10:33 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r362764697 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ## @@ -656,6 +656,24 @@ public Duration getAllowedTimestampSkew() { @Target(ElementType.METHOD) public @interface RequiresStableInput {} + /** + * Experimental - no backwards compatibility guarantees. The exact name or usage of this + * feature may change. + * + * Annotation that may be added to a {@link ProcessElement} method to indicate that the runner + * must ensure that the observable contents of the input {@link PCollection} is sorted by time, in + * ascending order. The time ordering is generally defined by element's timestamp, but an + * alternative user supplied ordering function can be supplied. + * + * Note that this annotation makes sense only for stateful {@code ParDo}s, because outcome of + * stateless functions cannot depend on the ordering. + */ + @Documented + @Experimental + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface RequiresTimeSortedInput {} Review comment: Late data should be handled the same way whether or not this annotation is used. I will create tests to make sure that is the case.
Issue Time Tracking --- Worklog Id: (was: 365793) Time Spent: 7.5h (was: 7h 20m)
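The annotation is declared with `RetentionPolicy.RUNTIME` and `ElementType.METHOD`, which is what lets a runner discover it on the element-processing method at run time. A self-contained sketch with a local stand-in annotation (`RequiresTimeSortedInputSketch`, hypothetical) and plain reflection illustrates that; it does not reproduce how Beam itself analyzes a `DoFn`.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** Local stand-in mirroring the declaration in the diff; not Beam's own type. */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresTimeSortedInputSketch {}

/** Hypothetical user function whose processing method asks for sorted input. */
class SortedFn {
  @RequiresTimeSortedInputSketch
  public void processElement(String element) {}
}

/** Hypothetical user function without the annotation. */
class UnsortedFn {
  public void processElement(String element) {}
}

class AnnotationDetection {
  /** True if any public method of fnClass carries the marker (visible thanks to RUNTIME retention). */
  static boolean requiresTimeSortedInput(Class<?> fnClass) {
    for (Method m : fnClass.getMethods()) {
      if (m.isAnnotationPresent(RequiresTimeSortedInputSketch.class)) {
        return true;
      }
    }
    return false;
  }
}
```

In a real pipeline the user would instead put `@RequiresTimeSortedInput` on the `@ProcessElement` method of a stateful `DoFn`, as the javadoc in the diff describes.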
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365791&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365791 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 10:32 Start Date: 03/Jan/20 10:32 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r362764455 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ## @@ -448,18 +458,152 @@ public String toNativeString() { JavaRDD>>> groupRDD = GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner); -return groupRDD -.map( -input -> { - final K key = input.getKey(); - Iterable> value = input.getValue(); - return FluentIterable.from(value) - .transform( - windowedValue -> - windowedValue.withValue(KV.of(key, windowedValue.getValue( - .iterator(); -}) -.flatMapToPair(doFnFunction); +if (!requiresSortedInput) { + return groupRDD + .map( + input -> { +final K key = input.getKey(); +Iterable> value = input.getValue(); +return FluentIterable.from(value) +.transform( +windowedValue -> +windowedValue.withValue(KV.of(key, windowedValue.getValue( +.iterator(); + }) + .flatMapToPair(doFnFunction); +} + +JavaPairRDD pairRDD = +kvInRDD +.map(new ReifyTimestampsAndWindowsFunction<>()) +.mapToPair(TranslationUtils.toPairFunction()) +.mapToPair( +CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp())); + +JavaPairRDD sorted = + pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner)); + +return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder)); + } + + private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) { +return new Partitioner() { + @Override + public int numPartitions() { +return partitioner.numPartitions(); + } + + @Override + public int 
getPartition(Object o) { +ByteArray b = (ByteArray) o; +return partitioner.getPartition( +new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8))); + } +}; + } + + private static + PairFlatMapFunction>, TupleTag, WindowedValue> + wrapDoFnFromSortedRDD( + MultiDoFnFunction, OutputT> doFnFunction, + Coder keyCoder, + Coder> wvCoder) { + +return (Iterator> in) -> { + Iterator, WindowedValue>>> mappedGroups; + mappedGroups = + Iterators.transform( + splitBySameKey(in, keyCoder, wvCoder), + group -> { +try { + return doFnFunction.call(group); +} catch (Exception ex) { + throw new RuntimeException(ex); +} + }); + return flatten(mappedGroups); +}; + } + + @VisibleForTesting + static Iterator flatten(final Iterator> toFlatten) { + +return new AbstractIterator() { + + @Nullable Iterator current = null; + + @Override + protected T computeNext() { +while (true) { + if (current == null) { +if (toFlatten.hasNext()) { + current = toFlatten.next(); +} else { + return endOfData(); +} + } + if (current.hasNext()) { +return current.next(); + } + current = null; +} + } +}; + } + + @VisibleForTesting + static Iterator>>> splitBySameKey( + Iterator> in, Coder keyCoder, Coder> wvCoder) { + +return new AbstractIterator>>>() { + + @Nullable Tuple2 read = null; + + @Override + protected Iterator>> computeNext() { +readNext(); +if (read != null) { + byte[] value = read._1().getValue(); + byte[] keyPart = Arrays.copyOfRange(value, 0, value.length - 8); + K key = CoderHelpers.fromByteArray(keyPart, keyCoder); + return createIteratorForKey(keyPart, key); +} +return endOfData(); + } + + private void readNext() { +if (read == null) { + if (in.hasNext()) { +read = in.next(); + } +} + } + + private void consumed() { +read = null; + } + + private Ite
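The `flatten` helper in the Spark diff chains the per-key output iterators into one lazy iterator. Below is a minimal sketch of the same idea. It assumes plain `java.util.Iterator` in place of Guava's `AbstractIterator`, and the class name `FlattenSketch` is hypothetical.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch: lazily concatenates inner iterators, skipping empty ones.
// Uses plain java.util.Iterator instead of Guava's AbstractIterator (assumption).
final class FlattenSketch {
  static <T> Iterator<T> flatten(Iterator<? extends Iterator<? extends T>> toFlatten) {
    return new Iterator<T>() {
      private Iterator<? extends T> current = null;

      // Move to the next inner iterator that still has elements, if there is one.
      private boolean advance() {
        while (current == null || !current.hasNext()) {
          if (!toFlatten.hasNext()) {
            return false;
          }
          current = toFlatten.next();
        }
        return true;
      }

      @Override
      public boolean hasNext() {
        return advance();
      }

      @Override
      public T next() {
        if (!advance()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }
}
```

Empty inner iterators are skipped. Nothing is pulled from the outer iterator until the caller asks for the next element, so per-partition processing can stream instead of materializing every key group.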
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365790&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365790 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 10:32 Start Date: 03/Jan/20 10:32 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r362764455 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ## @@ -448,18 +458,152 @@ public String toNativeString() { JavaRDD>>> groupRDD = GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner); -return groupRDD -.map( -input -> { - final K key = input.getKey(); - Iterable> value = input.getValue(); - return FluentIterable.from(value) - .transform( - windowedValue -> - windowedValue.withValue(KV.of(key, windowedValue.getValue( - .iterator(); -}) -.flatMapToPair(doFnFunction); +if (!requiresSortedInput) { + return groupRDD + .map( + input -> { +final K key = input.getKey(); +Iterable> value = input.getValue(); +return FluentIterable.from(value) +.transform( +windowedValue -> +windowedValue.withValue(KV.of(key, windowedValue.getValue( +.iterator(); + }) + .flatMapToPair(doFnFunction); +} + +JavaPairRDD pairRDD = +kvInRDD +.map(new ReifyTimestampsAndWindowsFunction<>()) +.mapToPair(TranslationUtils.toPairFunction()) +.mapToPair( +CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp())); + +JavaPairRDD sorted = + pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner)); + +return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder)); + } + + private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) { +return new Partitioner() { + @Override + public int numPartitions() { +return partitioner.numPartitions(); + } + + @Override + public int 
getPartition(Object o) { +ByteArray b = (ByteArray) o; +return partitioner.getPartition( +new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8))); + } +}; + } + + private static + PairFlatMapFunction>, TupleTag, WindowedValue> + wrapDoFnFromSortedRDD( + MultiDoFnFunction, OutputT> doFnFunction, + Coder keyCoder, + Coder> wvCoder) { + +return (Iterator> in) -> { + Iterator, WindowedValue>>> mappedGroups; + mappedGroups = + Iterators.transform( + splitBySameKey(in, keyCoder, wvCoder), + group -> { +try { + return doFnFunction.call(group); +} catch (Exception ex) { + throw new RuntimeException(ex); +} + }); + return flatten(mappedGroups); +}; + } + + @VisibleForTesting + static Iterator flatten(final Iterator> toFlatten) { + +return new AbstractIterator() { + + @Nullable Iterator current = null; + + @Override + protected T computeNext() { +while (true) { + if (current == null) { +if (toFlatten.hasNext()) { + current = toFlatten.next(); +} else { + return endOfData(); +} + } + if (current.hasNext()) { +return current.next(); + } + current = null; +} + } +}; + } + + @VisibleForTesting + static Iterator>>> splitBySameKey( + Iterator> in, Coder keyCoder, Coder> wvCoder) { + +return new AbstractIterator>>>() { + + @Nullable Tuple2 read = null; + + @Override + protected Iterator>> computeNext() { +readNext(); +if (read != null) { + byte[] value = read._1().getValue(); + byte[] keyPart = Arrays.copyOfRange(value, 0, value.length - 8); + K key = CoderHelpers.fromByteArray(keyPart, keyCoder); + return createIteratorForKey(keyPart, key); +} +return endOfData(); + } + + private void readNext() { +if (read == null) { + if (in.hasNext()) { +read = in.next(); + } +} + } + + private void consumed() { +read = null; + } + + private Ite
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365788&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365788 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 10:30 Start Date: 03/Jan/20 10:30 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r362764095 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ## @@ -448,18 +458,152 @@ public String toNativeString() { JavaRDD>>> groupRDD = GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner); -return groupRDD -.map( -input -> { - final K key = input.getKey(); - Iterable> value = input.getValue(); - return FluentIterable.from(value) - .transform( - windowedValue -> - windowedValue.withValue(KV.of(key, windowedValue.getValue( - .iterator(); -}) -.flatMapToPair(doFnFunction); +if (!requiresSortedInput) { + return groupRDD + .map( + input -> { +final K key = input.getKey(); +Iterable> value = input.getValue(); +return FluentIterable.from(value) +.transform( +windowedValue -> +windowedValue.withValue(KV.of(key, windowedValue.getValue( +.iterator(); + }) + .flatMapToPair(doFnFunction); +} + +JavaPairRDD pairRDD = +kvInRDD +.map(new ReifyTimestampsAndWindowsFunction<>()) +.mapToPair(TranslationUtils.toPairFunction()) +.mapToPair( +CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp())); + +JavaPairRDD sorted = + pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner)); + +return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder)); + } + + private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) { +return new Partitioner() { + @Override + public int numPartitions() { +return partitioner.numPartitions(); + } + + @Override + public int 
getPartition(Object o) { +ByteArray b = (ByteArray) o; +return partitioner.getPartition( +new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8))); + } +}; + } + + private static + PairFlatMapFunction>, TupleTag, WindowedValue> + wrapDoFnFromSortedRDD( + MultiDoFnFunction, OutputT> doFnFunction, + Coder keyCoder, + Coder> wvCoder) { + +return (Iterator> in) -> { + Iterator, WindowedValue>>> mappedGroups; + mappedGroups = + Iterators.transform( + splitBySameKey(in, keyCoder, wvCoder), + group -> { +try { + return doFnFunction.call(group); +} catch (Exception ex) { + throw new RuntimeException(ex); +} + }); + return flatten(mappedGroups); +}; + } + + @VisibleForTesting + static Iterator flatten(final Iterator> toFlatten) { + +return new AbstractIterator() { + + @Nullable Iterator current = null; + + @Override + protected T computeNext() { +while (true) { + if (current == null) { +if (toFlatten.hasNext()) { + current = toFlatten.next(); +} else { + return endOfData(); +} + } + if (current.hasNext()) { +return current.next(); + } + current = null; +} + } +}; + } + + @VisibleForTesting + static Iterator>>> splitBySameKey( + Iterator> in, Coder keyCoder, Coder> wvCoder) { + +return new AbstractIterator>>>() { + + @Nullable Tuple2 read = null; + + @Override + protected Iterator>> computeNext() { +readNext(); +if (read != null) { + byte[] value = read._1().getValue(); + byte[] keyPart = Arrays.copyOfRange(value, 0, value.length - 8); Review comment: Same problem. Not related to `Coders`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ---
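The `keyPart` computation discussed here relies on the composite key layout: the encoded key followed by an 8-byte timestamp. Stripping the last 8 bytes therefore recovers the key. The sketch below splits a sorted stream into per-key runs. It is simplified and eager, and it keeps raw byte prefixes instead of decoded keys. The diff's `splitBySameKey` is lazy and decodes keys with a `Coder`. `SplitBySameKeySketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, eager sketch of grouping sorted composite keys by their key prefix.
final class SplitBySameKeySketch {
  static final int TIMESTAMP_BYTES = 8;

  // The key part is everything except the trailing 8-byte timestamp.
  static byte[] keyPart(byte[] composite) {
    return Arrays.copyOfRange(composite, 0, composite.length - TIMESTAMP_BYTES);
  }

  // Assumes the input is already sorted, so records of one key are contiguous.
  static List<List<byte[]>> splitBySameKey(Iterator<byte[]> sorted) {
    List<List<byte[]>> groups = new ArrayList<>();
    byte[] currentKey = null;
    List<byte[]> currentGroup = null;
    while (sorted.hasNext()) {
      byte[] record = sorted.next();
      byte[] key = keyPart(record);
      // Start a new group whenever the key prefix changes.
      if (currentKey == null || !Arrays.equals(currentKey, key)) {
        currentKey = key;
        currentGroup = new ArrayList<>();
        groups.add(currentGroup);
      }
      currentGroup.add(record);
    }
    return groups;
  }
}
```

Grouping by contiguous runs is correct only because the input was sorted by the full composite key beforehand. On unsorted input, the same key could appear in several groups.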
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365787&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365787 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 10:30 Start Date: 03/Jan/20 10:30 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r362763933 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ## @@ -448,18 +458,152 @@ public String toNativeString() { JavaRDD>>> groupRDD = GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner); -return groupRDD -.map( -input -> { - final K key = input.getKey(); - Iterable> value = input.getValue(); - return FluentIterable.from(value) - .transform( - windowedValue -> - windowedValue.withValue(KV.of(key, windowedValue.getValue( - .iterator(); -}) -.flatMapToPair(doFnFunction); +if (!requiresSortedInput) { + return groupRDD + .map( + input -> { +final K key = input.getKey(); +Iterable> value = input.getValue(); +return FluentIterable.from(value) +.transform( +windowedValue -> +windowedValue.withValue(KV.of(key, windowedValue.getValue( +.iterator(); + }) + .flatMapToPair(doFnFunction); +} + +JavaPairRDD pairRDD = +kvInRDD +.map(new ReifyTimestampsAndWindowsFunction<>()) +.mapToPair(TranslationUtils.toPairFunction()) +.mapToPair( +CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp())); + +JavaPairRDD sorted = + pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner)); + +return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder)); + } + + private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) { +return new Partitioner() { + @Override + public int numPartitions() { +return partitioner.numPartitions(); + } + + @Override + public int 
getPartition(Object o) { +ByteArray b = (ByteArray) o; +return partitioner.getPartition( +new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8))); Review comment: I wouldn't do that. The utility class is named `CoderHelpers`, and this creates a `Partitioner`, which has nothing to do with `Coders`. We would have to refactor it into something like `ByteArrayHelpers`, but I don't think that would make this PR more readable. We might postpone that until after this is merged. Issue Time Tracking --- Worklog Id: (was: 365787) Time Spent: 6h 50m (was: 6h 40m) > @RequiresTimeSortedInput DoFn annotation > > > Key: BEAM-8550 > URL: https://issues.apache.org/jira/browse/BEAM-8550 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Jan Lukavský >Assignee: Jan Lukavský >Priority: Major > Time Spent: 6h 50m > Remaining Estimate: 0h > > Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as > described in [design > document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing]. > First implementation will assume that: > - time is defined by timestamp in associated WindowedValue > - allowed lateness is explicitly zero and all late elements are dropped > (due to being out of order) > The above properties are considered temporary and will be resolved by > subsequent extensions (backwards compatible).
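The Spark translation sorts by a byte-array composite key (key bytes followed by an 8-byte timestamp) but partitions only on the key prefix. As a result, all elements of a key land in one partition, ordered by time. The sketch below models that scheme in plain Java, but its encoding is an assumption rather than Beam's `CoderHelpers` format. A 4-byte length prefix keeps keys of different lengths from interleaving under byte-wise sorting. The timestamp's sign bit is flipped so that unsigned byte order matches signed `long` order. `KeyPrefixSortSketch` and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical model: composite = [4-byte key length][key bytes][8-byte timestamp].
final class KeyPrefixSortSketch {
  static final int TIMESTAMP_BYTES = 8;

  static byte[] encode(byte[] key, long timestampMillis) {
    return ByteBuffer.allocate(4 + key.length + TIMESTAMP_BYTES)
        .putInt(key.length)
        .put(key)
        // Flip the sign bit so unsigned byte order equals signed long order.
        .putLong(timestampMillis ^ Long.MIN_VALUE)
        .array();
  }

  static long decodeTimestamp(byte[] composite) {
    return ByteBuffer.wrap(composite, composite.length - TIMESTAMP_BYTES, TIMESTAMP_BYTES)
            .getLong()
        ^ Long.MIN_VALUE;
  }

  // Partition on the key prefix only, so every timestamp of a key lands together.
  static int partition(byte[] composite, int numPartitions) {
    byte[] prefix = Arrays.copyOfRange(composite, 0, composite.length - TIMESTAMP_BYTES);
    return Math.floorMod(Arrays.hashCode(prefix), numPartitions);
  }

  // Unsigned lexicographic comparison of whole composite keys.
  static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = (a, b) -> {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  };

  // Mimics repartition-and-sort: bucket by key-prefix partition, then sort each bucket.
  static List<List<byte[]>> repartitionAndSort(List<byte[]> records, int numPartitions) {
    List<List<byte[]>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<>());
    }
    for (byte[] r : records) {
      partitions.get(partition(r, numPartitions)).add(r);
    }
    for (List<byte[]> p : partitions) {
      p.sort(UNSIGNED_LEXICOGRAPHIC);
    }
    return partitions;
  }
}
```

Partitioning on the full composite key would scatter one key's elements across partitions. This is why the diff wraps the original `Partitioner` in `keyPrefixPartitionerFrom`.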
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365784&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365784 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 10:21 Start Date: 03/Jan/20 10:21 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r362761604 ## File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ## @@ -188,15 +196,35 @@ public void process(ProcessContext c) {} // A single bundle with some elements in the global window; it should register cleanup for the // global window state merely by having the evaluator created. The cleanup logic does not // depend on the window. -CommittedBundle> inputBundle = +CommittedBundle>> inputBundle = Review comment: This relates to how coders are passed around in DirectRunner. See commit dfe825284b8be76736155e82c8ce278a45640595. Issue Time Tracking --- Worklog Id: (was: 365784) Time Spent: 6h 40m (was: 6.5h) > @RequiresTimeSortedInput DoFn annotation > > > Key: BEAM-8550 > URL: https://issues.apache.org/jira/browse/BEAM-8550 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Jan Lukavský >Assignee: Jan Lukavský >Priority: Major > Time Spent: 6h 40m > Remaining Estimate: 0h > > Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as > described in [design > document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
> First implementation will assume that: > - time is defined by timestamp in associated WindowedValue > - allowed lateness is explicitly zero and all late elements are dropped > (due to being out of order) > The above properties are considered temporary and will be resolved by > subsequent extensions (backwards compatible).
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365780&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365780 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 03/Jan/20 10:09 Start Date: 03/Jan/20 10:09 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r362758124 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ## @@ -85,45 +123,83 @@ public void startBundle() { doFnRunner.startBundle(); } + @Override + public void finishBundle() { +doFnRunner.finishBundle(); + } + @Override public void processElement(WindowedValue input) { // StatefulDoFnRunner always observes windows, so we need to explode for (WindowedValue value : input.explodeWindows()) { - BoundedWindow window = value.getWindows().iterator().next(); - if (isLate(window)) { // The element is too late for this window. 
-droppedDueToLateness.inc(); -WindowTracing.debug( -"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " -+ "since too far behind inputWatermark:{}", -input.getTimestamp(), -window, -cleanupTimer.currentInputWatermarkTime()); +reportDroppedElement(value, window); + } else if (requiresTimeSortedInput) { +processElementOrdered(window, value); } else { -cleanupTimer.setForWindow(value.getValue(), window); -doFnRunner.processElement(value); +processElementUnordered(window, value); + } +} + } + + private void processElementUnordered(BoundedWindow window, WindowedValue value) { +cleanupTimer.setForWindow(value.getValue(), window); +doFnRunner.processElement(value); + } + + private void processElementOrdered(BoundedWindow window, WindowedValue value) { + +StateInternals stateInternals = stepContext.stateInternals(); +TimerInternals timerInternals = stepContext.timerInternals(); + +if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) { + StateNamespace namespace = StateNamespaces.window(windowCoder, window); + BagState> sortBuffer = stateInternals.state(namespace, sortBufferTag); + ValueState minStampState = stateInternals.state(namespace, sortBufferMinStampTag); + sortBuffer.add(value); + Instant minStamp = + MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE); + if (value.getTimestamp().isBefore(minStamp)) { +minStamp = value.getTimestamp(); +minStampState.write(minStamp); +setupFlushTimerAndWatermarkHold(namespace, minStamp); } +} else { + reportDroppedElement(value, window); Review comment: I think this implies that we have to take better care of allowed lateness here. Elements that arrive later than the allowed lateness are dropped by any stateful DoFn; this is not new behavior.
Issue Time Tracking --- Worklog Id: (was: 365780) Time Spent: 6.5h (was: 6h 20m) > @RequiresTimeSortedInput DoFn annotation > > > Key: BEAM-8550 > URL: https://issues.apache.org/jira/browse/BEAM-8550 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Jan Lukavský >Assignee: Jan Lukavský >Priority: Major > Time Spent: 6.5h > Remaining Estimate: 0h > > Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as > described in [design > document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing]. > First implementation will assume that: > - time is defined by timestamp in associated WindowedValue > - allowed lateness is explicitly zero and all late elements are dropped > (due to being out of order) > The above properties are considered temporary and will be resolved by > subsequent extensions (backwards compatible).
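The ordered path in `StatefulDoFnRunner` does three things. It buffers elements that are not behind the input watermark, tracks the minimum buffered timestamp (used for the flush timer and watermark hold), and reports all other elements as dropped. The sketch below is a simplified in-memory model of that path. It assumes zero allowed lateness, as in the issue description. It also assumes that a flush emits every buffered element strictly behind the new watermark. A `PriorityQueue` stands in for the diff's `BagState` plus `minStampState`, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical in-memory model of a time-sorting buffer (not Beam's implementation).
final class SortBufferSketch<T> {
  private static final class Stamped<V> {
    final long timestamp;
    final V value;

    Stamped(long timestamp, V value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Min-heap by timestamp; its head plays the role of the diff's minStampState.
  private final PriorityQueue<Stamped<T>> buffer =
      new PriorityQueue<>(Comparator.comparingLong((Stamped<T> s) -> s.timestamp));
  private long inputWatermark = Long.MIN_VALUE;
  private long droppedCount = 0;

  // Mirrors "if (!watermark.isAfter(timestamp)) buffer it, else drop it".
  boolean add(long timestamp, T value) {
    if (timestamp >= inputWatermark) {
      buffer.add(new Stamped<>(timestamp, value));
      return true;
    }
    droppedCount++;
    return false;
  }

  // Smallest buffered timestamp, or null when the buffer is empty.
  Long minBufferedTimestamp() {
    Stamped<T> head = buffer.peek();
    return head == null ? null : head.timestamp;
  }

  // Advance the watermark and emit, in timestamp order, every element strictly behind it.
  List<T> advanceWatermark(long newWatermark) {
    inputWatermark = Math.max(inputWatermark, newWatermark);
    List<T> out = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek().timestamp < inputWatermark) {
      out.add(buffer.poll().value);
    }
    return out;
  }

  long droppedCount() {
    return droppedCount;
  }
}
```

Flushing only the elements strictly behind the watermark is safe. Any element accepted later must have a timestamp at or after the watermark, so it can never precede anything that was already emitted.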
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356919&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356919 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 10/Dec/19 09:22 Start Date: 10/Dec/19 09:22 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355922803 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -2363,6 +2367,136 @@ public int hashCode() { return getClass().hashCode(); } } + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class +}) +public void testRequiresTimeSortedInput() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps))); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class +}) +public void testRequiresTimeSortedInputWithTestStream() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder stream = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +stream = stream.addElements(stamp); + } + testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity())); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class +}) 
+public void testRequiresTimeSortedInputWithLateData() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder input = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp))); +if (stamp == 100) { + // advance watermark when we have 100 remaining elements + // all the rest are going to be late elements + input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); +} + } + testTimeSortedInput( + numElements - 100, + numElements - 1, + pipeline.apply(input.advanceWatermarkToInfinity()), + // Only direct runner is guaranteed to correctly drop elements after watermark coming + // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by + // class to avoid cyclic dependencies. + pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner")); Review comment: I'm afraid that code reuse between the two test modules would not be straightforward and would introduce much more counterintuitive code. The only option would be to split the test into two parts: a) validate that the late-data dropping logic is correct, that is, test the annotation itself (this would have to be placed outside the ValidatesRunner suites); b) validate that runners obey the annotation (ValidatesRunner). There would probably be some code duplication, but it might be worth it. I will look into that.
Issue Time Tracking --- Worklog Id: (was: 356919) Time Spent: 6h 20m (was: 6h 10m) > @RequiresTimeSortedInput DoFn annotation > > > Key: BEAM-8550 > URL: https://issues.apache.org/jira/browse/BEAM-8550 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter:
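Part a) of this proposal checks the late-data dropping logic independently of any runner. It could start from a small reference model of the test's input script. Under that script, stamps run from `numElements` down to 1, and the watermark advances to the stamp `100` right after that element is added. Allowed lateness is zero. The sketch below is hypothetical (`LateDataScenarioSketch` is not part of Beam). It applies the same "not behind the watermark" acceptance rule as the `StatefulDoFnRunner` diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical reference model (not a Beam API) of the late-data test script:
// stamps numElements, numElements - 1, ..., 1; the watermark jumps to `advanceAt`
// right after that stamp is added; allowed lateness is zero.
final class LateDataScenarioSketch {
  static final class Result {
    final List<Long> emittedInOrder; // what a sorted-input DoFn should observe
    final int dropped; // elements that arrived behind the watermark

    Result(List<Long> emittedInOrder, int dropped) {
      this.emittedInOrder = emittedInOrder;
      this.dropped = dropped;
    }
  }

  static Result run(int numElements, long advanceAt) {
    long watermark = Long.MIN_VALUE;
    List<Long> accepted = new ArrayList<>();
    int dropped = 0;
    for (int i = 0; i < numElements; i++) {
      long stamp = numElements - i; // same descending order as the test
      // Keep the element unless the watermark is already after its stamp.
      if (stamp >= watermark) {
        accepted.add(stamp);
      } else {
        dropped++;
      }
      if (stamp == advanceAt) {
        watermark = stamp; // mirrors input.advanceWatermarkTo(...)
      }
    }
    Collections.sort(accepted);
    return new Result(accepted, dropped);
  }
}
```

With `run(1000, 100)`, the model accepts the 901 elements stamped 100 through 1000 and drops the 99 elements stamped 99 down to 1. Such a model could serve as the expected output for the runner-independent part of the test.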
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356914&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356914 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 10/Dec/19 09:15 Start Date: 10/Dec/19 09:15 Worklog Time Spent: 10m Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355919241 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -2363,6 +2367,136 @@ public int hashCode() { return getClass().hashCode(); } } + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class +}) +public void testRequiresTimeSortedInput() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps))); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class +}) +public void testRequiresTimeSortedInputWithTestStream() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder stream = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +stream = stream.addElements(stamp); + } + testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity())); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class 
+}) +public void testRequiresTimeSortedInputWithLateData() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder input = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp))); +if (stamp == 100) { + // advance watermark when we have 100 remaining elements + // all the rest are going to be late elements + input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); +} + } + testTimeSortedInput( + numElements - 100, + numElements - 1, + pipeline.apply(input.advanceWatermarkToInfinity()), + // Only direct runner is guaranteed to correctly drop elements after watermark coming + // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by + // class to avoid cyclic dependencies. + pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner")); Review comment: If it is about code duplication around setting up the TestStream, that can be reused via a shared routine. I would actually prefer a cleaner association of which test logic makes sense for which test category, and would avoid "sniffing" the context in which the test is running. It confused me, at least.
Issue Time Tracking --- Worklog Id: (was: 356914) Time Spent: 6h 10m (was: 6h) > @RequiresTimeSortedInput DoFn annotation > > > Key: BEAM-8550 > URL: https://issues.apache.org/jira/browse/BEAM-8550 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Jan Lukavský >Assignee: Jan Lukavský >Priority: Major > Time Spent: 6h 10m > Remaining Estimate: 0h > > Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as > described in [design > document|https:/
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356904&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356904 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 10/Dec/19 09:00 Start Date: 10/Dec/19 09:00 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355912021 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -2363,6 +2367,136 @@ public int hashCode() { return getClass().hashCode(); } } + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class +}) +public void testRequiresTimeSortedInput() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps))); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class +}) +public void testRequiresTimeSortedInputWithTestStream() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder stream = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +stream = stream.addElements(stamp); + } + testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity())); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class +}) 
+public void testRequiresTimeSortedInputWithLateData() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder input = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp))); +if (stamp == 100) { + // advance watermark when we have 100 remaining elements + // all the rest are going to be late elements + input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); +} + } + testTimeSortedInput( + numElements - 100, + numElements - 1, + pipeline.apply(input.advanceWatermarkToInfinity()), + // Only direct runner is guaranteed to correctly drop elements after watermark coming + // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by + // class to avoid cyclic dependencies. + pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner")); Review comment: Yes, it is the latter. The DirectRunner is somewhat special, because it is intentionally designed to be single-process and can therefore provide stricter guarantees. Another option would be to split (basically copy and paste) this test into two versions: one residing in the DirectRunner-specific tests and the other as a ValidatesRunner test. I think that this is probably not worth the code duplication.
Issue Time Tracking --- Worklog Id: (was: 356904) Time Spent: 6h (was: 5h 50m) > @RequiresTimeSortedInput DoFn annotation > > > Key: BEAM-8550 > URL: https://issues.apache.org/jira/browse/BEAM-8550 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Jan Lukavský >Assignee: Jan Lukavský >Priority: Major > Time Spent: 6h > Remaining Estimate: 0h > > I
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356901&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356901 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 10/Dec/19 08:57 Start Date: 10/Dec/19 08:57 Worklog Time Spent: 10m Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355910682 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -2363,6 +2367,136 @@ public int hashCode() { return getClass().hashCode(); } } + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class +}) +public void testRequiresTimeSortedInput() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps))); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class +}) +public void testRequiresTimeSortedInputWithTestStream() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder stream = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +stream = stream.addElements(stamp); + } + testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity())); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class 
+}) +public void testRequiresTimeSortedInputWithLateData() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder input = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp))); +if (stamp == 100) { + // advance watermark when we have 100 remaining elements + // all the rest are going to be late elements + input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); +} + } + testTimeSortedInput( + numElements - 100, + numElements - 1, + pipeline.apply(input.advanceWatermarkToInfinity()), + // Only direct runner is guaranteed to correctly drop elements after watermark coming + // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by + // class to avoid cyclic dependencies. + pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner")); Review comment: Is this a problem of a runner (Flink) implementing TestStream sub-optimally, or is it that Beam cannot have full control over watermark behavior? I think it is the latter. From the comment, I was not sure whether it is a workaround for not testing on all runners because it is broken, or a way of selecting the runners on which this test makes sense.
Issue Time Tracking --- Worklog Id: (was: 356901) Time Spent: 5h 50m (was: 5h 40m)
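Whether Beam "controls" the watermark comes down to how a runner combines watermarks from parallel inputs. Flink, for example, documents that an operator with several input channels uses the minimum of its inputs' watermarks. The hypothetical model below assumes two partitions feeding one operator and shows why an element the TestStream marks as late may not be late downstream until every partition has seen the update.

```java
import java.util.Arrays;

/** Hypothetical model of watermark propagation across parallel partitions; not runner code. */
final class PartitionedWatermark {

  private final long[] partitionWatermarks;

  PartitionedWatermark(int partitions) {
    partitionWatermarks = new long[partitions];
    Arrays.fill(partitionWatermarks, Long.MIN_VALUE);
  }

  /** Watermarks only move forward. */
  void advance(int partition, long watermark) {
    partitionWatermarks[partition] = Math.max(partitionWatermarks[partition], watermark);
  }

  /** The downstream operator only sees the minimum over all its inputs. */
  long downstreamWatermark() {
    return Arrays.stream(partitionWatermarks).min().getAsLong();
  }

  boolean isLate(long timestamp) {
    return timestamp < downstreamWatermark();
  }

  public static void main(String[] args) {
    PartitionedWatermark wm = new PartitionedWatermark(2);
    wm.advance(0, 100); // only partition 0 has seen the TestStream watermark update
    System.out.println(wm.isLate(50)); // false: partition 1 still holds the watermark back
    wm.advance(1, 100);
    System.out.println(wm.isLate(50)); // true
  }
}
```

The timing of the second `advance` is what differs between runners, so the set of dropped elements can differ too.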
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356893&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356893 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 10/Dec/19 08:48 Start Date: 10/Dec/19 08:48 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355906532 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java ## @@ -164,7 +164,7 @@ private void fireTimers() { transformTimers.getKey(), (PCollection) Iterables.getOnlyElement( - transformTimers.getExecutable().getInputs().values())) + transformTimers.getExecutable().getMainInputs().values())) Review comment: I think it does manifest; I just didn't have time to file a JIRA for it. The change is pretty small: it just differentiates the "main" input from "any other inputs". `getInputs` returns all inputs, including side inputs (as it does today), while `getMainInputs()` returns only the non-side inputs, which should be a single input in this case, so `Iterables.getOnlyElement` will not fail.
Issue Time Tracking --- Worklog Id: (was: 356893) Time Spent: 5h 40m (was: 5.5h)
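The `getInputs()` to `getMainInputs()` change matters because `Iterables.getOnlyElement` throws when there is more than one element. A self-contained sketch of that failure mode; the string-keyed maps and the side-input tag set are hypothetical stand-ins for the direct runner's real types, and `getOnlyElement` here only mirrors Guava's contract (exactly one element, otherwise an exception).

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

/** Hypothetical model of main vs. side inputs; not the direct runner's classes. */
final class MainInputSketch {

  /** Mirrors the contract of Guava's Iterables.getOnlyElement. */
  static <T> T getOnlyElement(Iterable<T> values) {
    Iterator<T> it = values.iterator();
    if (!it.hasNext()) {
      throw new NoSuchElementException("no elements");
    }
    T first = it.next();
    if (it.hasNext()) {
      throw new IllegalArgumentException("expected exactly one element");
    }
    return first;
  }

  /** Keeps only the inputs whose tag is not registered as a side input. */
  static Map<String, String> mainInputs(Map<String, String> allInputs, Set<String> sideInputTags) {
    Map<String, String> main = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : allInputs.entrySet()) {
      if (!sideInputTags.contains(e.getKey())) {
        main.put(e.getKey(), e.getValue());
      }
    }
    return main;
  }

  public static void main(String[] args) {
    Map<String, String> all = new LinkedHashMap<>();
    all.put("input", "main PCollection");
    all.put("sideInput", "side input PCollection");
    try {
      getOnlyElement(all.values()); // fails: the side input is counted as well
    } catch (IllegalArgumentException e) {
      System.out.println("all inputs: " + e.getMessage());
    }
    System.out.println(getOnlyElement(mainInputs(all, Set.of("sideInput")).values()));
  }
}
```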
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356891&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356891 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 10/Dec/19 08:45 Start Date: 10/Dec/19 08:45 Worklog Time Spent: 10m Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355905210 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ## @@ -448,18 +458,152 @@ public String toNativeString() { JavaRDD>>> groupRDD = GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner); -return groupRDD -.map( -input -> { - final K key = input.getKey(); - Iterable> value = input.getValue(); - return FluentIterable.from(value) - .transform( - windowedValue -> - windowedValue.withValue(KV.of(key, windowedValue.getValue( - .iterator(); -}) -.flatMapToPair(doFnFunction); +if (!requiresSortedInput) { + return groupRDD + .map( + input -> { +final K key = input.getKey(); +Iterable> value = input.getValue(); +return FluentIterable.from(value) +.transform( +windowedValue -> +windowedValue.withValue(KV.of(key, windowedValue.getValue( +.iterator(); + }) + .flatMapToPair(doFnFunction); +} + +JavaPairRDD pairRDD = +kvInRDD +.map(new ReifyTimestampsAndWindowsFunction<>()) +.mapToPair(TranslationUtils.toPairFunction()) +.mapToPair( +CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp())); + +JavaPairRDD sorted = + pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner)); + +return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder)); + } + + private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) { +return new Partitioner() { + @Override + public int numPartitions() { +return partitioner.numPartitions(); + } + + @Override + public 
int getPartition(Object o) { +ByteArray b = (ByteArray) o; +return partitioner.getPartition( +new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8))); + } +}; + } + + private static + PairFlatMapFunction>, TupleTag, WindowedValue> + wrapDoFnFromSortedRDD( + MultiDoFnFunction, OutputT> doFnFunction, + Coder keyCoder, + Coder> wvCoder) { + +return (Iterator> in) -> { + Iterator, WindowedValue>>> mappedGroups; + mappedGroups = Review comment: IMHO it is quite bad to have to make such trade-offs when formatting works against us. But OK, this does not have to concern this PR.
Issue Time Tracking --- Worklog Id: (was: 356891) Time Spent: 5.5h (was: 5h 20m)
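In the Spark translation, records are sorted by serialized key plus timestamp, while `keyPrefixPartitionerFrom` partitions on the bytes without the trailing 8 (the timestamp). Every record of a key therefore lands in one partition, and `repartitionAndSortWithinPartitions` orders it by time. A self-contained sketch of the idea; the byte layout (length-prefixed key, then a big-endian timestamp with the sign bit flipped so unsigned byte order matches numeric order) is an assumption and not necessarily what `CoderHelpers.toByteFunctionWithTs` produces.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical "partition by key, sort by key + timestamp" sketch; the layout is assumed. */
final class KeyPrefixSortSketch {

  static byte[] encode(String key, long timestamp) {
    byte[] k = key.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(4 + k.length + 8)
        .putInt(k.length) // length prefix keeps keys of different lengths from interleaving
        .put(k)
        .putLong(timestamp ^ Long.MIN_VALUE) // sign flip: unsigned byte order == numeric order
        .array();
  }

  /** Partitions on everything except the trailing 8 timestamp bytes. */
  static int partition(byte[] encoded, int numPartitions) {
    byte[] prefix = Arrays.copyOfRange(encoded, 0, encoded.length - 8);
    return Math.floorMod(Arrays.hashCode(prefix), numPartitions);
  }

  static long timestampOf(byte[] encoded) {
    return ByteBuffer.wrap(encoded, encoded.length - 8, 8).getLong() ^ Long.MIN_VALUE;
  }

  public static void main(String[] args) {
    List<byte[]> records = new ArrayList<>();
    records.add(encode("a", 30));
    records.add(encode("b", 5));
    records.add(encode("a", -10));
    records.add(encode("a", 20));
    // Within one partition, an unsigned lexicographic sort groups each key and orders it by time.
    records.sort((x, y) -> Arrays.compareUnsigned(x, y));
    for (byte[] r : records) {
      // timestamps print in the order -10, 20, 30 (key "a"), then 5 (key "b")
      System.out.println("partition " + partition(r, 4) + ", ts " + timestampOf(r));
    }
  }
}
```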
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356888&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356888 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 10/Dec/19 08:42 Start Date: 10/Dec/19 08:42 Worklog Time Spent: 10m Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355903979 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java ## @@ -164,7 +164,7 @@ private void fireTimers() { transformTimers.getKey(), (PCollection) Iterables.getOnlyElement( - transformTimers.getExecutable().getInputs().values())) + transformTimers.getExecutable().getMainInputs().values())) Review comment: To get a better understanding: is this a bug triggered by the changes in this feature? Since it does not manifest without them, is that why it does not make sense to track it in a separate JIRA?
Issue Time Tracking --- Worklog Id: (was: 356888) Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356538&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356538 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 09/Dec/19 20:47 Start Date: 09/Dec/19 20:47 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355675316 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ## @@ -132,16 +208,56 @@ public void onTimer( + "since window is too far behind inputWatermark:{}", timestamp, window, -cleanupTimer.currentInputWatermarkTime()); +stepContext.timerInternals().currentInputWatermarkTime()); } else { doFnRunner.onTimer(timerId, window, timestamp, timeDomain); } } } - @Override - public void finishBundle() { -doFnRunner.finishBundle(); + // this needs to be optimized (Sorted Map State) Review comment: I will create the JIRA after this is merged.
Issue Time Tracking --- Worklog Id: (was: 356538) Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356537&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356537 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 09/Dec/19 20:46 Start Date: 09/Dec/19 20:46 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355675160 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ## @@ -132,16 +208,56 @@ public void onTimer( + "since window is too far behind inputWatermark:{}", timestamp, window, -cleanupTimer.currentInputWatermarkTime()); +stepContext.timerInternals().currentInputWatermarkTime()); } else { doFnRunner.onTimer(timerId, window, timestamp, timeDomain); } } } - @Override - public void finishBundle() { -doFnRunner.finishBundle(); + // this needs to be optimized (Sorted Map State) Review comment: Actually, it turns out that when the buffer is flushed on each input watermark move rather than per element, the benefit of this optimization might be negligible. I'm seeing good performance even without it. A JIRA would still be good, because once there is a Sorted Map State, it can be leveraged here.
Issue Time Tracking --- Worklog Id: (was: 356537) Time Spent: 5h (was: 4h 50m)
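Flushing once per input watermark move, rather than per element, means the buffer is drained in batches. A minimal sketch of such a buffer for a single key and window, assuming zero allowed lateness as in the first implementation; timestamps stand in for whole elements and the class is hypothetical. In the diff the buffer is a `BagState`; here a `PriorityQueue` plays the role of the sorted state mentioned in the review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical per-key, per-window sort buffer with zero allowed lateness; not Beam code. */
final class WatermarkSortBuffer {
  // Keeps buffered timestamps ordered, so a flush does not need to sort the whole buffer.
  private final PriorityQueue<Long> buffer = new PriorityQueue<>();
  private long inputWatermark = Long.MIN_VALUE;
  private int dropped = 0;

  /** Buffers an element unless the watermark has already passed it. */
  void add(long timestamp) {
    if (timestamp < inputWatermark) {
      dropped++; // emitting it now would break the sorted order
    } else {
      buffer.add(timestamp);
    }
  }

  /** Advances the watermark and returns every buffered element behind it, oldest first. */
  List<Long> onWatermark(long newWatermark) {
    inputWatermark = Math.max(inputWatermark, newWatermark);
    List<Long> flushed = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek() < inputWatermark) {
      flushed.add(buffer.poll());
    }
    return flushed;
  }

  int droppedCount() {
    return dropped;
  }

  public static void main(String[] args) {
    WatermarkSortBuffer b = new WatermarkSortBuffer();
    b.add(5);
    b.add(3);
    b.add(9);
    System.out.println(b.onWatermark(6)); // [3, 5]
    b.add(4); // behind the watermark: dropped
    b.add(7);
    System.out.println(b.onWatermark(Long.MAX_VALUE)); // [7, 9]
  }
}
```

Accepting only elements at or after the watermark and flushing only elements before it guarantees that nothing accepted later can precede what was already emitted.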
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356535&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356535 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 09/Dec/19 20:44 Start Date: 09/Dec/19 20:44 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355674168 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ## @@ -85,45 +123,83 @@ public void startBundle() { doFnRunner.startBundle(); } + @Override + public void finishBundle() { +doFnRunner.finishBundle(); + } + @Override public void processElement(WindowedValue input) { // StatefulDoFnRunner always observes windows, so we need to explode for (WindowedValue value : input.explodeWindows()) { - BoundedWindow window = value.getWindows().iterator().next(); - if (isLate(window)) { // The element is too late for this window. 
-droppedDueToLateness.inc(); -WindowTracing.debug( -"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " -+ "since too far behind inputWatermark:{}", -input.getTimestamp(), -window, -cleanupTimer.currentInputWatermarkTime()); +reportDroppedElement(value, window); + } else if (requiresTimeSortedInput) { +processElementOrdered(window, value); } else { -cleanupTimer.setForWindow(value.getValue(), window); -doFnRunner.processElement(value); +processElementUnordered(window, value); + } +} + } + + private void processElementUnordered(BoundedWindow window, WindowedValue value) { +cleanupTimer.setForWindow(value.getValue(), window); +doFnRunner.processElement(value); + } + + private void processElementOrdered(BoundedWindow window, WindowedValue value) { + +StateInternals stateInternals = stepContext.stateInternals(); +TimerInternals timerInternals = stepContext.timerInternals(); + +if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) { + StateNamespace namespace = StateNamespaces.window(windowCoder, window); + BagState> sortBuffer = stateInternals.state(namespace, sortBufferTag); + ValueState minStampState = stateInternals.state(namespace, sortBufferMinStampTag); + sortBuffer.add(value); + Instant minStamp = + MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE); + if (value.getTimestamp().isBefore(minStamp)) { +minStamp = value.getTimestamp(); +minStampState.write(minStamp); +setupFlushTimerAndWatermarkHold(namespace, minStamp); } +} else { + reportDroppedElement(value, window); } } private boolean isLate(BoundedWindow window) { Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); -Instant inputWM = cleanupTimer.currentInputWatermarkTime(); +Instant inputWM = stepContext.timerInternals().currentInputWatermarkTime(); return gcTime.isBefore(inputWM); } + private void reportDroppedElement(WindowedValue value, BoundedWindow window) { +droppedDueToLateness.inc(); 
+WindowTracing.debug( +"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " ++ "since too far behind inputWatermark:{}", Review comment: Should be. But we need to modify the flushing behavior to respect allowed lateness. This is wrong in the current implementation. Will fix that.
Issue Time Tracking --- Worklog Id: (was: 356535) Time Spent: 4h 50m (was: 4h 40m)
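`processElementOrdered` tracks the minimum buffered timestamp and, when a new element lowers it, calls `setupFlushTimerAndWatermarkHold` with that value. A hypothetical model of this bookkeeping, assuming the hold caps the output watermark (the usual meaning of a watermark hold in Beam) and that the flush timer fires at the same time; it is not the runner's actual state handling.

```java
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical model of minStamp tracking with a watermark hold; not StatefulDoFnRunner. */
final class MinStampHold {
  // Long.MAX_VALUE plays the role of BoundedWindow.TIMESTAMP_MAX_VALUE ("nothing buffered").
  private long minStamp = Long.MAX_VALUE;

  /** Returns the new hold (and flush-timer) time if this element lowered the minimum. */
  OptionalLong onBuffered(long timestamp) {
    if (timestamp < minStamp) {
      minStamp = timestamp;
      return OptionalLong.of(minStamp);
    }
    return OptionalLong.empty();
  }

  /** After a flush, the hold moves to the earliest element still buffered, if any. */
  void afterFlush(List<Long> stillBuffered) {
    minStamp = stillBuffered.stream().mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE);
  }

  /** A watermark hold keeps the output watermark from passing buffered data. */
  long outputWatermark(long inputWatermark) {
    return Math.min(inputWatermark, minStamp);
  }

  public static void main(String[] args) {
    MinStampHold hold = new MinStampHold();
    System.out.println(hold.onBuffered(10)); // OptionalLong[10]
    System.out.println(hold.onBuffered(12)); // OptionalLong.empty
    System.out.println(hold.outputWatermark(11)); // 10
  }
}
```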
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356534&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356534 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 09/Dec/19 20:43 Start Date: 09/Dec/19 20:43 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355673447 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ## @@ -85,45 +123,83 @@ public void startBundle() { doFnRunner.startBundle(); } + @Override + public void finishBundle() { +doFnRunner.finishBundle(); + } + @Override public void processElement(WindowedValue input) { // StatefulDoFnRunner always observes windows, so we need to explode for (WindowedValue value : input.explodeWindows()) { - BoundedWindow window = value.getWindows().iterator().next(); - if (isLate(window)) { // The element is too late for this window. -droppedDueToLateness.inc(); -WindowTracing.debug( -"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " -+ "since too far behind inputWatermark:{}", -input.getTimestamp(), -window, -cleanupTimer.currentInputWatermarkTime()); +reportDroppedElement(value, window); + } else if (requiresTimeSortedInput) { +processElementOrdered(window, value); Review comment: This implies that we have to buffer elements up to watermark + allowed lateness.
Issue Time Tracking --- Worklog Id: (was: 356534) Time Spent: 4h 40m (was: 4.5h)
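Buffering "up to watermark + allowed lateness" can be stated as two rules. A sketch, assuming allowed lateness `L` in milliseconds and that accepting and flushing use the same bound; with `L = 0` the rules reduce to the zero-lateness behavior of the first implementation. These helpers are hypothetical, not Beam API, and ignore overflow near the extreme `long` values.

```java
/** Hypothetical accept/emit rules for a time-sorting buffer with allowed lateness L. */
final class LatenessRules {

  /** An element is still accepted while it is at most L behind the input watermark. */
  static boolean accept(long timestamp, long watermark, long allowedLateness) {
    return timestamp + allowedLateness >= watermark;
  }

  /**
   * A buffered element may be emitted only when nothing acceptable can still precede it: any
   * later arrival (at watermark w' >= watermark) has timestamp >= w' - L > this timestamp.
   */
  static boolean canEmit(long timestamp, long watermark, long allowedLateness) {
    return timestamp + allowedLateness < watermark;
  }

  public static void main(String[] args) {
    long watermark = 100;
    long lateness = 10;
    System.out.println(accept(95, watermark, lateness)); // true: within allowed lateness
    System.out.println(canEmit(95, watermark, lateness)); // false: must stay buffered
    System.out.println(canEmit(89, watermark, lateness)); // true
    System.out.println(accept(89, watermark, lateness)); // false: too late, dropped
  }
}
```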
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356146&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356146 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 09/Dec/19 12:51 Start Date: 09/Dec/19 12:51 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355431292 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -2363,6 +2367,136 @@ public int hashCode() { return getClass().hashCode(); } } + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class +}) +public void testRequiresTimeSortedInput() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps))); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class +}) +public void testRequiresTimeSortedInputWithTestStream() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder stream = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +stream = stream.addElements(stamp); + } + testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity())); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class +}) 
+public void testRequiresTimeSortedInputWithLateData() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder input = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp))); +if (stamp == 100) { + // advance watermark when we have 100 remaining elements + // all the rest are going to be late elements + input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); +} + } + testTimeSortedInput( + numElements - 100, + numElements - 1, + pipeline.apply(input.advanceWatermarkToInfinity()), + // Only direct runner is guaranteed to correctly drop elements after watermark coming + // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by + // class to avoid cyclic dependencies. + pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner")); Review comment: Other runners (Flink, to be specific) might handle the watermark differently. Only the direct runner advances the watermark synchronously, so what you mark as "late" in TestStream really ends up late. When running the same TestStream on Flink, the watermark might be updated at different times, so the data might not be late and the output is indeed different.
Issue Time Tracking --- Worklog Id: (was: 356146) Time Spent: 4.5h (was: 4h 20m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356138&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356138 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 09/Dec/19 12:47 Start Date: 09/Dec/19 12:47 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355429361 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ## @@ -448,18 +458,152 @@ public String toNativeString() { JavaRDD>>> groupRDD = GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner); -return groupRDD -.map( -input -> { - final K key = input.getKey(); - Iterable> value = input.getValue(); - return FluentIterable.from(value) - .transform( - windowedValue -> - windowedValue.withValue(KV.of(key, windowedValue.getValue( - .iterator(); -}) -.flatMapToPair(doFnFunction); +if (!requiresSortedInput) { + return groupRDD + .map( + input -> { +final K key = input.getKey(); +Iterable> value = input.getValue(); +return FluentIterable.from(value) +.transform( +windowedValue -> +windowedValue.withValue(KV.of(key, windowedValue.getValue( +.iterator(); + }) + .flatMapToPair(doFnFunction); +} + +JavaPairRDD pairRDD = +kvInRDD +.map(new ReifyTimestampsAndWindowsFunction<>()) +.mapToPair(TranslationUtils.toPairFunction()) +.mapToPair( +CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp())); + +JavaPairRDD sorted = + pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner)); + +return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder)); + } + + private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) { +return new Partitioner() { + @Override + public int numPartitions() { +return partitioner.numPartitions(); + } + + @Override + public int 
getPartition(Object o) { +ByteArray b = (ByteArray) o; +return partitioner.getPartition( +new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8))); + } +}; + } + + private static + PairFlatMapFunction>, TupleTag, WindowedValue> + wrapDoFnFromSortedRDD( + MultiDoFnFunction, OutputT> doFnFunction, + Coder keyCoder, + Coder> wvCoder) { + +return (Iterator> in) -> { + Iterator, WindowedValue>>> mappedGroups; + mappedGroups = Review comment: The reason is formatting. The latter implies an additional level of indentation.
Issue Time Tracking --- Worklog Id: (was: 356138) Time Spent: 4h 20m (was: 4h 10m)
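`wrapDoFnFromSortedRDD` turns each sorted partition iterator into `mappedGroups`, one group per key. A generic sketch of that regrouping step, assuming only that equal keys are adjacent in the partition iterator (which the key-prefix partitioning plus sort provides). This class is hypothetical; it materializes each group as a list, which may differ from the actual implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;

/** Hypothetical regrouping of adjacent equal keys into per-key value lists. */
final class ConsecutiveGroups<K, V> implements Iterator<List<V>> {
  private final Iterator<Map.Entry<K, V>> sorted;
  // First entry of the next group, already read from the underlying iterator (null at the end).
  private Map.Entry<K, V> pending;

  ConsecutiveGroups(Iterator<Map.Entry<K, V>> sorted) {
    this.sorted = sorted;
    this.pending = sorted.hasNext() ? sorted.next() : null;
  }

  @Override
  public boolean hasNext() {
    return pending != null;
  }

  @Override
  public List<V> next() {
    if (pending == null) {
      throw new NoSuchElementException();
    }
    K key = pending.getKey();
    List<V> group = new ArrayList<>();
    group.add(pending.getValue());
    pending = null;
    while (sorted.hasNext()) {
      Map.Entry<K, V> entry = sorted.next();
      if (Objects.equals(entry.getKey(), key)) {
        group.add(entry.getValue());
      } else {
        pending = entry; // belongs to the next group
        break;
      }
    }
    return group;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> sorted =
        List.of(Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 3));
    ConsecutiveGroups<String, Integer> groups = new ConsecutiveGroups<>(sorted.iterator());
    while (groups.hasNext()) {
      System.out.println(groups.next()); // prints [1, 2], then [3]
    }
  }
}
```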
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356127&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356127 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 09/Dec/19 12:22 Start Date: 09/Dec/19 12:22 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355419523 ## File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ## @@ -135,25 +139,29 @@ public void windowCleanupScheduled() throws Exception { .apply(Window.into(FixedWindows.of(Duration.millis(10; TupleTag mainOutput = new TupleTag<>(); -PCollection produced = -input -.apply( -new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>( -new DoFn, Integer>() { - @StateId(stateId) - private final StateSpec> spec = - StateSpecs.value(StringUtf8Coder.of()); +final ParDoMultiOverrideFactory.GbkThenStatefulParDo +gbkThenStatefulParDo; +gbkThenStatefulParDo = Review comment: That actually formats really badly, so I prefer this one.
Issue Time Tracking --- Worklog Id: (was: 356127) Time Spent: 4h 10m (was: 4h)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356126&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356126 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 09/Dec/19 12:20 Start Date: 09/Dec/19 12:20 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355418278 ## File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ## @@ -135,25 +139,29 @@ public void windowCleanupScheduled() throws Exception { .apply(Window.into(FixedWindows.of(Duration.millis(10; TupleTag mainOutput = new TupleTag<>(); -PCollection produced = -input -.apply( -new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>( -new DoFn, Integer>() { - @StateId(stateId) - private final StateSpec> spec = - StateSpecs.value(StringUtf8Coder.of()); +final ParDoMultiOverrideFactory.GbkThenStatefulParDo +gbkThenStatefulParDo; +gbkThenStatefulParDo = +new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>( +new DoFn, Integer>() { + @StateId(stateId) + private final StateSpec> spec = + StateSpecs.value(StringUtf8Coder.of()); + + @ProcessElement + public void process(ProcessContext c) {} +}, +mainOutput, +TupleTagList.empty(), +Collections.emptyList(), +DoFnSchemaInformation.create(), +Collections.emptyMap()); + +final PCollection>> grouped; +grouped = gbkThenStatefulParDo.groupToKeyedWorkItem(input); Review comment: ```suggestion final PCollection>> grouped = gbkThenStatefulParDo.groupToKeyedWorkItem(input); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 356126) Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356125&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356125 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 09/Dec/19 12:19 Start Date: 09/Dec/19 12:19 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355418278 ## File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ## @@ -135,25 +139,29 @@ public void windowCleanupScheduled() throws Exception { .apply(Window.into(FixedWindows.of(Duration.millis(10; TupleTag mainOutput = new TupleTag<>(); -PCollection produced = -input -.apply( -new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>( -new DoFn, Integer>() { - @StateId(stateId) - private final StateSpec> spec = - StateSpecs.value(StringUtf8Coder.of()); +final ParDoMultiOverrideFactory.GbkThenStatefulParDo +gbkThenStatefulParDo; +gbkThenStatefulParDo = +new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>( +new DoFn, Integer>() { + @StateId(stateId) + private final StateSpec> spec = + StateSpecs.value(StringUtf8Coder.of()); + + @ProcessElement + public void process(ProcessContext c) {} +}, +mainOutput, +TupleTagList.empty(), +Collections.emptyList(), +DoFnSchemaInformation.create(), +Collections.emptyMap()); + +final PCollection>> grouped; +grouped = gbkThenStatefulParDo.groupToKeyedWorkItem(input); Review comment: ```suggestion final PCollection>> grouped = gbkThenStatefulParDo.groupToKeyedWorkItem(input); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 356125) Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356122&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356122 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 09/Dec/19 12:18 Start Date: 09/Dec/19 12:18 Worklog Time Spent: 10m Work Description: je-ik commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355417745 ## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java ## @@ -164,7 +164,7 @@ private void fireTimers() { transformTimers.getKey(), (PCollection) Iterables.getOnlyElement( - transformTimers.getExecutable().getInputs().values())) + transformTimers.getExecutable().getMainInputs().values())) Review comment: This actually fixes a bug. The original code fails when there are side inputs, because `getInputs` throws an exception. Issue Time Tracking --- Worklog Id: (was: 356122) Time Spent: 3h 40m (was: 3.5h)
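A minimal plain-Java sketch of the failure mode behind this fix, assuming (as the comment implies) that the transform's full input map also contains its side inputs. Guava's `Iterables.getOnlyElement` throws `IllegalArgumentException` when the iterable has more than one element, so calling it on all inputs fails once a side input is present, while filtering down to the main inputs leaves exactly one. `MainInputsSketch`, its `getOnlyElement` stand-in and the string-keyed input map are hypothetical; this is not Beam's `AppliedPTransform` API.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical illustration of the QuiescenceDriver fix; not Beam code. */
final class MainInputsSketch {

  /** Stand-in with the same contract as Guava's Iterables.getOnlyElement. */
  static <T> T getOnlyElement(Iterable<T> values) {
    Iterator<T> it = values.iterator();
    if (!it.hasNext()) {
      throw new NoSuchElementException("no elements");
    }
    T only = it.next();
    if (it.hasNext()) {
      throw new IllegalArgumentException("expected one element but got more");
    }
    return only;
  }

  /** Keeps only the entries whose tag is not a side-input tag, i.e. the main inputs. */
  static Map<String, String> mainInputs(Map<String, String> allInputs, Set<String> sideInputTags) {
    return allInputs.entrySet().stream()
        .filter(e -> !sideInputTags.contains(e.getKey()))
        .collect(
            Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> a, LinkedHashMap::new));
  }
}
```

With one main input and one side input, `getOnlyElement(inputs.values())` throws, while `getOnlyElement(mainInputs(inputs, sideTags).values())` returns the main input.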
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355793&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355793 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 08/Dec/19 10:24 Start Date: 08/Dec/19 10:24 Worklog Time Spent: 10m Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355175226 ## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ## @@ -85,45 +123,83 @@ public void startBundle() { doFnRunner.startBundle(); } + @Override + public void finishBundle() { +doFnRunner.finishBundle(); + } + @Override public void processElement(WindowedValue input) { // StatefulDoFnRunner always observes windows, so we need to explode for (WindowedValue value : input.explodeWindows()) { - BoundedWindow window = value.getWindows().iterator().next(); - if (isLate(window)) { // The element is too late for this window. 
-droppedDueToLateness.inc(); -WindowTracing.debug( -"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " -+ "since too far behind inputWatermark:{}", -input.getTimestamp(), -window, -cleanupTimer.currentInputWatermarkTime()); +reportDroppedElement(value, window); + } else if (requiresTimeSortedInput) { +processElementOrdered(window, value); } else { -cleanupTimer.setForWindow(value.getValue(), window); -doFnRunner.processElement(value); +processElementUnordered(window, value); + } +} + } + + private void processElementUnordered(BoundedWindow window, WindowedValue value) { +cleanupTimer.setForWindow(value.getValue(), window); +doFnRunner.processElement(value); + } + + private void processElementOrdered(BoundedWindow window, WindowedValue value) { + +StateInternals stateInternals = stepContext.stateInternals(); +TimerInternals timerInternals = stepContext.timerInternals(); + +if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) { + StateNamespace namespace = StateNamespaces.window(windowCoder, window); + BagState> sortBuffer = stateInternals.state(namespace, sortBufferTag); + ValueState minStampState = stateInternals.state(namespace, sortBufferMinStampTag); + sortBuffer.add(value); + Instant minStamp = + MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE); + if (value.getTimestamp().isBefore(minStamp)) { +minStamp = value.getTimestamp(); +minStampState.write(minStamp); +setupFlushTimerAndWatermarkHold(namespace, minStamp); } +} else { + reportDroppedElement(value, window); } } private boolean isLate(BoundedWindow window) { Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); -Instant inputWM = cleanupTimer.currentInputWatermarkTime(); +Instant inputWM = stepContext.timerInternals().currentInputWatermarkTime(); return gcTime.isBefore(inputWM); } + private void reportDroppedElement(WindowedValue value, BoundedWindow window) { +droppedDueToLateness.inc(); 
+WindowTracing.debug( +"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " ++ "since too far behind inputWatermark:{}", Review comment: Is this message correct when an element is dropped on the ordered code path? Issue Time Tracking --- Worklog Id: (was: 355793) Time Spent: 3h 10m (was: 3h)
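The ordered path in `processElementOrdered` adds each element to a `BagState` sort buffer, keeps the minimum buffered timestamp in a `ValueState`, and sets a flush timer and watermark hold at that minimum; an element is dropped when the input watermark is already after its timestamp. Below is an in-memory sketch of those semantics, assuming a single key and window, `long` millisecond timestamps, and a flush that emits every buffered element at or behind the watermark. `SortBufferSketch` is hypothetical and is not the runner's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** In-memory sketch of a per-key, per-window sort buffer (hypothetical, not Beam code). */
final class SortBufferSketch<T> {

  private static final class Stamped<V> {
    final V value;
    final long timestamp;

    Stamped(V value, long timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  // Stands in for both the BagState buffer and the ValueState holding the minimum timestamp.
  private final PriorityQueue<Stamped<T>> buffer =
      new PriorityQueue<Stamped<T>>((a, b) -> Long.compare(a.timestamp, b.timestamp));
  private long inputWatermark = Long.MIN_VALUE;
  private long dropped;

  /** Buffers the element unless the watermark is already after its timestamp. */
  boolean add(T value, long timestamp) {
    if (inputWatermark > timestamp) {
      dropped++; // late: emitting it now would break the ordering guarantee
      return false;
    }
    buffer.add(new Stamped<>(value, timestamp));
    return true;
  }

  /** Earliest buffered timestamp: where a flush timer and watermark hold would be set. */
  Long minStamp() {
    return buffer.isEmpty() ? null : buffer.peek().timestamp;
  }

  /** Advances the watermark and emits, in timestamp order, every buffered element it passed. */
  List<T> advanceWatermark(long newWatermark) {
    inputWatermark = Math.max(inputWatermark, newWatermark);
    List<T> flushed = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek().timestamp <= inputWatermark) {
      flushed.add(buffer.poll().value);
    }
    return flushed;
  }

  long dropped() {
    return dropped;
  }
}
```

An element stamped exactly at the watermark is still accepted, mirroring the `!isAfter(...)` check in the diff. The priority queue lets a flush stop at the first element still ahead of the watermark; elements with equal timestamps come out in no particular order.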
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355788&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355788 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 08/Dec/19 10:24 Start Date: 08/Dec/19 10:24 Worklog Time Spent: 10m Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355168688 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ## @@ -448,18 +458,152 @@ public String toNativeString() { JavaRDD>>> groupRDD = GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner); -return groupRDD -.map( -input -> { - final K key = input.getKey(); - Iterable> value = input.getValue(); - return FluentIterable.from(value) - .transform( - windowedValue -> - windowedValue.withValue(KV.of(key, windowedValue.getValue( - .iterator(); -}) -.flatMapToPair(doFnFunction); +if (!requiresSortedInput) { + return groupRDD + .map( + input -> { +final K key = input.getKey(); +Iterable> value = input.getValue(); +return FluentIterable.from(value) +.transform( +windowedValue -> +windowedValue.withValue(KV.of(key, windowedValue.getValue( +.iterator(); + }) + .flatMapToPair(doFnFunction); +} + +JavaPairRDD pairRDD = +kvInRDD +.map(new ReifyTimestampsAndWindowsFunction<>()) +.mapToPair(TranslationUtils.toPairFunction()) +.mapToPair( +CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp())); + +JavaPairRDD sorted = + pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner)); + +return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder)); + } + + private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) { +return new Partitioner() { + @Override + public int numPartitions() { +return partitioner.numPartitions(); + } + + @Override + public 
int getPartition(Object o) { +ByteArray b = (ByteArray) o; +return partitioner.getPartition( +new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8))); Review comment: Can we move this closer to the utility that produces the value with the timestamp? That is, have `toByteArrayWithTs` and, e.g., `toByteArrayWithoutTs` (any naming is fine). Issue Time Tracking --- Worklog Id: (was: 355788) Time Spent: 2h 50m (was: 2h 40m)
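The suggestion pairs the encoder that appends the timestamp with a decoder that strips it. A minimal sketch of such a pair, assuming the key is already encoded to bytes and the suffix is the 8-byte timestamp implied by `length - 8` in the diff. The big-endian layout with the sign bit flipped is this sketch's own assumption (it makes unsigned byte-wise ordering agree with signed timestamp ordering, including negative timestamps); the diff does not show how `CoderHelpers.toByteFunctionWithTs` actually lays out the bytes. `KeyWithTsSketch`, `timestampOf` and `partitionFor` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Sketch of paired "with timestamp" / "without timestamp" helpers (names hypothetical). */
final class KeyWithTsSketch {
  private static final int TS_BYTES = Long.BYTES; // the 8-byte suffix stripped by "length - 8"

  /** Appends the timestamp as 8 big-endian bytes with the sign bit flipped. */
  static byte[] toByteArrayWithTs(byte[] keyBytes, long timestampMillis) {
    return ByteBuffer.allocate(keyBytes.length + TS_BYTES)
        .put(keyBytes)
        .putLong(timestampMillis ^ Long.MIN_VALUE) // unsigned byte order now matches signed order
        .array();
  }

  /** Strips the timestamp suffix, leaving only the encoded key. */
  static byte[] toByteArrayWithoutTs(byte[] keyWithTs) {
    return Arrays.copyOfRange(keyWithTs, 0, keyWithTs.length - TS_BYTES);
  }

  /** Decodes the timestamp from the suffix. */
  static long timestampOf(byte[] keyWithTs) {
    return ByteBuffer.wrap(keyWithTs, keyWithTs.length - TS_BYTES, TS_BYTES).getLong()
        ^ Long.MIN_VALUE;
  }

  /** Partitions on the key prefix only, so every timestamp of one key lands in one partition. */
  static int partitionFor(byte[] keyWithTs, int numPartitions) {
    return Math.floorMod(Arrays.hashCode(toByteArrayWithoutTs(keyWithTs)), numPartitions);
  }
}
```

Sorting by these bytes keeps each key's records contiguous only if the key encoding is prefix-free (for example, length-prefixed). If one key's bytes were a prefix of another's, the longer key's bytes would be compared against the shorter key's timestamp suffix, and the two keys' records could interleave.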
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355794&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355794 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 08/Dec/19 10:24 Start Date: 08/Dec/19 10:24 Worklog Time Spent: 10m Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355172757 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ## @@ -656,6 +656,24 @@ public Duration getAllowedTimestampSkew() { @Target(ElementType.METHOD) public @interface RequiresStableInput {} + /** + * Experimental - no backwards compatibility guarantees. The exact name or usage of this + * feature may change. + * + * Annotation that may be added to a {@link ProcessElement} method to indicate that the runner + * must ensure that the observable contents of the input {@link PCollection} is sorted by time, in + * ascending order. The time ordering is generally defined by element's timestamp, but an + * alternative user supplied ordering function can be supplied. + * + * Note that this annotation makes sense only for stateful {@code ParDo}s, because outcome of + * stateless functions cannot depend on the ordering. + */ + @Documented + @Experimental + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface RequiresTimeSortedInput {} Review comment: I think the doc should be enriched with more details and the tradeoffs of this annotation. The impact on handling late data definitely needs to be mentioned.
Issue Time Tracking --- Worklog Id: (was: 355794) Time Spent: 3h 10m (was: 3h)
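The Javadoc's argument that the annotation only matters for stateful `ParDo`s can be seen in a toy example: a function that remembers the previous element produces different output for the same elements in a different order, while a per-element function produces the same outputs, merely in a different order. This is plain Java with a local variable standing in for per-key state; `OrderSensitivitySketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy illustration of why input ordering matters only for stateful processing. */
final class OrderSensitivitySketch {

  /** Stateful: each output depends on the previously seen element. */
  static List<Long> deltasToPrevious(List<Long> timestamps) {
    List<Long> out = new ArrayList<>();
    Long previous = null; // stands in for a per-key ValueState<Long>
    for (long ts : timestamps) {
      if (previous != null) {
        out.add(ts - previous);
      }
      previous = ts;
    }
    return out;
  }

  /** Stateless: each output depends only on its own input element. */
  static List<Long> shifted(List<Long> timestamps, long offset) {
    List<Long> out = new ArrayList<>();
    for (long ts : timestamps) {
      out.add(ts + offset);
    }
    return out;
  }
}
```

For input `3, 1, 2` the stateful function yields `-2, 1`, but for the sorted input `1, 2, 3` it yields `1, 1`; the stateless function yields the same set of values either way.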
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355797&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355797 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 08/Dec/19 10:24 Start Date: 08/Dec/19 10:24 Worklog Time Spent: 10m Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355173711 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -2363,6 +2367,136 @@ public int hashCode() { return getClass().hashCode(); } } + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class +}) +public void testRequiresTimeSortedInput() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps))); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class +}) +public void testRequiresTimeSortedInputWithTestStream() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder stream = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +stream = stream.addElements(stamp); + } + testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity())); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class 
+}) +public void testRequiresTimeSortedInputWithLateData() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder input = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp))); +if (stamp == 100) { + // advance watermark when we have 100 remaining elements + // all the rest are going to be late elements + input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); +} + } + testTimeSortedInput( Review comment: If I am correct, this test does not verify which elements (keys) exactly are dropped as late. Can it be made more explicit? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 355797) Time Spent: 3.5h (was: 3h 20m) > @RequiresTimeSortedInput DoFn annotation > > > Key: BEAM-8550 > URL: https://issues.apache.org/jira/browse/BEAM-8550 > Project: Beam > Issue Type: New Feature > Components: beam-model, sdk-java-core >Reporter: Jan Lukavský >Assignee: Jan Lukavský >Priority: Major > Time Spent: 3.5h > Remaining Estimate: 0h > > Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as > described in [design > document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing]. 
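One way to make the late-data test explicit about which elements are dropped is to compute the expected split up front and assert on it. The sketch below replays the test's `TestStream` sequence, assuming the stateful step sees exactly the watermark the stream advances to and applies the drop rule from the `StatefulDoFnRunner` diff (an element is kept unless the input watermark is after its timestamp). `LateDataExpectation` is a hypothetical helper, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper computing which stamps the late-data test should see dropped. */
final class LateDataExpectation {

  /** Same sequence as the test: numElements, numElements - 1, ..., 1. */
  static List<Long> descendingStamps(int numElements) {
    List<Long> stamps = new ArrayList<>();
    for (long i = 0; i < numElements; i++) {
      stamps.add(numElements - i);
    }
    return stamps;
  }

  /**
   * Replays the stream: the watermark jumps to advanceAt right after that stamp is added, and
   * any later element whose timestamp is behind the watermark is dropped. Returns [kept, dropped].
   */
  static List<List<Long>> keptAndDropped(List<Long> stamps, long advanceAt) {
    List<Long> kept = new ArrayList<>();
    List<Long> dropped = new ArrayList<>();
    long watermark = Long.MIN_VALUE;
    for (long stamp : stamps) {
      if (watermark > stamp) {
        dropped.add(stamp);
      } else {
        kept.add(stamp);
      }
      if (stamp == advanceAt) {
        watermark = stamp;
      }
    }
    return List.of(kept, dropped);
  }
}
```

For `numElements = 1000` and an advance at stamp 100, this replay keeps the 901 stamps from 1000 down to 100 and drops the 99 stamps from 99 down to 1; a test could assert on exactly those sets.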
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355790&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355790 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 08/Dec/19 10:24 Start Date: 08/Dec/19 10:24 Worklog Time Spent: 10m Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355168771 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ## @@ -448,18 +458,152 @@ public String toNativeString() { JavaRDD>>> groupRDD = GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner); -return groupRDD -.map( -input -> { - final K key = input.getKey(); - Iterable> value = input.getValue(); - return FluentIterable.from(value) - .transform( - windowedValue -> - windowedValue.withValue(KV.of(key, windowedValue.getValue( - .iterator(); -}) -.flatMapToPair(doFnFunction); +if (!requiresSortedInput) { + return groupRDD + .map( + input -> { +final K key = input.getKey(); +Iterable> value = input.getValue(); +return FluentIterable.from(value) +.transform( +windowedValue -> +windowedValue.withValue(KV.of(key, windowedValue.getValue( +.iterator(); + }) + .flatMapToPair(doFnFunction); +} + +JavaPairRDD pairRDD = +kvInRDD +.map(new ReifyTimestampsAndWindowsFunction<>()) +.mapToPair(TranslationUtils.toPairFunction()) +.mapToPair( +CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp())); + +JavaPairRDD sorted = + pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner)); + +return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder)); + } + + private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) { +return new Partitioner() { + @Override + public int numPartitions() { +return partitioner.numPartitions(); + } + + @Override + public 
int getPartition(Object o) { +ByteArray b = (ByteArray) o; +return partitioner.getPartition( +new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8))); + } +}; + } + + private static + PairFlatMapFunction>, TupleTag, WindowedValue> + wrapDoFnFromSortedRDD( + MultiDoFnFunction, OutputT> doFnFunction, + Coder keyCoder, + Coder> wvCoder) { + +return (Iterator> in) -> { + Iterator, WindowedValue>>> mappedGroups; + mappedGroups = + Iterators.transform( + splitBySameKey(in, keyCoder, wvCoder), + group -> { +try { + return doFnFunction.call(group); +} catch (Exception ex) { + throw new RuntimeException(ex); +} + }); + return flatten(mappedGroups); +}; + } + + @VisibleForTesting + static Iterator flatten(final Iterator> toFlatten) { + +return new AbstractIterator() { + + @Nullable Iterator current = null; + + @Override + protected T computeNext() { +while (true) { + if (current == null) { +if (toFlatten.hasNext()) { + current = toFlatten.next(); +} else { + return endOfData(); +} + } + if (current.hasNext()) { +return current.next(); + } + current = null; +} + } +}; + } + + @VisibleForTesting + static Iterator>>> splitBySameKey( + Iterator> in, Coder keyCoder, Coder> wvCoder) { + +return new AbstractIterator>>>() { + + @Nullable Tuple2 read = null; + + @Override + protected Iterator>> computeNext() { +readNext(); +if (read != null) { + byte[] value = read._1().getValue(); + byte[] keyPart = Arrays.copyOfRange(value, 0, value.length - 8); + K key = CoderHelpers.fromByteArray(keyPart, keyCoder); + return createIteratorForKey(keyPart, key); +} +return endOfData(); + } + + private void readNext() { +if (read == null) { + if (in.hasNext()) { +read = in.next(); + } +} + } + + private void consumed() { +read = null; + } + + privat
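`wrapDoFnFromSortedRDD` relies on two helpers: `splitBySameKey`, which turns one sorted partition into a lazy sequence of per-key runs, and `flatten`, which concatenates the per-run outputs. A plain-Java sketch of the same pattern, assuming the input is an iterator of key/value entries already sorted so that equal keys are adjacent. It avoids Guava's `AbstractIterator` and uses `Map.Entry` in place of `ByteArray` and `WindowedValue`; `SortedGroupsSketch` and `mapRuns` are hypothetical names.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;

/** Plain-Java sketch: split a key-sorted stream into runs, process each run, flatten. */
final class SortedGroupsSketch {

  /** Lazily concatenates an iterator of iterators, skipping empty ones. */
  static <T> Iterator<T> flatten(Iterator<? extends Iterator<? extends T>> outer) {
    return new Iterator<T>() {
      private Iterator<? extends T> current = Collections.emptyIterator();

      @Override
      public boolean hasNext() {
        while (!current.hasNext() && outer.hasNext()) {
          current = outer.next();
        }
        return current.hasNext();
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }

  /** Lazily splits a key-sorted iterator into runs of entries that share a key. */
  static <K, V> Iterator<Iterator<Map.Entry<K, V>>> splitBySameKey(Iterator<Map.Entry<K, V>> in) {
    return new Iterator<Iterator<Map.Entry<K, V>>>() {
      private Map.Entry<K, V> pending; // one-element lookahead shared with the runs
      private K runKey;
      private boolean inRun;

      private void fill() {
        if (pending == null && in.hasNext()) {
          pending = in.next();
        }
      }

      private boolean pendingHasKey(K key) {
        fill();
        return pending != null && Objects.equals(pending.getKey(), key);
      }

      @Override
      public boolean hasNext() {
        if (inRun) {
          while (pendingHasKey(runKey)) {
            pending = null; // skip whatever the caller left unread in the previous run
          }
          inRun = false;
        }
        fill();
        return pending != null;
      }

      @Override
      public Iterator<Map.Entry<K, V>> next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        final K key = pending.getKey();
        runKey = key;
        inRun = true;
        return new Iterator<Map.Entry<K, V>>() {
          @Override
          public boolean hasNext() {
            return pendingHasKey(key);
          }

          @Override
          public Map.Entry<K, V> next() {
            if (!hasNext()) {
              throw new NoSuchElementException();
            }
            Map.Entry<K, V> entry = pending;
            pending = null;
            return entry;
          }
        };
      }
    };
  }

  /** Applies perRun to each run and concatenates the results, all lazily. */
  static <K, V, R> Iterator<R> mapRuns(
      Iterator<Map.Entry<K, V>> sorted, Function<Iterator<Map.Entry<K, V>>, Iterator<R>> perRun) {
    Iterator<Iterator<Map.Entry<K, V>>> runs = splitBySameKey(sorted);
    return flatten(
        new Iterator<Iterator<R>>() {
          @Override
          public boolean hasNext() {
            return runs.hasNext();
          }

          @Override
          public Iterator<R> next() {
            return perRun.apply(runs.next());
          }
        });
  }
}
```

Both helpers read the partition in a single pass and never materialize a whole run, so a key with many elements does not have to fit in memory. The price is that runs must be consumed in order; in this sketch, any unread tail of a run is skipped when the next run is requested.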
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355784&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355784 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 08/Dec/19 10:24 Start Date: 08/Dec/19 10:24 Worklog Time Spent: 10m Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#discussion_r355132678 ## File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ## @@ -135,25 +139,29 @@ public void windowCleanupScheduled() throws Exception { .apply(Window.into(FixedWindows.of(Duration.millis(10; TupleTag mainOutput = new TupleTag<>(); -PCollection produced = -input -.apply( -new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>( -new DoFn, Integer>() { - @StateId(stateId) - private final StateSpec> spec = - StateSpecs.value(StringUtf8Coder.of()); +final ParDoMultiOverrideFactory.GbkThenStatefulParDo +gbkThenStatefulParDo; +gbkThenStatefulParDo = +new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>( +new DoFn, Integer>() { + @StateId(stateId) + private final StateSpec> spec = + StateSpecs.value(StringUtf8Coder.of()); + + @ProcessElement + public void process(ProcessContext c) {} +}, +mainOutput, +TupleTagList.empty(), +Collections.emptyList(), +DoFnSchemaInformation.create(), +Collections.emptyMap()); + +final PCollection>> grouped; +grouped = gbkThenStatefulParDo.groupToKeyedWorkItem(input); Review comment: We can do a direct assignment here This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 355784) Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355791&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355791 ]

ASF GitHub Bot logged work on BEAM-8550:
Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355173620

## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ##
@@ -2363,6 +2367,136 @@ public int hashCode() {
         return getClass().hashCode();
       }
     }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testRequiresTimeSortedInput() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps)));
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class,
+      UsesTestStream.class
+    })
+    public void testRequiresTimeSortedInputWithTestStream() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder stream = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        stream = stream.addElements(stamp);
+      }
+      testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class,
+      UsesTestStream.class
+    })
+    public void testRequiresTimeSortedInputWithLateData() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder input = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+        if (stamp == 100) {
+          // advance watermark when we have 100 remaining elements
+          // all the rest are going to be late elements
+          input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+        }
+      }
+      testTimeSortedInput(
+          numElements - 100,
+          numElements - 1,
+          pipeline.apply(input.advanceWatermarkToInfinity()),
+          // Only direct runner is guaranteed to correctly drop elements after watermark coming
+          // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by
+          // class to avoid cyclic dependencies.
+          pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner"));

Review comment:
What does this mean exactly? Do other runners not give the same guarantees about dropping late data, and therefore produce a different result?

Issue Time Tracking
-------------------
Worklog Id: (was: 355791)
Time Spent: 3h (was: 2h 50m)
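The late-data test builds 1000 timestamps in descending order (1000 down to 1) and advances the watermark to 100 immediately after emitting the element stamped 100. The plain-Java sketch below, which uses no Beam classes, counts how many elements then fall behind that watermark. It assumes, as in the `!watermark.isAfter(timestamp)` check of `processElementOrdered` in `StatefulDoFnRunner`, that an element is late only when its timestamp is strictly before the watermark; under that assumption the element stamped 100 is still on time and the 99 elements stamped 99 down to 1 are late. The class and method names are hypothetical, and the sketch does not try to explain the arguments passed to `testTimeSortedInput`, whose meaning is not shown in the quoted hunk.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class LateDataArithmeticSketch {

  // Same generator as in the test: numElements, numElements - 1, ..., 1.
  static List<Long> descendingStamps(int numElements) {
    return LongStream.range(0, numElements)
        .mapToObj(i -> numElements - i)
        .collect(Collectors.toList());
  }

  // Counts elements whose timestamp is strictly before the watermark at the time
  // they arrive. Assumption: the watermark jumps to watermarkAt right after the
  // element stamped watermarkAt, mirroring the test's `if (stamp == 100)` branch.
  static long countLate(List<Long> stamps, long watermarkAt) {
    long watermark = Long.MIN_VALUE;
    long late = 0;
    for (long stamp : stamps) {
      if (watermark > stamp) {
        late++;
      }
      if (stamp == watermarkAt) {
        watermark = watermarkAt;
      }
    }
    return late;
  }

  public static void main(String[] args) {
    List<Long> stamps = descendingStamps(1000);
    long late = countLate(stamps, 100);
    System.out.println("first=" + stamps.get(0) + " last=" + stamps.get(stamps.size() - 1));
    System.out.println("late=" + late + " onTime=" + (stamps.size() - late));
  }
}
```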
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355785&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355785 ]

ASF GitHub Bot logged work on BEAM-8550:
Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355132294

## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java ##
@@ -164,7 +164,7 @@ private void fireTimers() {
                   transformTimers.getKey(),
                   (PCollection)
                       Iterables.getOnlyElement(
-                          transformTimers.getExecutable().getInputs().values()))
+                          transformTimers.getExecutable().getMainInputs().values()))

Review comment:
I do not follow what this change is about. Can you elaborate?

Issue Time Tracking
-------------------
Worklog Id: (was: 355785)
Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355787&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355787 ]

ASF GitHub Bot logged work on BEAM-8550:
Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355168488

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ##
@@ -448,18 +458,152 @@ public String toNativeString() {
     JavaRDD>>> groupRDD =
         GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);

-    return groupRDD
-        .map(
-            input -> {
-              final K key = input.getKey();
-              Iterable> value = input.getValue();
-              return FluentIterable.from(value)
-                  .transform(
-                      windowedValue ->
-                          windowedValue.withValue(KV.of(key, windowedValue.getValue())))
-                  .iterator();
-            })
-        .flatMapToPair(doFnFunction);
+    if (!requiresSortedInput) {
+      return groupRDD
+          .map(
+              input -> {
+                final K key = input.getKey();
+                Iterable> value = input.getValue();
+                return FluentIterable.from(value)
+                    .transform(
+                        windowedValue ->
+                            windowedValue.withValue(KV.of(key, windowedValue.getValue())))
+                    .iterator();
+              })
+          .flatMapToPair(doFnFunction);
+    }
+
+    JavaPairRDD pairRDD =
+        kvInRDD
+            .map(new ReifyTimestampsAndWindowsFunction<>())
+            .mapToPair(TranslationUtils.toPairFunction())
+            .mapToPair(
+                CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp()));
+
+    JavaPairRDD sorted =
+        pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner));
+
+    return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder));
+  }
+
+  private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) {
+    return new Partitioner() {
+      @Override
+      public int numPartitions() {
+        return partitioner.numPartitions();
+      }
+
+      @Override
+      public int getPartition(Object o) {
+        ByteArray b = (ByteArray) o;
+        return partitioner.getPartition(
+            new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8)));
+      }
+    };
+  }
+
+  private static
+      PairFlatMapFunction>, TupleTag, WindowedValue>
+          wrapDoFnFromSortedRDD(
+              MultiDoFnFunction, OutputT> doFnFunction,
+              Coder keyCoder,
+              Coder> wvCoder) {
+
+    return (Iterator> in) -> {
+      Iterator, WindowedValue>>> mappedGroups;
+      mappedGroups =

Review comment:
I wonder why you sometimes prefer
```
T myVariable;
myVariable = value();
```
rather than directly
```
T myVariable = value();
```

Issue Time Tracking
-------------------
Worklog Id: (was: 355787)
Time Spent: 2h 40m (was: 2.5h)
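The sorted branch relies on a composite byte key: the encoded key followed by an 8-byte timestamp, so that `repartitionAndSortWithinPartitions` orders records by key and then by time, while `keyPrefixPartitionerFrom` drops the last 8 bytes so every timestamp of one key lands in the same partition. The exact layout written by `CoderHelpers.toByteFunctionWithTs` is not shown in the hunk. The sketch below assumes a big-endian timestamp suffix, uses hypothetical helper names (`withTimestampSuffix`, `keyPrefix`, `timestampSuffix`, `partitionFor`), and lets `Arrays.hashCode` stand in for the wrapped Spark partitioner. It also assumes non-negative timestamps: under unsigned byte-wise comparison, negative big-endian longs would sort after positive ones.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyTimestampSuffixSketch {

  // Width of the timestamp suffix; matches the "length - 8" in the quoted hunk.
  static final int TS_BYTES = Long.BYTES;

  // Hypothetical encoder: encoded key bytes followed by a big-endian timestamp.
  static byte[] withTimestampSuffix(byte[] encodedKey, long timestampMillis) {
    return ByteBuffer.allocate(encodedKey.length + TS_BYTES)
        .put(encodedKey)
        .putLong(timestampMillis)
        .array();
  }

  // Drops the timestamp suffix, as keyPrefixPartitionerFrom does before delegating.
  static byte[] keyPrefix(byte[] keyWithTs) {
    return Arrays.copyOfRange(keyWithTs, 0, keyWithTs.length - TS_BYTES);
  }

  // Reads the timestamp suffix back.
  static long timestampSuffix(byte[] keyWithTs) {
    return ByteBuffer.wrap(keyWithTs, keyWithTs.length - TS_BYTES, TS_BYTES).getLong();
  }

  // Partitions on the key prefix only, so all timestamps of one key share a partition.
  static int partitionFor(byte[] keyWithTs, int numPartitions) {
    return Math.floorMod(Arrays.hashCode(keyPrefix(keyWithTs)), numPartitions);
  }

  public static void main(String[] args) {
    byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
    byte[] earlier = withTimestampSuffix(key, 10L);
    byte[] later = withTimestampSuffix(key, 20L);
    System.out.println(Arrays.equals(keyPrefix(earlier), key));
    System.out.println(timestampSuffix(later));
    System.out.println(partitionFor(earlier, 4) == partitionFor(later, 4));
    // For the same key and non-negative timestamps, unsigned byte order is time order.
    System.out.println(Arrays.compareUnsigned(earlier, later) < 0);
  }
}
```

Keeping the encoder and the prefix/suffix readers side by side, as the reviewer suggests for `toByteArrayWithTs`, means the magic number 8 is defined in one place.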
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355786&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355786 ]

ASF GitHub Bot logged work on BEAM-8550:
Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355164129

## File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ##
@@ -188,15 +196,35 @@ public void process(ProcessContext c) {}
     // A single bundle with some elements in the global window; it should register cleanup for the
     // global window state merely by having the evaluator created. The cleanup logic does not
     // depend on the window.
-    CommittedBundle> inputBundle =
+    CommittedBundle>> inputBundle =

Review comment:
What is the benefit of expanding KV -> KeyedWorkItem? I do not see the connection to requiring time-sorted input.

Issue Time Tracking
-------------------
Worklog Id: (was: 355786)
Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355795&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355795 ]

ASF GitHub Bot logged work on BEAM-8550:
Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355175519

## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ##
@@ -132,16 +208,56 @@ public void onTimer(
                 + "since window is too far behind inputWatermark:{}",
             timestamp,
             window,
-            cleanupTimer.currentInputWatermarkTime());
+            stepContext.timerInternals().currentInputWatermarkTime());
       } else {
         doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
       }
     }
   }

-  @Override
-  public void finishBundle() {
-    doFnRunner.finishBundle();
+  // this needs to be optimized (Sorted Map State)

Review comment:
Is this a performance implication worth mentioning to annotation users? Can it be linked to a JIRA?

Issue Time Tracking
-------------------
Worklog Id: (was: 355795)
Time Spent: 3h 20m (was: 3h 10m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355783&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355783 ]

ASF GitHub Bot logged work on BEAM-8550:
Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355163869

## File path: runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ##
@@ -135,25 +139,29 @@ public void windowCleanupScheduled() throws Exception {
             .apply(Window.into(FixedWindows.of(Duration.millis(10))));
     TupleTag mainOutput = new TupleTag<>();
-    PCollection produced =
-        input
-            .apply(
-                new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
-                    new DoFn, Integer>() {
-                      @StateId(stateId)
-                      private final StateSpec> spec =
-                          StateSpecs.value(StringUtf8Coder.of());
+    final ParDoMultiOverrideFactory.GbkThenStatefulParDo
+        gbkThenStatefulParDo;
+    gbkThenStatefulParDo =

Review comment:
We can do a direct assignment here

Issue Time Tracking
-------------------
Worklog Id: (was: 355783)
Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355789&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355789 ]

ASF GitHub Bot logged work on BEAM-8550:
Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355168726

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ##
@@ -448,18 +458,152 @@ public String toNativeString() {
     JavaRDD>>> groupRDD =
         GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);

-    return groupRDD
-        .map(
-            input -> {
-              final K key = input.getKey();
-              Iterable> value = input.getValue();
-              return FluentIterable.from(value)
-                  .transform(
-                      windowedValue ->
-                          windowedValue.withValue(KV.of(key, windowedValue.getValue())))
-                  .iterator();
-            })
-        .flatMapToPair(doFnFunction);
+    if (!requiresSortedInput) {
+      return groupRDD
+          .map(
+              input -> {
+                final K key = input.getKey();
+                Iterable> value = input.getValue();
+                return FluentIterable.from(value)
+                    .transform(
+                        windowedValue ->
+                            windowedValue.withValue(KV.of(key, windowedValue.getValue())))
+                    .iterator();
+              })
+          .flatMapToPair(doFnFunction);
+    }
+
+    JavaPairRDD pairRDD =
+        kvInRDD
+            .map(new ReifyTimestampsAndWindowsFunction<>())
+            .mapToPair(TranslationUtils.toPairFunction())
+            .mapToPair(
+                CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp()));
+
+    JavaPairRDD sorted =
+        pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner));
+
+    return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder));
+  }
+
+  private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) {
+    return new Partitioner() {
+      @Override
+      public int numPartitions() {
+        return partitioner.numPartitions();
+      }
+
+      @Override
+      public int getPartition(Object o) {
+        ByteArray b = (ByteArray) o;
+        return partitioner.getPartition(
+            new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8)));
+      }
+    };
+  }
+
+  private static
+      PairFlatMapFunction>, TupleTag, WindowedValue>
+          wrapDoFnFromSortedRDD(
+              MultiDoFnFunction, OutputT> doFnFunction,
+              Coder keyCoder,
+              Coder> wvCoder) {
+
+    return (Iterator> in) -> {
+      Iterator, WindowedValue>>> mappedGroups;
+      mappedGroups =
+          Iterators.transform(
+              splitBySameKey(in, keyCoder, wvCoder),
+              group -> {
+                try {
+                  return doFnFunction.call(group);
+                } catch (Exception ex) {
+                  throw new RuntimeException(ex);
+                }
+              });
+      return flatten(mappedGroups);
+    };
+  }
+
+  @VisibleForTesting
+  static Iterator flatten(final Iterator> toFlatten) {
+
+    return new AbstractIterator() {
+
+      @Nullable Iterator current = null;
+
+      @Override
+      protected T computeNext() {
+        while (true) {
+          if (current == null) {
+            if (toFlatten.hasNext()) {
+              current = toFlatten.next();
+            } else {
+              return endOfData();
+            }
+          }
+          if (current.hasNext()) {
+            return current.next();
+          }
+          current = null;
+        }
+      }
+    };
+  }
+
+  @VisibleForTesting
+  static Iterator>>> splitBySameKey(
+      Iterator> in, Coder keyCoder, Coder> wvCoder) {
+
+    return new AbstractIterator>>>() {
+
+      @Nullable Tuple2 read = null;
+
+      @Override
+      protected Iterator>> computeNext() {
+        readNext();
+        if (read != null) {
+          byte[] value = read._1().getValue();
+          byte[] keyPart = Arrays.copyOfRange(value, 0, value.length - 8);

Review comment:
Same as above. Move this closer to `toByteArrayWithTs`?
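`wrapDoFnFromSortedRDD` walks one sorted partition, cuts it into runs of records that share a key prefix (`splitBySameKey`), calls the DoFn function on each run, and lazily concatenates the per-run outputs (`flatten`). The sketch below reproduces that shape in plain Java over `Map.Entry<K, V>` pairs instead of Beam's `Tuple2` of `ByteArray` and `byte[]`; the class `SortedRunSketch` and its method names are hypothetical, and runs are materialized as lists for simplicity. For the flattening step alone, Guava's `Iterators.concat(Iterator<? extends Iterator<? extends T>>)` also concatenates lazily.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;

public class SortedRunSketch {

  // Cuts an iterator that is already sorted by key into consecutive same-key runs.
  // Runs are materialized as lists here for simplicity.
  static <K, V> Iterator<List<Map.Entry<K, V>>> splitBySameKey(Iterator<Map.Entry<K, V>> in) {
    return new Iterator<List<Map.Entry<K, V>>>() {
      // First element of the next run; null once the input is exhausted.
      private Map.Entry<K, V> pending = in.hasNext() ? in.next() : null;

      @Override
      public boolean hasNext() {
        return pending != null;
      }

      @Override
      public List<Map.Entry<K, V>> next() {
        if (pending == null) {
          throw new NoSuchElementException();
        }
        K key = pending.getKey();
        List<Map.Entry<K, V>> run = new ArrayList<>();
        run.add(pending);
        pending = null;
        while (in.hasNext()) {
          Map.Entry<K, V> e = in.next();
          if (!e.getKey().equals(key)) {
            pending = e; // starts the next run
            break;
          }
          run.add(e);
        }
        return run;
      }
    };
  }

  // Lazily concatenates an iterator of iterators, skipping empty ones.
  static <T> Iterator<T> flatten(Iterator<Iterator<T>> toFlatten) {
    return new Iterator<T>() {
      private Iterator<T> current = null;

      @Override
      public boolean hasNext() {
        while (current == null || !current.hasNext()) {
          if (!toFlatten.hasNext()) {
            return false;
          }
          current = toFlatten.next();
        }
        return true;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }

  // Applies a per-key function to each run of a sorted partition and flattens the outputs.
  static <K, V, O> Iterator<O> processSortedPartition(
      Iterator<Map.Entry<K, V>> sorted, Function<List<Map.Entry<K, V>>, Iterator<O>> perKey) {
    Iterator<List<Map.Entry<K, V>>> runs = splitBySameKey(sorted);
    Iterator<Iterator<O>> outputs =
        new Iterator<Iterator<O>>() {
          @Override
          public boolean hasNext() {
            return runs.hasNext();
          }

          @Override
          public Iterator<O> next() {
            return perKey.apply(runs.next());
          }
        };
    return flatten(outputs);
  }

  static <K, V> Map.Entry<K, V> kv(K key, V value) {
    return new AbstractMap.SimpleImmutableEntry<>(key, value);
  }
}
```

For example, feeding the sorted pairs `a:1, a:3, b:2` with a per-key function that emits `key + ":" + run.size()` yields `a:2` followed by `b:1`.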
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355796 ]

ASF GitHub Bot logged work on BEAM-8550:
Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355174975

## File path: runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ##
@@ -85,45 +123,83 @@ public void startBundle() {
     doFnRunner.startBundle();
   }

+  @Override
+  public void finishBundle() {
+    doFnRunner.finishBundle();
+  }
+
   @Override
   public void processElement(WindowedValue input) {
     // StatefulDoFnRunner always observes windows, so we need to explode
     for (WindowedValue value : input.explodeWindows()) {
-      BoundedWindow window = value.getWindows().iterator().next();
-      if (isLate(window)) { // The element is too late for this window.
-        droppedDueToLateness.inc();
-        WindowTracing.debug(
-            "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
-                + "since too far behind inputWatermark:{}",
-            input.getTimestamp(),
-            window,
-            cleanupTimer.currentInputWatermarkTime());
+        reportDroppedElement(value, window);
+      } else if (requiresTimeSortedInput) {
+        processElementOrdered(window, value);
       } else {
-        cleanupTimer.setForWindow(value.getValue(), window);
-        doFnRunner.processElement(value);
+        processElementUnordered(window, value);
+      }
+    }
+  }
+
+  private void processElementUnordered(BoundedWindow window, WindowedValue value) {
+    cleanupTimer.setForWindow(value.getValue(), window);
+    doFnRunner.processElement(value);
+  }
+
+  private void processElementOrdered(BoundedWindow window, WindowedValue value) {
+
+    StateInternals stateInternals = stepContext.stateInternals();
+    TimerInternals timerInternals = stepContext.timerInternals();
+
+    if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) {
+      StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+      BagState> sortBuffer = stateInternals.state(namespace, sortBufferTag);
+      ValueState minStampState = stateInternals.state(namespace, sortBufferMinStampTag);
+      sortBuffer.add(value);
+      Instant minStamp =
+          MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+      if (value.getTimestamp().isBefore(minStamp)) {
+        minStamp = value.getTimestamp();
+        minStampState.write(minStamp);
+        setupFlushTimerAndWatermarkHold(namespace, minStamp);
       }
+    } else {
+      reportDroppedElement(value, window);

Review comment:
This internal detail should be mentioned more explicitly in the javadoc for the annotation. What if I am processing a stream and the out-of-order spread of my data can be quite large? Does it make sense for me to use this annotation, or is it better not to? Some recommendations or hints for users would be great.
This makes me hesitant to use this annotation for stream processing (not that I have a use case for it right now) unless I can be sure about the timing of my data stream and how the watermark moves. I wonder whether it would make sense to describe the trade-offs between latency, buffer size and lateness, and to introduce an option to hold the watermark back by some delta smaller than the maximum allowed lateness.

Issue Time Tracking
-------------------
Worklog Id: (was: 355796)
Time Spent: 3.5h (was: 3h 20m)
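The `processElementOrdered` branch quoted above buffers each on-time element in per-window state, tracks the smallest buffered timestamp, and sets a flush timer plus a watermark hold at that minimum; an element whose timestamp the input watermark has already passed is reported as dropped. The in-memory sketch below models only that ordering behaviour, to make the reviewer's latency and buffer-size question concrete. It is a simplification under stated assumptions: a single key and window, a `PriorityQueue` in place of `BagState`, and a hypothetical flush rule under which advancing the watermark to `w` releases every buffered element with a timestamp strictly before `w`. `SortBufferSketch` and its methods are illustrative names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class SortBufferSketch {

  private final PriorityQueue<Long> buffer = new PriorityQueue<>();
  private long watermark = Long.MIN_VALUE;
  private long dropped = 0;

  // Mirrors the `!watermark.isAfter(timestamp)` check: an element is accepted
  // unless the watermark is strictly past its timestamp.
  boolean add(long timestamp) {
    if (watermark > timestamp) {
      dropped++;
      return false;
    }
    buffer.add(timestamp);
    return true;
  }

  // Assumed flush rule: advancing the watermark releases, in timestamp order,
  // every buffered element that is now strictly behind it.
  List<Long> advanceWatermark(long newWatermark) {
    if (newWatermark < watermark) {
      throw new IllegalArgumentException("watermark must not move backwards");
    }
    watermark = newWatermark;
    List<Long> released = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek() < watermark) {
      released.add(buffer.poll());
    }
    return released;
  }

  // Smallest buffered timestamp; in the hunk this value (minStampState) drives the
  // flush timer and the watermark hold. Long.MAX_VALUE when the buffer is empty.
  long minBufferedTimestamp() {
    return buffer.isEmpty() ? Long.MAX_VALUE : buffer.peek();
  }

  long droppedCount() {
    return dropped;
  }

  int bufferedCount() {
    return buffer.size();
  }

  public static void main(String[] args) {
    SortBufferSketch s = new SortBufferSketch();
    s.add(5);
    s.add(3);
    s.add(4);
    System.out.println(s.advanceWatermark(4)); // only elements strictly before 4 are released
    s.add(2); // behind the watermark, so it is counted as dropped
    System.out.println(s.advanceWatermark(10));
    System.out.println("dropped=" + s.droppedCount());
  }
}
```

In this model every element that arrives ahead of the watermark waits in the buffer until the watermark passes it, so both output latency and buffer size grow with the out-of-order spread, while anything arriving behind the watermark is lost. That is the trade-off the review comment asks to have documented.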
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355792&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355792 ]

ASF GitHub Bot logged work on BEAM-8550:
Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m
Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355167752

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ##
@@ -448,18 +458,152 @@ public String toNativeString() {
     JavaRDD>>> groupRDD =
         GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);

-    return groupRDD
-        .map(
-            input -> {
-              final K key = input.getKey();
-              Iterable> value = input.getValue();
-              return FluentIterable.from(value)
-                  .transform(
-                      windowedValue ->
-                          windowedValue.withValue(KV.of(key, windowedValue.getValue())))
-                  .iterator();
-            })
-        .flatMapToPair(doFnFunction);
+    if (!requiresSortedInput) {
+      return groupRDD

Review comment:
Can we make `groupRDD` local to this `if`/`else` branch? Also, it would be a bit more intuitive if it were organized as `if (requiresSortedInput) { ... } else { ... }`.

Issue Time Tracking
-------------------
Worklog Id: (was: 355792)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338791&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338791 ]

ASF GitHub Bot logged work on BEAM-8550:
Author: ASF GitHub Bot
Created on: 05/Nov/19 14:44
Start Date: 05/Nov/19 14:44
Worklog Time Spent: 10m
Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549852980

Run Direct ValidatesRunner

Issue Time Tracking
-------------------
Worklog Id: (was: 338791)
Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338734&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338734 ]

ASF GitHub Bot logged work on BEAM-8550:
Author: ASF GitHub Bot
Created on: 05/Nov/19 14:16
Start Date: 05/Nov/19 14:16
Worklog Time Spent: 10m
Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549841187

Run Flink ValidatesRunner

Issue Time Tracking
-------------------
Worklog Id: (was: 338734)
Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338735&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338735 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 05/Nov/19 14:16 Start Date: 05/Nov/19 14:16 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-549841255 Run Direct ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 338735) Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338736&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338736 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 05/Nov/19 14:16 Start Date: 05/Nov/19 14:16 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-549841386 Run Java Flink PortableValidatesRunner Batch Issue Time Tracking --- Worklog Id: (was: 338736) Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338737&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338737 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 05/Nov/19 14:16 Start Date: 05/Nov/19 14:16 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-549841324 Run Java Flink PortableValidatesRunner Streaming Issue Time Tracking --- Worklog Id: (was: 338737) Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338691 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 05/Nov/19 12:29 Start Date: 05/Nov/19 12:29 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-549802817 Run Direct ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 338691) Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338693 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 05/Nov/19 12:29 Start Date: 05/Nov/19 12:29 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-549802952 Run Java Flink PortableValidatesRunner Batch Issue Time Tracking --- Worklog Id: (was: 338693) Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338692 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 05/Nov/19 12:29 Start Date: 05/Nov/19 12:29 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-549802918 Run Java Flink PortableValidatesRunner Streaming Issue Time Tracking --- Worklog Id: (was: 338692) Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338690&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338690 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 05/Nov/19 12:28 Start Date: 05/Nov/19 12:28 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-549802764 Run Flink ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 338690) Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338122&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338122 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 04/Nov/19 14:20 Start Date: 04/Nov/19 14:20 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-549375080 Run Java PreCommit Issue Time Tracking --- Worklog Id: (was: 338122) Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338121&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338121 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 04/Nov/19 14:20 Start Date: 04/Nov/19 14:20 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-549375013 Run Java PreCommi Issue Time Tracking --- Worklog Id: (was: 338121) Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation
[ https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338109&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338109 ] ASF GitHub Bot logged work on BEAM-8550: Author: ASF GitHub Bot Created on: 04/Nov/19 13:53 Start Date: 04/Nov/19 13:53 Worklog Time Spent: 10m Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] Requires time sorted input URL: https://github.com/apache/beam/pull/8774#issuecomment-549362888 Run Java_Examples_Dataflow PreCommit Issue Time Tracking --- Worklog Id: (was: 338109) Remaining Estimate: 0h Time Spent: 10m