[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-02-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=393658&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393658
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 26/Feb/20 17:58
Start Date: 26/Feb/20 17:58
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r384664775
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -938,4 +938,24 @@ private static void initializeUserState(
 @ProcessElement
 public void doNothing(ProcessContext context) {}
   }
+
+  private static class Locker implements AutoCloseable {
+
+public static Locker locked(Lock lock) {
+  Locker locker = new Locker(lock);
 
 Review comment:
   I agree that these changes are not directly related to this PR, and we can 
revert them if necessary.
   If I recall correctly, these changes were made while debugging state 
handling related to [FLINK-12653](https://jira.apache.org/jira/browse/FLINK-12653). 
I mention this only to give context on why these changes are there.
   Regarding performance, I don't think this change can have any measurable 
impact, because allocating an object in eden space is pretty cheap in 
current JVMs.
   I agree that `try`-`finally` works just fine; the locker pattern 
is just a safeguard against someone forgetting to add the `finally` clause.
   But as I said, if these changes are undesirable, we can revert them. 
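
For readers following the discussion, here is a minimal sketch of the locker pattern next to the explicit `try`-`finally` form. Only the class header and the first lines of `locked(Lock)` are visible in the diff above; the constructor, the `close()` body, and the `LockerSketch` wrapper class are assumptions added for illustration and may differ from the code in the PR.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class LockerSketch {

  /** Hypothetical completion of the Locker class; only its header appears in the diff. */
  static final class Locker implements AutoCloseable {
    private final Lock lock;

    private Locker(Lock lock) {
      this.lock = lock;
    }

    /** Acquires the lock and returns a handle that releases it in close(). */
    static Locker locked(Lock lock) {
      Locker locker = new Locker(lock);
      lock.lock();
      return locker;
    }

    @Override
    public void close() {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();

    // Locker variant: try-with-resources calls close() on every exit path,
    // so the unlock cannot be forgotten. It allocates one small object per use.
    try (Locker ignored = Locker.locked(lock)) {
      System.out.println("held inside try-with-resources: " + lock.isHeldByCurrentThread());
    }
    System.out.println("held after try-with-resources: " + lock.isLocked());

    // Explicit variant: no allocation, but the finally clause is the caller's job.
    lock.lock();
    try {
      System.out.println("held inside try-finally: " + lock.isHeldByCurrentThread());
    } finally {
      lock.unlock();
    }
    System.out.println("held after try-finally: " + lock.isLocked());
  }
}
```

Both variants release the lock on every exit path; the difference under discussion is the extra allocation versus protection against an omitted `finally`.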
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 393658)
Time Spent: 11.5h  (was: 11h 20m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 11.5h
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).
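
The two assumptions listed above can be illustrated with a small plain-Java simulation. This is only a sketch of the described semantics, not Beam code: `TimeSortedBufferSketch`, `Timestamped`, and the watermark handling are hypothetical names and simplifications, and "late" is taken here to mean "timestamp strictly behind the current watermark".

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class TimeSortedBufferSketch {

  /** A value with an event-time timestamp (millis); stands in for a WindowedValue. */
  static final class Timestamped {
    final long timestamp;
    final String value;

    Timestamped(long timestamp, String value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Buffered elements ordered by timestamp; ties are returned in arbitrary order.
  private final PriorityQueue<Timestamped> buffer =
      new PriorityQueue<>(Comparator.comparingLong((Timestamped t) -> t.timestamp));
  private long watermark = Long.MIN_VALUE;
  private int dropped = 0;

  /** Buffers the element, or drops it when it is behind the watermark (zero allowed lateness). */
  boolean add(Timestamped element) {
    if (element.timestamp < watermark) {
      dropped++;
      return false;
    }
    buffer.add(element);
    return true;
  }

  /** Advances the watermark and returns, in timestamp order, all elements now behind it. */
  List<Timestamped> advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    List<Timestamped> ready = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek().timestamp < watermark) {
      ready.add(buffer.poll());
    }
    return ready;
  }

  int droppedCount() {
    return dropped;
  }

  public static void main(String[] args) {
    TimeSortedBufferSketch sketch = new TimeSortedBufferSketch();
    sketch.add(new Timestamped(30, "c"));
    sketch.add(new Timestamped(10, "a"));
    sketch.add(new Timestamped(20, "b"));
    for (Timestamped t : sketch.advanceWatermark(25)) {
      System.out.println(t.timestamp + " " + t.value);
    }
    // This element arrives behind the watermark and is dropped.
    System.out.println("accepted late element: " + sketch.add(new Timestamped(5, "late")));
  }
}
```

Because elements are released only once the watermark has passed them, and anything arriving behind the watermark is dropped, the consumer never sees an out-of-order element in this simplified model.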



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-02-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=393632&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393632
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 26/Feb/20 17:24
Start Date: 26/Feb/20 17:24
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r384643274
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -938,4 +938,24 @@ private static void initializeUserState(
 @ProcessElement
 public void doNothing(ProcessContext context) {}
   }
+
+  private static class Locker implements AutoCloseable {
+
+public static Locker locked(Lock lock) {
+  Locker locker = new Locker(lock);
 
 Review comment:
   What does this change have to do with this PR? This creates a new locking 
object for every state request. Why is this necessary? The existing pattern was 
sufficient and fast:
   
   ```java
   lock.lock();
   try {
     // handle the state request while holding the lock
   } finally {
     lock.unlock();
   }
   ```
   
   The change adds verbosity and touches a lot of code without any 
reason other than a style change. If anything, it introduces a regression.
   
   Such unrelated changes should not have been part of this PR.
   
   CC @tweise
 



Issue Time Tracking
---

Worklog Id: (was: 393632)
Time Spent: 11h 10m  (was: 11h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-02-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=393633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393633
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 26/Feb/20 17:24
Start Date: 26/Feb/20 17:24
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r384645838
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 ##
 @@ -938,4 +938,24 @@ private static void initializeUserState(
 @ProcessElement
 public void doNothing(ProcessContext context) {}
   }
+
+  private static class Locker implements AutoCloseable {
+
+public static Locker locked(Lock lock) {
+  Locker locker = new Locker(lock);
 
 Review comment:
   Besides, there are a bunch of other changes which are not related. Please 
factor those out into individual commits or even into separate PRs.
 



Issue Time Tracking
---

Worklog Id: (was: 393633)
Time Spent: 11h 20m  (was: 11h 10m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-02-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=382839&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-382839
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 06/Feb/20 11:19
Start Date: 06/Feb/20 11:19
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r375777640
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
 @@ -132,16 +208,56 @@ public void onTimer(
 + "since window is too far behind inputWatermark:{}",
 timestamp,
 window,
-cleanupTimer.currentInputWatermarkTime());
+stepContext.timerInternals().currentInputWatermarkTime());
   } else {
 doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
   }
 }
   }
 
-  @Override
-  public void finishBundle() {
-doFnRunner.finishBundle();
+  // this needs to be optimized (Sorted Map State)
 
 Review comment:
   Created https://issues.apache.org/jira/browse/BEAM-9256
 



Issue Time Tracking
---

Worklog Id: (was: 382839)
Time Spent: 11h  (was: 10h 50m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-02-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=382757&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-382757
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 06/Feb/20 07:28
Start Date: 06/Feb/20 07:28
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 382757)
Time Spent: 10h 50m  (was: 10h 40m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-31 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=380230&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380230
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 31/Jan/20 22:27
Start Date: 31/Jan/20 22:27
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires 
time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-580938807
 
 
   Run Java_Examples_Dataflow PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 380230)
Time Spent: 10h 40m  (was: 10.5h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379398&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379398
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 30/Jan/20 14:37
Start Date: 30/Jan/20 14:37
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires 
time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-580282308
 
 
   Run Java_Examples_Dataflow PreCommit
   
 



Issue Time Tracking
---

Worklog Id: (was: 379398)
Time Spent: 10.5h  (was: 10h 20m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379384&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379384
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 30/Jan/20 13:56
Start Date: 30/Jan/20 13:56
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires 
time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-580264736
 
 
   Run Java Flink PortableValidatesRunner Streaming
 



Issue Time Tracking
---

Worklog Id: (was: 379384)
Time Spent: 10h 10m  (was: 10h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379383&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379383
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 30/Jan/20 13:56
Start Date: 30/Jan/20 13:56
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires 
time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-580264576
 
 
   Run Flink ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 379383)
Time Spent: 10h  (was: 9h 50m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379382&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379382
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 30/Jan/20 13:56
Start Date: 30/Jan/20 13:56
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires 
time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-580264521
 
 
   Run Spark ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 379382)
Time Spent: 9h 50m  (was: 9h 40m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379385&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379385
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 30/Jan/20 13:56
Start Date: 30/Jan/20 13:56
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires 
time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-580264776
 
 
   Run Java Flink PortableValidatesRunner Batch
 



Issue Time Tracking
---

Worklog Id: (was: 379385)
Time Spent: 10h 20m  (was: 10h 10m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379381&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379381
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 30/Jan/20 13:56
Start Date: 30/Jan/20 13:56
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires 
time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-580264445
 
 
   Run Direct ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 379381)
Time Spent: 9h 40m  (was: 9.5h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=379380&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-379380
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 30/Jan/20 13:55
Start Date: 30/Jan/20 13:55
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires 
time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-580264360
 
 
   Run Java_Examples_Dataflow PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 379380)
Time Spent: 9.5h  (was: 9h 20m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=378199&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378199
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 28/Jan/20 12:51
Start Date: 28/Jan/20 12:51
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires 
time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-579230688
 
 
   retest this please
 



Issue Time Tracking
---

Worklog Id: (was: 378199)
Time Spent: 9h 20m  (was: 9h 10m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=378046&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378046
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 28/Jan/20 05:40
Start Date: 28/Jan/20 05:40
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on issue #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-579089156
 
 
   lgtm
 



Issue Time Tracking
---

Worklog Id: (was: 378046)
Time Spent: 9h 10m  (was: 9h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=372241&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-372241
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 15/Jan/20 10:24
Start Date: 15/Jan/20 10:24
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires 
time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-574595417
 
 
   Retest this please
 



Issue Time Tracking
---

Worklog Id: (was: 372241)
Time Spent: 9h  (was: 8h 50m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=371587&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371587
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 14/Jan/20 13:05
Start Date: 14/Jan/20 13:05
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r366325823
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
@@ -85,45 +123,83 @@ public void startBundle() {
     doFnRunner.startBundle();
   }
 
+  @Override
+  public void finishBundle() {
+    doFnRunner.finishBundle();
+  }
+
   @Override
   public void processElement(WindowedValue<InputT> input) {
 
     // StatefulDoFnRunner always observes windows, so we need to explode
     for (WindowedValue<InputT> value : input.explodeWindows()) {
-
       BoundedWindow window = value.getWindows().iterator().next();
-
       if (isLate(window)) {
         // The element is too late for this window.
-        droppedDueToLateness.inc();
-        WindowTracing.debug(
-            "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
-                + "since too far behind inputWatermark:{}",
-            input.getTimestamp(),
-            window,
-            cleanupTimer.currentInputWatermarkTime());
+        reportDroppedElement(value, window);
+      } else if (requiresTimeSortedInput) {
+        processElementOrdered(window, value);
       } else {
-        cleanupTimer.setForWindow(value.getValue(), window);
-        doFnRunner.processElement(value);
+        processElementUnordered(window, value);
+      }
+    }
+  }
+
+  private void processElementUnordered(BoundedWindow window, WindowedValue<InputT> value) {
+    cleanupTimer.setForWindow(value.getValue(), window);
+    doFnRunner.processElement(value);
+  }
+
+  private void processElementOrdered(BoundedWindow window, WindowedValue<InputT> value) {
+
+    StateInternals stateInternals = stepContext.stateInternals();
+    TimerInternals timerInternals = stepContext.timerInternals();
+
+    if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) {
+      StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+      BagState<WindowedValue<InputT>> sortBuffer = stateInternals.state(namespace, sortBufferTag);
+      ValueState<Instant> minStampState = stateInternals.state(namespace, sortBufferMinStampTag);
+      sortBuffer.add(value);
+      Instant minStamp =
+          MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+      if (value.getTimestamp().isBefore(minStamp)) {
+        minStamp = value.getTimestamp();
+        minStampState.write(minStamp);
+        setupFlushTimerAndWatermarkHold(namespace, minStamp);
       }
+    } else {
+      reportDroppedElement(value, window);
 
 Review comment:
   Added a note to the javadoc about the change in late-data dropping behavior.
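
To make the behavior change concrete, here is a minimal, Beam-free sketch of the two checks visible in the diff above: the window-level `isLate` test and the element-level watermark test in `processElementOrdered`. The class name `LatenessChecks`, the use of plain `long` millisecond values in place of `Instant`, and the sample numbers are assumptions made for illustration; none of them come from the PR.

```java
/** Illustrative only: models the two lateness checks with plain long timestamps. */
final class LatenessChecks {

  private LatenessChecks() {}

  /** Mirrors isLate(window): the window's garbage-collection time is before the watermark. */
  static boolean windowIsLate(long windowGcTime, long inputWatermark) {
    return windowGcTime < inputWatermark;
  }

  /** Mirrors the else branch of processElementOrdered: the watermark is after the element. */
  static boolean elementIsBehindWatermark(long elementTimestamp, long inputWatermark) {
    return inputWatermark > elementTimestamp;
  }

  public static void main(String[] args) {
    long windowGcTime = 999L; // assumed sample value
    long inputWatermark = 500L; // assumed sample value
    long elementTimestamp = 100L; // assumed sample value
    // The window is still open, so the unordered path would process the element.
    System.out.println(windowIsLate(windowGcTime, inputWatermark));
    // The element is behind the watermark, so the ordered path would report it as dropped.
    System.out.println(elementIsBehindWatermark(elementTimestamp, inputWatermark));
  }
}
```

With these sample values the window-level check passes while the element-level check fails, which is the case where the ordered and unordered paths in the diff treat the same element differently.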
 



Issue Time Tracking
---

Worklog Id: (was: 371587)
Time Spent: 8h 50m  (was: 8h 40m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=371502&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-371502
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 14/Jan/20 10:17
Start Date: 14/Jan/20 10:17
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r366254396
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
@@ -85,45 +123,83 @@ public void startBundle() {
     doFnRunner.startBundle();
   }
 
+  @Override
+  public void finishBundle() {
+    doFnRunner.finishBundle();
+  }
+
   @Override
   public void processElement(WindowedValue<InputT> input) {
 
     // StatefulDoFnRunner always observes windows, so we need to explode
     for (WindowedValue<InputT> value : input.explodeWindows()) {
-
       BoundedWindow window = value.getWindows().iterator().next();
-
       if (isLate(window)) {
         // The element is too late for this window.
-        droppedDueToLateness.inc();
-        WindowTracing.debug(
-            "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
-                + "since too far behind inputWatermark:{}",
-            input.getTimestamp(),
-            window,
-            cleanupTimer.currentInputWatermarkTime());
+        reportDroppedElement(value, window);
+      } else if (requiresTimeSortedInput) {
+        processElementOrdered(window, value);
       } else {
-        cleanupTimer.setForWindow(value.getValue(), window);
-        doFnRunner.processElement(value);
+        processElementUnordered(window, value);
+      }
+    }
+  }
+
+  private void processElementUnordered(BoundedWindow window, WindowedValue<InputT> value) {
+    cleanupTimer.setForWindow(value.getValue(), window);
+    doFnRunner.processElement(value);
+  }
+
+  private void processElementOrdered(BoundedWindow window, WindowedValue<InputT> value) {
+
+    StateInternals stateInternals = stepContext.stateInternals();
+    TimerInternals timerInternals = stepContext.timerInternals();
+
+    if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) {
+      StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+      BagState<WindowedValue<InputT>> sortBuffer = stateInternals.state(namespace, sortBufferTag);
+      ValueState<Instant> minStampState = stateInternals.state(namespace, sortBufferMinStampTag);
+      sortBuffer.add(value);
+      Instant minStamp =
+          MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+      if (value.getTimestamp().isBefore(minStamp)) {
+        minStamp = value.getTimestamp();
+        minStampState.write(minStamp);
+        setupFlushTimerAndWatermarkHold(namespace, minStamp);
       }
+    } else {
+      reportDroppedElement(value, window);
 
 Review comment:
   The conversation there confirms that the late-data dropping behavior 
intentionally differs from what I expected. I will describe this in the javadoc.
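
For readers following the diff, the buffering idea in `processElementOrdered` can be sketched without any Beam dependencies. This is a simplified model, not the PR's implementation. The class `SortBufferSketch` and its method names are hypothetical, timestamps are plain `long` values, and the flush step is an assumption, since the code that fires the flush timer is not part of the quoted hunk. The minimum-timestamp state and the watermark hold are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative only: a Beam-free model of buffering elements until the watermark passes them. */
final class SortBufferSketch {
  private final List<Long> sortBuffer = new ArrayList<>();
  private long inputWatermark = Long.MIN_VALUE;
  private int dropped = 0;

  /** Buffers the timestamp unless the watermark is already after it, as in the diff. */
  void processElement(long timestamp) {
    if (inputWatermark <= timestamp) { // same condition as !watermark.isAfter(timestamp)
      sortBuffer.add(timestamp);
    } else {
      dropped++; // stands in for reportDroppedElement
    }
  }

  /**
   * Assumed flush step (not shown in the quoted diff): advance the watermark and return, in
   * ascending order, every buffered timestamp strictly before it.
   */
  List<Long> advanceWatermark(long newWatermark) {
    inputWatermark = Math.max(inputWatermark, newWatermark);
    List<Long> ready = new ArrayList<>();
    List<Long> remaining = new ArrayList<>();
    for (Long timestamp : sortBuffer) {
      if (timestamp < inputWatermark) {
        ready.add(timestamp);
      } else {
        remaining.add(timestamp);
      }
    }
    sortBuffer.clear();
    sortBuffer.addAll(remaining);
    Collections.sort(ready);
    return ready;
  }

  int droppedCount() {
    return dropped;
  }
}
```

In this model an element that arrives after the watermark has passed its timestamp is counted as dropped even if its window is still open, which is the difference in late-data handling discussed in this comment.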
 



Issue Time Tracking
---

Worklog Id: (was: 371502)
Time Spent: 8h 40m  (was: 8.5h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365881&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365881
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 03/Jan/20 15:36
Start Date: 03/Jan/20 15:36
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires 
time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-570606989
 
 
   Run Flink ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 365881)
Time Spent: 8.5h  (was: 8h 20m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365875&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365875
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 03/Jan/20 15:26
Start Date: 03/Jan/20 15:26
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r362848568
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
@@ -85,45 +123,83 @@ public void startBundle() {
     doFnRunner.startBundle();
   }
 
+  @Override
+  public void finishBundle() {
+    doFnRunner.finishBundle();
+  }
+
   @Override
   public void processElement(WindowedValue<InputT> input) {
 
     // StatefulDoFnRunner always observes windows, so we need to explode
     for (WindowedValue<InputT> value : input.explodeWindows()) {
-
       BoundedWindow window = value.getWindows().iterator().next();
-
       if (isLate(window)) {
         // The element is too late for this window.
-        droppedDueToLateness.inc();
-        WindowTracing.debug(
-            "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
-                + "since too far behind inputWatermark:{}",
-            input.getTimestamp(),
-            window,
-            cleanupTimer.currentInputWatermarkTime());
+        reportDroppedElement(value, window);
+      } else if (requiresTimeSortedInput) {
+        processElementOrdered(window, value);
       } else {
-        cleanupTimer.setForWindow(value.getValue(), window);
-        doFnRunner.processElement(value);
+        processElementUnordered(window, value);
+      }
+    }
+  }
+
+  private void processElementUnordered(BoundedWindow window, WindowedValue<InputT> value) {
+    cleanupTimer.setForWindow(value.getValue(), window);
+    doFnRunner.processElement(value);
+  }
+
+  private void processElementOrdered(BoundedWindow window, WindowedValue<InputT> value) {
+
+    StateInternals stateInternals = stepContext.stateInternals();
+    TimerInternals timerInternals = stepContext.timerInternals();
+
+    if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) {
+      StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+      BagState<WindowedValue<InputT>> sortBuffer = stateInternals.state(namespace, sortBufferTag);
+      ValueState<Instant> minStampState = stateInternals.state(namespace, sortBufferMinStampTag);
+      sortBuffer.add(value);
+      Instant minStamp =
+          MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+      if (value.getTimestamp().isBefore(minStamp)) {
+        minStamp = value.getTimestamp();
+        minStampState.write(minStamp);
+        setupFlushTimerAndWatermarkHold(namespace, minStamp);
       }
+    } else {
+      reportDroppedElement(value, window);
     }
   }
 
   private boolean isLate(BoundedWindow window) {
     Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
-    Instant inputWM = cleanupTimer.currentInputWatermarkTime();
+    Instant inputWM = stepContext.timerInternals().currentInputWatermarkTime();
     return gcTime.isBefore(inputWM);
   }
 
+  private void reportDroppedElement(WindowedValue<InputT> value, BoundedWindow window) {
+    droppedDueToLateness.inc();
+    WindowTracing.debug(
+        "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
+            + "since too far behind inputWatermark:{}",
 
 Review comment:
   Fixed.
 



Issue Time Tracking
---

Worklog Id: (was: 365875)
Time Spent: 8h 20m  (was: 8h 10m)


[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365874&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365874
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 03/Jan/20 15:25
Start Date: 03/Jan/20 15:25
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r362848269
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
@@ -85,45 +123,83 @@ public void startBundle() {
     doFnRunner.startBundle();
   }
 
+  @Override
+  public void finishBundle() {
+    doFnRunner.finishBundle();
+  }
+
   @Override
   public void processElement(WindowedValue<InputT> input) {
 
     // StatefulDoFnRunner always observes windows, so we need to explode
     for (WindowedValue<InputT> value : input.explodeWindows()) {
-
       BoundedWindow window = value.getWindows().iterator().next();
-
       if (isLate(window)) {
         // The element is too late for this window.
-        droppedDueToLateness.inc();
-        WindowTracing.debug(
-            "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
-                + "since too far behind inputWatermark:{}",
-            input.getTimestamp(),
-            window,
-            cleanupTimer.currentInputWatermarkTime());
+        reportDroppedElement(value, window);
+      } else if (requiresTimeSortedInput) {
+        processElementOrdered(window, value);
       } else {
-        cleanupTimer.setForWindow(value.getValue(), window);
-        doFnRunner.processElement(value);
+        processElementUnordered(window, value);
+      }
+    }
+  }
+
+  private void processElementUnordered(BoundedWindow window, WindowedValue<InputT> value) {
+    cleanupTimer.setForWindow(value.getValue(), window);
+    doFnRunner.processElement(value);
+  }
+
+  private void processElementOrdered(BoundedWindow window, WindowedValue<InputT> value) {
+
+    StateInternals stateInternals = stepContext.stateInternals();
+    TimerInternals timerInternals = stepContext.timerInternals();
+
+    if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) {
+      StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+      BagState<WindowedValue<InputT>> sortBuffer = stateInternals.state(namespace, sortBufferTag);
+      ValueState<Instant> minStampState = stateInternals.state(namespace, sortBufferMinStampTag);
+      sortBuffer.add(value);
+      Instant minStamp =
+          MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+      if (value.getTimestamp().isBefore(minStamp)) {
+        minStamp = value.getTimestamp();
+        minStampState.write(minStamp);
+        setupFlushTimerAndWatermarkHold(namespace, minStamp);
       }
+    } else {
+      reportDroppedElement(value, window);
 
 Review comment:
   Using this annotation will not change which data is dropped. Mailing list 
thread for reference: 
https://lists.apache.org/thread.html/c37dfb6c545fba7d794a13c507dccebb654bbd8b317dab748a6775dc%40%3Cdev.beam.apache.org%3E
 



Issue Time Tracking
---

Worklog Id: (was: 365874)
Time Spent: 8h 10m  (was: 8h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365873&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365873
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 03/Jan/20 15:23
Start Date: 03/Jan/20 15:23
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r362847453
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
@@ -85,45 +123,83 @@ public void startBundle() {
     doFnRunner.startBundle();
   }
 
+  @Override
+  public void finishBundle() {
+    doFnRunner.finishBundle();
+  }
+
   @Override
   public void processElement(WindowedValue<InputT> input) {
 
     // StatefulDoFnRunner always observes windows, so we need to explode
     for (WindowedValue<InputT> value : input.explodeWindows()) {
-
       BoundedWindow window = value.getWindows().iterator().next();
-
       if (isLate(window)) {
         // The element is too late for this window.
-        droppedDueToLateness.inc();
-        WindowTracing.debug(
-            "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
-                + "since too far behind inputWatermark:{}",
-            input.getTimestamp(),
-            window,
-            cleanupTimer.currentInputWatermarkTime());
+        reportDroppedElement(value, window);
+      } else if (requiresTimeSortedInput) {
+        processElementOrdered(window, value);
 
 Review comment:
   Fixed.
 



Issue Time Tracking
---

Worklog Id: (was: 365873)
Time Spent: 8h  (was: 7h 50m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365822&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365822
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 03/Jan/20 12:02
Start Date: 03/Jan/20 12:02
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: [BEAM-8550] Requires 
time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-570554091
 
 
   @JozoVilcek I have resolved some of the issues mentioned. I came across an 
issue in DirectRunner (it does not drop late data); once that is resolved, it 
might be possible to add more tests verifying that the annotation does not 
cause more data to be dropped.
 



Issue Time Tracking
---

Worklog Id: (was: 365822)
Time Spent: 7h 50m  (was: 7h 40m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365794&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365794
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 03/Jan/20 10:34
Start Date: 03/Jan/20 10:34
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r362764966
 
 

 ##
 File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
 ##
@@ -2363,6 +2367,136 @@ public int hashCode() {
         return getClass().hashCode();
       }
     }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testRequiresTimeSortedInput() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps)));
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class,
+      UsesTestStream.class
+    })
+    public void testRequiresTimeSortedInputWithTestStream() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        stream = stream.addElements(stamp);
+      }
+      testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class,
+      UsesTestStream.class
+    })
+    public void testRequiresTimeSortedInputWithLateData() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder<Long> input = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+        if (stamp == 100) {
+          // advance watermark when we have 100 remaining elements
+          // all the rest are going to be late elements
+          input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+        }
+      }
+      testTimeSortedInput(
 
 Review comment:
   That is not possible. Each runner might drop different elements, because 
that depends on how the watermark is propagated from the `TestStream` to the 
downstream operators.
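
A small Beam-free simulation may help illustrate why the set of dropped elements is runner dependent. It replays the test's input (timestamps from `numElements` down to 1, with the watermark advanced to 100 right after the element stamped 100) and models a downstream operator that only observes the watermark advance after a number of further elements. The class `WatermarkPropagationSketch` and the `delay` parameter are hypothetical and do not correspond to how any particular runner propagates watermarks.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: which elements arrive behind the watermark for a given propagation delay. */
final class WatermarkPropagationSketch {

  private WatermarkPropagationSketch() {}

  /**
   * Replays timestamps numElements..1. The source advances the watermark to advanceAt right after
   * emitting the element stamped advanceAt; the operator sees that advance only after `delay`
   * further elements (an assumed, simplified model).
   */
  static List<Long> lateElements(int numElements, long advanceAt, int delay) {
    List<Long> late = new ArrayList<>();
    long watermark = Long.MIN_VALUE;
    int countdown = -1; // -1 means no watermark advance is in flight
    for (long stamp = numElements; stamp >= 1; stamp--) {
      if (countdown == 0) {
        watermark = advanceAt; // the advance finally reaches the operator
        countdown = -1;
      } else if (countdown > 0) {
        countdown--;
      }
      if (watermark > stamp) {
        late.add(stamp); // element is behind the watermark the operator currently sees
      }
      if (stamp == advanceAt) {
        countdown = delay; // the source advances the watermark after this element
      }
    }
    return late;
  }

  public static void main(String[] args) {
    System.out.println(lateElements(1000, 100L, 0).size());
    System.out.println(lateElements(1000, 100L, 10).size());
  }
}
```

In this model the number of late elements changes with `delay`, so a test cannot assert one exact set of dropped elements that holds for every runner.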
 



Issue Time Tracking
---

Worklog Id: (was: 365794)
Time Spent: 7h 40m  (was: 7.5h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365793&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365793
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 03/Jan/20 10:33
Start Date: 03/Jan/20 10:33
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r362764697
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
 ##
 @@ -656,6 +656,24 @@ public Duration getAllowedTimestampSkew() {
   @Target(ElementType.METHOD)
   public @interface RequiresStableInput {}
 
+  /**
+   * Experimental - no backwards compatibility guarantees. The exact name or usage of this
+   * feature may change.
+   *
+   * Annotation that may be added to a {@link ProcessElement} method to indicate that the runner
+   * must ensure that the observable contents of the input {@link PCollection} is sorted by time, in
+   * ascending order. The time ordering is generally defined by element's timestamp, but an
+   * alternative user supplied ordering function can be supplied.
+   *
+   * Note that this annotation makes sense only for stateful {@code ParDo}s, because outcome of
+   * stateless functions cannot depend on the ordering.
+   */
+  @Documented
+  @Experimental
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  public @interface RequiresTimeSortedInput {}
 
 Review comment:
   Late data should be handled the same way with and without this annotation. 
I will add tests to make sure that is the case.
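
The javadoc's remark that ordering only matters for stateful functions can be illustrated with a short Beam-free sketch. The class `OrderSensitivitySketch` and both methods are hypothetical examples rather than code from the PR: `doubled` stands in for a stateless mapping, and `deltas` stands in for a stateful one that remembers the previously seen element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: stateless output ignores input order, stateful output does not. */
final class OrderSensitivitySketch {

  private OrderSensitivitySketch() {}

  /** Stateless: each output depends only on its own input element. */
  static List<Long> doubled(List<Long> input) {
    List<Long> out = new ArrayList<>();
    for (Long value : input) {
      out.add(2 * value);
    }
    return out;
  }

  /** Stateful: each output is the difference from the previously seen element. */
  static List<Long> deltas(List<Long> input) {
    List<Long> out = new ArrayList<>();
    long previous = 0L; // state carried between elements
    for (Long value : input) {
      out.add(value - previous);
      previous = value;
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(deltas(Arrays.asList(1L, 2L, 4L)));
    System.out.println(deltas(Arrays.asList(4L, 2L, 1L)));
  }
}
```

Reordering the input only permutes the outputs of `doubled`, whereas `deltas` produces different values altogether, which is why a sorted-input guarantee is meaningful only for the stateful case.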
 



Issue Time Tracking
---

Worklog Id: (was: 365793)
Time Spent: 7.5h  (was: 7h 20m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 7.5h
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365791&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365791
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 03/Jan/20 10:32
Start Date: 03/Jan/20 10:32
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r362764455
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 ##
 @@ -448,18 +458,152 @@ public String toNativeString() {
 JavaRDD>>> groupRDD =
 GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);
 
-return groupRDD
-.map(
-input -> {
-  final K key = input.getKey();
-  Iterable> value = input.getValue();
-  return FluentIterable.from(value)
-  .transform(
-  windowedValue ->
-  windowedValue.withValue(KV.of(key, windowedValue.getValue())))
-  .iterator();
-})
-.flatMapToPair(doFnFunction);
+if (!requiresSortedInput) {
+  return groupRDD
+  .map(
+  input -> {
+final K key = input.getKey();
+Iterable> value = input.getValue();
+return FluentIterable.from(value)
+.transform(
+windowedValue ->
+windowedValue.withValue(KV.of(key, windowedValue.getValue())))
+.iterator();
+  })
+  .flatMapToPair(doFnFunction);
+}
+
+JavaPairRDD pairRDD =
+kvInRDD
+.map(new ReifyTimestampsAndWindowsFunction<>())
+.mapToPair(TranslationUtils.toPairFunction())
+.mapToPair(
+CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp()));
+
+JavaPairRDD sorted =
+pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner));
+
+return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder));
+  }
+
+  private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) {
+return new Partitioner() {
+  @Override
+  public int numPartitions() {
+return partitioner.numPartitions();
+  }
+
+  @Override
+  public int getPartition(Object o) {
+ByteArray b = (ByteArray) o;
+return partitioner.getPartition(
+new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8)));
+  }
+};
+  }
+
+  private static 
+  PairFlatMapFunction>, TupleTag, WindowedValue>
+  wrapDoFnFromSortedRDD(
+  MultiDoFnFunction, OutputT> doFnFunction,
+  Coder keyCoder,
+  Coder> wvCoder) {
+
+return (Iterator> in) -> {
+  Iterator, WindowedValue>>> mappedGroups;
+  mappedGroups =
+  Iterators.transform(
+  splitBySameKey(in, keyCoder, wvCoder),
+  group -> {
+try {
+  return doFnFunction.call(group);
+} catch (Exception ex) {
+  throw new RuntimeException(ex);
+}
+  });
+  return flatten(mappedGroups);
+};
+  }
+
+  @VisibleForTesting
+  static  Iterator flatten(final Iterator> toFlatten) {
+
+return new AbstractIterator() {
+
+  @Nullable Iterator current = null;
+
+  @Override
+  protected T computeNext() {
+while (true) {
+  if (current == null) {
+if (toFlatten.hasNext()) {
+  current = toFlatten.next();
+} else {
+  return endOfData();
+}
+  }
+  if (current.hasNext()) {
+return current.next();
+  }
+  current = null;
+}
+  }
+};
+  }
+
+  @VisibleForTesting
+  static  Iterator>>> splitBySameKey(
+  Iterator> in, Coder keyCoder, Coder> wvCoder) {
+
+return new AbstractIterator>>>() {
+
+  @Nullable Tuple2 read = null;
+
+  @Override
+  protected Iterator>> computeNext() {
+readNext();
+if (read != null) {
+  byte[] value = read._1().getValue();
+  byte[] keyPart = Arrays.copyOfRange(value, 0, value.length - 8);
+  K key = CoderHelpers.fromByteArray(keyPart, keyCoder);
+  return createIteratorForKey(keyPart, key);
+}
+return endOfData();
+  }
+
+  private void readNext() {
+if (read == null) {
+  if (in.hasNext()) {
+read = in.next();
+  }
+}
+  }
+
+  private void consumed() {
+read = null;
+  }
+
+  private Ite

[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365788&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365788
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 03/Jan/20 10:30
Start Date: 03/Jan/20 10:30
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r362764095
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 ##
 @@ -448,18 +458,152 @@ public String toNativeString() {
 JavaRDD>>> groupRDD =
 GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);
 
-return groupRDD
-.map(
-input -> {
-  final K key = input.getKey();
-  Iterable> value = input.getValue();
-  return FluentIterable.from(value)
-  .transform(
-  windowedValue ->
-  windowedValue.withValue(KV.of(key, windowedValue.getValue())))
-  .iterator();
-})
-.flatMapToPair(doFnFunction);
+if (!requiresSortedInput) {
+  return groupRDD
+  .map(
+  input -> {
+final K key = input.getKey();
+Iterable> value = input.getValue();
+return FluentIterable.from(value)
+.transform(
+windowedValue ->
+windowedValue.withValue(KV.of(key, windowedValue.getValue())))
+.iterator();
+  })
+  .flatMapToPair(doFnFunction);
+}
+
+JavaPairRDD pairRDD =
+kvInRDD
+.map(new ReifyTimestampsAndWindowsFunction<>())
+.mapToPair(TranslationUtils.toPairFunction())
+.mapToPair(
+CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp()));
+
+JavaPairRDD sorted =
+pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner));
+
+return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder));
+  }
+
+  private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) {
+return new Partitioner() {
+  @Override
+  public int numPartitions() {
+return partitioner.numPartitions();
+  }
+
+  @Override
+  public int getPartition(Object o) {
+ByteArray b = (ByteArray) o;
+return partitioner.getPartition(
+new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8)));
+  }
+};
+  }
+
+  private static 
+  PairFlatMapFunction>, TupleTag, WindowedValue>
+  wrapDoFnFromSortedRDD(
+  MultiDoFnFunction, OutputT> doFnFunction,
+  Coder keyCoder,
+  Coder> wvCoder) {
+
+return (Iterator> in) -> {
+  Iterator, WindowedValue>>> mappedGroups;
+  mappedGroups =
+  Iterators.transform(
+  splitBySameKey(in, keyCoder, wvCoder),
+  group -> {
+try {
+  return doFnFunction.call(group);
+} catch (Exception ex) {
+  throw new RuntimeException(ex);
+}
+  });
+  return flatten(mappedGroups);
+};
+  }
+
+  @VisibleForTesting
+  static  Iterator flatten(final Iterator> toFlatten) {
+
+return new AbstractIterator() {
+
+  @Nullable Iterator current = null;
+
+  @Override
+  protected T computeNext() {
+while (true) {
+  if (current == null) {
+if (toFlatten.hasNext()) {
+  current = toFlatten.next();
+} else {
+  return endOfData();
+}
+  }
+  if (current.hasNext()) {
+return current.next();
+  }
+  current = null;
+}
+  }
+};
+  }
+
+  @VisibleForTesting
+  static  Iterator>>> splitBySameKey(
+  Iterator> in, Coder keyCoder, Coder> wvCoder) {
+
+return new AbstractIterator>>>() {
+
+  @Nullable Tuple2 read = null;
+
+  @Override
+  protected Iterator>> computeNext() {
+readNext();
+if (read != null) {
+  byte[] value = read._1().getValue();
+  byte[] keyPart = Arrays.copyOfRange(value, 0, value.length - 8);
 
 Review comment:
   Same problem. Not related to `Coders`.
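
The `splitBySameKey` step in the diff above depends on the partition being sorted, so that all entries for one key sit next to each other. A simplified sketch of that idea follows, under stated assumptions: keys are already decoded, and groups are collected eagerly into lists, whereas the PR works lazily through nested iterators. `SplitBySameKey.split` is a hypothetical helper, not Beam or Spark API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical illustration; not part of Beam or Spark. */
final class SplitBySameKey {

  /**
   * Splits an iterator that is already sorted by key into one group per run of equal keys.
   * Input that is not sorted by key would produce several groups for the same key.
   */
  static <K, V> List<Map.Entry<K, List<V>>> split(Iterator<Map.Entry<K, V>> sorted) {
    List<Map.Entry<K, List<V>>> groups = new ArrayList<>();
    K currentKey = null;
    List<V> currentValues = null;
    while (sorted.hasNext()) {
      Map.Entry<K, V> next = sorted.next();
      // Start a new group on the first element and whenever the key changes.
      if (currentValues == null || !Objects.equals(currentKey, next.getKey())) {
        currentKey = next.getKey();
        currentValues = new ArrayList<>();
        groups.add(new AbstractMap.SimpleImmutableEntry<K, List<V>>(currentKey, currentValues));
      }
      currentValues.add(next.getValue());
    }
    return groups;
  }
}
```

Because the values inside each run keep their incoming order, a partition sorted by key and then by timestamp yields per-key groups that are already time sorted.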
 



Issue Time Tracking
---

[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365787&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365787
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 03/Jan/20 10:30
Start Date: 03/Jan/20 10:30
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r362763933
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 ##
 @@ -448,18 +458,152 @@ public String toNativeString() {
 JavaRDD>>> groupRDD =
 GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);
 
-return groupRDD
-.map(
-input -> {
-  final K key = input.getKey();
-  Iterable> value = input.getValue();
-  return FluentIterable.from(value)
-  .transform(
-  windowedValue ->
-  windowedValue.withValue(KV.of(key, windowedValue.getValue())))
-  .iterator();
-})
-.flatMapToPair(doFnFunction);
+if (!requiresSortedInput) {
+  return groupRDD
+  .map(
+  input -> {
+final K key = input.getKey();
+Iterable> value = input.getValue();
+return FluentIterable.from(value)
+.transform(
+windowedValue ->
+windowedValue.withValue(KV.of(key, windowedValue.getValue())))
+.iterator();
+  })
+  .flatMapToPair(doFnFunction);
+}
+
+JavaPairRDD pairRDD =
+kvInRDD
+.map(new ReifyTimestampsAndWindowsFunction<>())
+.mapToPair(TranslationUtils.toPairFunction())
+.mapToPair(
+CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp()));
+
+JavaPairRDD sorted =
+pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner));
+
+return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder));
+  }
+
+  private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) {
+return new Partitioner() {
+  @Override
+  public int numPartitions() {
+return partitioner.numPartitions();
+  }
+
+  @Override
+  public int getPartition(Object o) {
+ByteArray b = (ByteArray) o;
+return partitioner.getPartition(
+new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8)));
 
 Review comment:
   I wouldn't do that. The utility class is named `CoderHelpers`, and this code 
creates a `Partitioner`, which has nothing to do with `Coders`. We would have to 
refactor it into something like `ByteArrayHelpers`, but I don't think that would 
make this PR more readable. We might postpone that until after this is merged.
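
For context, the partitioner under discussion drops the last 8 bytes of the composite key before delegating, so that every timestamp of a given key lands in the same partition. The sketch below shows that idea in plain Java. It assumes the composite key is the encoded key followed by an 8-byte big-endian timestamp, which the diff implies through `length - 8` but does not spell out, and it uses a simple hash in place of Spark's `Partitioner`. `KeyPrefixPartitioning` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical illustration; not Beam or Spark API. */
final class KeyPrefixPartitioning {

  /** Builds a composite key: encoded key bytes followed by an 8-byte timestamp (assumed layout). */
  static byte[] compositeKey(byte[] encodedKey, long timestampMillis) {
    return ByteBuffer.allocate(encodedKey.length + Long.BYTES)
        .put(encodedKey)
        .putLong(timestampMillis)
        .array();
  }

  /** Strips the trailing 8 timestamp bytes, leaving only the encoded key. */
  static byte[] keyPrefix(byte[] compositeKey) {
    return Arrays.copyOfRange(compositeKey, 0, compositeKey.length - Long.BYTES);
  }

  /** Picks a partition from the key prefix only, ignoring the timestamp suffix. */
  static int partitionFor(byte[] compositeKey, int numPartitions) {
    return Math.floorMod(Arrays.hashCode(keyPrefix(compositeKey)), numPartitions);
  }
}
```

Two composite keys built from the same encoded key but different timestamps map to the same partition, while sorting on the full byte array within that partition can still order them by timestamp.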
 



Issue Time Tracking
---

Worklog Id: (was: 365787)
Time Spent: 6h 50m  (was: 6h 40m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365784&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365784
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 03/Jan/20 10:21
Start Date: 03/Jan/20 10:21
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r362761604
 
 

 ##
 File path: 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
 ##
 @@ -188,15 +196,35 @@ public void process(ProcessContext c) {}
 // A single bundle with some elements in the global window; it should register cleanup for the
 // global window state merely by having the evaluator created. The cleanup logic does not
 // depend on the window.
-CommittedBundle> inputBundle =
+CommittedBundle>> inputBundle =
 
 Review comment:
   This relates to how coders are passed around in DirectRunner. See commit 
dfe825284b8be76736155e82c8ce278a45640595.
 



Issue Time Tracking
---

Worklog Id: (was: 365784)
Time Spent: 6h 40m  (was: 6.5h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2020-01-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=365780&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365780
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 03/Jan/20 10:09
Start Date: 03/Jan/20 10:09
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r362758124
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
 @@ -85,45 +123,83 @@ public void startBundle() {
 doFnRunner.startBundle();
   }
 
+  @Override
+  public void finishBundle() {
+doFnRunner.finishBundle();
+  }
+
   @Override
   public void processElement(WindowedValue input) {
 
 // StatefulDoFnRunner always observes windows, so we need to explode
 for (WindowedValue value : input.explodeWindows()) {
-
   BoundedWindow window = value.getWindows().iterator().next();
-
   if (isLate(window)) {
 // The element is too late for this window.
-droppedDueToLateness.inc();
-WindowTracing.debug(
-"StatefulDoFnRunner.processElement: Dropping element at {}; 
window:{} "
-+ "since too far behind inputWatermark:{}",
-input.getTimestamp(),
-window,
-cleanupTimer.currentInputWatermarkTime());
+reportDroppedElement(value, window);
+  } else if (requiresTimeSortedInput) {
+processElementOrdered(window, value);
   } else {
-cleanupTimer.setForWindow(value.getValue(), window);
-doFnRunner.processElement(value);
+processElementUnordered(window, value);
+  }
+}
+  }
+
+  private void processElementUnordered(BoundedWindow window, WindowedValue value) {
+cleanupTimer.setForWindow(value.getValue(), window);
+doFnRunner.processElement(value);
+  }
+
+  private void processElementOrdered(BoundedWindow window, WindowedValue value) {
+
+StateInternals stateInternals = stepContext.stateInternals();
+TimerInternals timerInternals = stepContext.timerInternals();
+
+if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) {
+  StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+  BagState> sortBuffer = stateInternals.state(namespace, sortBufferTag);
+  ValueState minStampState = stateInternals.state(namespace, sortBufferMinStampTag);
+  sortBuffer.add(value);
+  Instant minStamp =
+  MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+  if (value.getTimestamp().isBefore(minStamp)) {
+minStamp = value.getTimestamp();
+minStampState.write(minStamp);
+setupFlushTimerAndWatermarkHold(namespace, minStamp);
   }
+} else {
+  reportDroppedElement(value, window);
 
 Review comment:
   I think this implies that we have to take better care of allowed lateness 
here. Elements that arrive later than the allowed lateness are dropped by any 
stateful DoFn; this is not new behavior.
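
The buffering logic in `processElementOrdered` can be pictured with a small single-key model. This is only a sketch under assumptions: it replaces the `BagState`, minimum-timestamp state, flush timer, and watermark hold from the diff with one in-memory `PriorityQueue`, it ignores windows and allowed lateness, and `SortBufferSketch` is a hypothetical class. The acceptance test mirrors the diff: an element is buffered unless the watermark is already after its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical single-key model of a time-sorting buffer; not the StatefulDoFnRunner code. */
final class SortBufferSketch {
  private final PriorityQueue<Long> buffer = new PriorityQueue<>();
  private final List<Long> emitted = new ArrayList<>();
  private long watermark = Long.MIN_VALUE;
  private int dropped = 0;

  /** Buffers the element unless the watermark has already passed its timestamp. */
  void processElement(long timestamp) {
    if (watermark > timestamp) {
      dropped++; // late: emitting it now would break the ascending order
    } else {
      buffer.add(timestamp);
    }
  }

  /** Advances the watermark and emits, in ascending order, everything strictly before it. */
  void advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    while (!buffer.isEmpty() && buffer.peek() < watermark) {
      emitted.add(buffer.poll());
    }
  }

  /** Returns a copy of the timestamps emitted so far, in emission order. */
  List<Long> emitted() {
    return new ArrayList<>(emitted);
  }

  /** Returns how many elements were dropped as late. */
  int dropped() {
    return dropped;
  }
}
```

Feeding timestamps 1000 down to 1 and advancing the watermark to 100 right after the element stamped 100 buffers everything from 1000 down to 100, drops those stamped below 100 as late, and emits the buffered elements in ascending order once the watermark advances to the end of time.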
 



Issue Time Tracking
---

Worklog Id: (was: 365780)
Time Spent: 6.5h  (was: 6h 20m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356919&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356919
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 10/Dec/19 09:22
Start Date: 10/Dec/19 09:22
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355922803
 
 

 ##
 File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
 ##
 @@ -2363,6 +2367,136 @@ public int hashCode() {
 return getClass().hashCode();
   }
 }
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class
+})
+public void testRequiresTimeSortedInput() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps)));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithTestStream() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  TestStream.Builder stream = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+stream = stream.addElements(stamp);
+  }
+  testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithLateData() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  TestStream.Builder input = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+if (stamp == 100) {
+  // advance watermark when we have 100 remaining elements
+  // all the rest are going to be late elements
+  input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+}
+  }
+  testTimeSortedInput(
+  numElements - 100,
+  numElements - 1,
+  pipeline.apply(input.advanceWatermarkToInfinity()),
+  // Only direct runner is guaranteed to correctly drop elements after watermark coming
+  // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by
+  // class to avoid cyclic dependencies.
+  pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner"));
 
 Review comment:
   I'm afraid that code reuse between the two test modules would not be 
straightforward and would introduce much more counterintuitive code.
   The only option would be to split the test into two parts:
a) validate that the late-data dropping logic is correct, that is, test the 
annotation itself (this would have to be placed outside the validates-runner 
suites)
b) validate that runners obey the annotation (validates runner)
   There would probably be some code duplication, but it might be worth it. I 
will look into that.
 



Issue Time Tracking
---

Worklog Id: (was: 356919)
Time Spent: 6h 20m  (was: 6h 10m)


[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356914&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356914
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 10/Dec/19 09:15
Start Date: 10/Dec/19 09:15
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355919241
 
 

 ##
 File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
 ##
 @@ -2363,6 +2367,136 @@ public int hashCode() {
 return getClass().hashCode();
   }
 }
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class
+})
+public void testRequiresTimeSortedInput() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps)));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithTestStream() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  TestStream.Builder stream = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+stream = stream.addElements(stamp);
+  }
+  testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithLateData() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  TestStream.Builder input = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+if (stamp == 100) {
+  // advance watermark when we have 100 remaining elements
+  // all the rest are going to be late elements
+  input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+}
+  }
+  testTimeSortedInput(
+  numElements - 100,
+  numElements - 1,
+  pipeline.apply(input.advanceWatermarkToInfinity()),
+  // Only direct runner is guaranteed to correctly drop elements after watermark coming
+  // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by
+  // class to avoid cyclic dependencies.
+  pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner"));
 
 Review comment:
   If it is about code duplication around setting up TestStream, that can be 
reused via a shared routine. I would actually prefer a cleaner association of 
which test logic makes sense for which test category, and would avoid "sniffing" 
the context in which the test is running. It confused me, at least.
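
As one small instance of the shared-routine idea, the descending timestamp list that each of the three tests in the diff builds inline could come from a single helper. This is only a sketch: `TestInputs.descendingStamps` is a hypothetical name, and it covers the list generation only, not the `TestStream` setup itself.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/** Hypothetical test helper; not part of the PR. */
final class TestInputs {

  /** Returns numElements, numElements - 1, ..., 1, that is, timestamps in descending order. */
  static List<Long> descendingStamps(int numElements) {
    return LongStream.range(0, numElements)
        .mapToObj(i -> numElements - i)
        .collect(Collectors.toList());
  }
}
```

Each test would then start from `TestInputs.descendingStamps(numElements)` and differ only in how the elements are fed to the pipeline.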
 



Issue Time Tracking
---

Worklog Id: (was: 356914)
Time Spent: 6h 10m  (was: 6h)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https:/

[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356904&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356904
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 10/Dec/19 09:00
Start Date: 10/Dec/19 09:00
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355912021
 
 

 ##
 File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
 ##
 @@ -2363,6 +2367,136 @@ public int hashCode() {
 return getClass().hashCode();
   }
 }
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class
+})
+public void testRequiresTimeSortedInput() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps)));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithTestStream() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  TestStream.Builder stream = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+stream = stream.addElements(stamp);
+  }
+  testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithLateData() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  TestStream.Builder input = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+if (stamp == 100) {
+  // advance watermark when we have 100 remaining elements
+  // all the rest are going to be late elements
+  input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+}
+  }
+  testTimeSortedInput(
+  numElements - 100,
+  numElements - 1,
+  pipeline.apply(input.advanceWatermarkToInfinity()),
+  // Only direct runner is guaranteed to correctly drop elements after watermark coming
+  // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by
+  // class to avoid cyclic dependencies.
+  pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner"));
 
 Review comment:
   Yes, it is the latter. The direct runner is somewhat special, because it is 
intentionally designed to be single-process and thus can give stricter 
guarantees. Another option would be to split (basically copy & paste) this test 
into two versions: one that would reside in tests specific to DirectRunner, and 
another that would be a ValidatesRunner test. I think that this is probably not 
worth the code duplication.
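
   For context, the input built by the quoted test can be modelled without any 
runner. The sketch below is only an illustration: `LateDataSketch` and 
`lateStamps` are hypothetical names, not Beam API, and it assumes elements are 
observed in exactly the order they were added (which is the kind of guarantee a 
single-process runner can give). It lists the stamps that arrive after the 
watermark has already passed them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, runner-free model of the test input; none of these names come from Beam.
class LateDataSketch {

  /** Returns the stamps that arrive while the watermark is already past them. */
  static List<Long> lateStamps(int numElements, long watermarkAt) {
    long watermark = Long.MIN_VALUE;
    List<Long> late = new ArrayList<>();
    // Stamps are emitted in descending order, as in the quoted test.
    for (long stamp = numElements; stamp >= 1; stamp--) {
      if (watermark > stamp) {
        late.add(stamp);
      }
      if (stamp == watermarkAt) {
        // The watermark advances only after this element has been added.
        watermark = stamp;
      }
    }
    return late;
  }

  public static void main(String[] args) {
    List<Long> late = lateStamps(1000, 100);
    System.out.println("late elements: " + late.size());
    System.out.println("first late stamp: " + late.get(0));
  }
}
```

   If a runner may reorder elements coming from different sub-partitions 
relative to the watermark, the set of late elements is no longer fixed, which 
is presumably why the test relaxes its expectation for other runners.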
 



Issue Time Tracking
---

Worklog Id: (was: 356904)
Time Spent: 6h  (was: 5h 50m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> I

[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356901&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356901
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 10/Dec/19 08:57
Start Date: 10/Dec/19 08:57
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355910682
 
 

 ##
 File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
 ##
 @@ -2363,6 +2367,136 @@ public int hashCode() {
 return getClass().hashCode();
   }
 }
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class
+})
+public void testRequiresTimeSortedInput() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps)));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithTestStream() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  TestStream.Builder stream = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+stream = stream.addElements(stamp);
+  }
+  testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithLateData() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  TestStream.Builder input = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+if (stamp == 100) {
+  // advance watermark when we have 100 remaining elements
+  // all the rest are going to be late elements
+  input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+}
+  }
+  testTimeSortedInput(
+  numElements - 100,
+  numElements - 1,
+  pipeline.apply(input.advanceWatermarkToInfinity()),
+  // Only direct runner is guaranteed to correctly drop elements after watermark coming
+  // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by
+  // class to avoid cyclic dependencies.
+  pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner"));
 
 Review comment:
   Is this a problem of the runner (Flink) implementing TestStream 
sub-optimally, or that Beam cannot have full control over watermark behavior? 
I think it is the latter.
   From the comment, I was not sure whether this is a workaround for not 
testing on all runners because something is broken, or a way of selecting the 
runners on which this test makes sense.
 



Issue Time Tracking
---

Worklog Id: (was: 356901)
Time Spent: 5h 50m  (was: 5h 40m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for sta

[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356893&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356893
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 10/Dec/19 08:48
Start Date: 10/Dec/19 08:48
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355906532
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
 ##
 @@ -164,7 +164,7 @@ private void fireTimers() {
 transformTimers.getKey(),
 (PCollection)
 Iterables.getOnlyElement(
-
transformTimers.getExecutable().getInputs().values()))
+
transformTimers.getExecutable().getMainInputs().values()))
 
 Review comment:
   I think it manifests; I just didn't have time to file a JIRA for it. The 
change is small: it just differentiates the "main" input from "any other 
inputs". `getInputs()` returns all inputs including side inputs (as it does 
today), while `getMainInputs()` returns only the non-side inputs, which should 
be a single input in this case, so `Iterables.getOnlyElement` will not fail.
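
   The failure mode described above can be reproduced in plain Java. This is a 
sketch under assumptions: `onlyElement` is a stand-in written here to mimic 
Guava's `Iterables.getOnlyElement`, and `mainInputs` with its string tags is a 
hypothetical simplification, not the actual Beam implementation of 
`getMainInputs()`.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;

// Hypothetical model: inputs are a tag -> name map; side inputs are identified by tag.
class MainInputsSketch {

  /** Plain-Java stand-in for Guava's Iterables.getOnlyElement. */
  static <T> T onlyElement(Iterable<T> values) {
    Iterator<T> it = values.iterator();
    if (!it.hasNext()) {
      throw new NoSuchElementException("expected one element, found none");
    }
    T first = it.next();
    if (it.hasNext()) {
      throw new IllegalArgumentException("expected one element, found more");
    }
    return first;
  }

  /** Returns the inputs whose tag is not a side-input tag. */
  static Map<String, String> mainInputs(Map<String, String> allInputs, Set<String> sideTags) {
    Map<String, String> result = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : allInputs.entrySet()) {
      if (!sideTags.contains(e.getKey())) {
        result.put(e.getKey(), e.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> all = new LinkedHashMap<>();
    all.put("main", "mainPCollection");
    all.put("side", "sidePCollection");
    try {
      onlyElement(all.values());
    } catch (IllegalArgumentException e) {
      System.out.println("all inputs: " + e.getMessage());
    }
    System.out.println("main input: " + onlyElement(mainInputs(all, Set.of("side")).values()));
  }
}
```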
 



Issue Time Tracking
---

Worklog Id: (was: 356893)
Time Spent: 5h 40m  (was: 5.5h)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356891&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356891
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 10/Dec/19 08:45
Start Date: 10/Dec/19 08:45
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355905210
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 ##
 @@ -448,18 +458,152 @@ public String toNativeString() {
 JavaRDD>>> groupRDD =
 GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, 
partitioner);
 
-return groupRDD
-.map(
-input -> {
-  final K key = input.getKey();
-  Iterable> value = input.getValue();
-  return FluentIterable.from(value)
-  .transform(
-  windowedValue ->
-  windowedValue.withValue(KV.of(key, 
windowedValue.getValue(
-  .iterator();
-})
-.flatMapToPair(doFnFunction);
+if (!requiresSortedInput) {
+  return groupRDD
+  .map(
+  input -> {
+final K key = input.getKey();
+Iterable> value = input.getValue();
+return FluentIterable.from(value)
+.transform(
+windowedValue ->
+windowedValue.withValue(KV.of(key, 
windowedValue.getValue(
+.iterator();
+  })
+  .flatMapToPair(doFnFunction);
+}
+
+JavaPairRDD pairRDD =
+kvInRDD
+.map(new ReifyTimestampsAndWindowsFunction<>())
+.mapToPair(TranslationUtils.toPairFunction())
+.mapToPair(
+CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> 
in._2().getTimestamp()));
+
+JavaPairRDD sorted =
+
pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner));
+
+return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, 
keyCoder, wvCoder));
+  }
+
+  private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) 
{
+return new Partitioner() {
+  @Override
+  public int numPartitions() {
+return partitioner.numPartitions();
+  }
+
+  @Override
+  public int getPartition(Object o) {
+ByteArray b = (ByteArray) o;
+return partitioner.getPartition(
+new ByteArray(Arrays.copyOfRange(b.getValue(), 0, 
b.getValue().length - 8)));
+  }
+};
+  }
+
+  private static 
+  PairFlatMapFunction>, TupleTag, 
WindowedValue>
+  wrapDoFnFromSortedRDD(
+  MultiDoFnFunction, OutputT> doFnFunction,
+  Coder keyCoder,
+  Coder> wvCoder) {
+
+return (Iterator> in) -> {
+  Iterator, WindowedValue>>> mappedGroups;
+  mappedGroups =
 
 Review comment:
   IMHO it is quite bad to have to make such tradeoffs when formatting is 
working against us. But OK, it does not have to concern this PR.
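
   As background for the quoted hunk: `keyPrefixPartitionerFrom` strips the 
last 8 bytes of the composite key before delegating to the original 
partitioner, so that all records of one key land in the same partition while 
the sort still sees the timestamp. A minimal sketch of that idea follows. It 
assumes the suffix is a big-endian `long` timestamp (the actual encoding in 
`CoderHelpers.toByteFunctionWithTs` is not shown in the hunk), and all names in 
it are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical model of "key bytes + 8-byte timestamp suffix"; not the Spark runner code.
class KeyPrefixSketch {

  /** Appends the timestamp as an 8-byte suffix (big-endian is an assumption). */
  static byte[] withTimestamp(byte[] key, long timestamp) {
    return ByteBuffer.allocate(key.length + Long.BYTES).put(key).putLong(timestamp).array();
  }

  /** Drops the 8-byte timestamp suffix, leaving only the encoded key. */
  static byte[] keyPrefix(byte[] composite) {
    return Arrays.copyOfRange(composite, 0, composite.length - Long.BYTES);
  }

  /** Partitions by the key prefix only, so the timestamp never affects placement. */
  static int partitionFor(byte[] composite, int numPartitions) {
    return Math.floorMod(Arrays.hashCode(keyPrefix(composite)), numPartitions);
  }

  public static void main(String[] args) {
    byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
    int early = partitionFor(withTimestamp(key, 1L), 4);
    int late = partitionFor(withTimestamp(key, 999L), 4);
    System.out.println("same partition: " + (early == late));
  }
}
```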
 



Issue Time Tracking
---

Worklog Id: (was: 356891)
Time Spent: 5.5h  (was: 5h 20m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).





[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356888&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356888
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 10/Dec/19 08:42
Start Date: 10/Dec/19 08:42
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355903979
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
 ##
 @@ -164,7 +164,7 @@ private void fireTimers() {
 transformTimers.getKey(),
 (PCollection)
 Iterables.getOnlyElement(
-
transformTimers.getExecutable().getInputs().values()))
+
transformTimers.getExecutable().getMainInputs().values()))
 
 Review comment:
   To get a better understanding: is this a bug triggered by the changes in 
this feature? If it does not manifest without them, is that why it does not 
make sense to track it in a separate JIRA?
 



Issue Time Tracking
---

Worklog Id: (was: 356888)
Time Spent: 5h 20m  (was: 5h 10m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).





[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356538&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356538
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 09/Dec/19 20:47
Start Date: 09/Dec/19 20:47
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355675316
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
 @@ -132,16 +208,56 @@ public void onTimer(
 + "since window is too far behind inputWatermark:{}",
 timestamp,
 window,
-cleanupTimer.currentInputWatermarkTime());
+stepContext.timerInternals().currentInputWatermarkTime());
   } else {
 doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
   }
 }
   }
 
-  @Override
-  public void finishBundle() {
-doFnRunner.finishBundle();
+  // this needs to be optimized (Sorted Map State)
 
 Review comment:
   I will create the JIRA after this is merged.
 



Issue Time Tracking
---

Worklog Id: (was: 356538)
Time Spent: 5h 10m  (was: 5h)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).





[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356537&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356537
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 09/Dec/19 20:46
Start Date: 09/Dec/19 20:46
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355675160
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
 @@ -132,16 +208,56 @@ public void onTimer(
 + "since window is too far behind inputWatermark:{}",
 timestamp,
 window,
-cleanupTimer.currentInputWatermarkTime());
+stepContext.timerInternals().currentInputWatermarkTime());
   } else {
 doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
   }
 }
   }
 
-  @Override
-  public void finishBundle() {
-doFnRunner.finishBundle();
+  // this needs to be optimized (Sorted Map State)
 
 Review comment:
   Actually, it turns out that when the buffer is flushed not per single 
element but on each move of the input watermark, the gain from this 
optimization might be negligible. I'm seeing good performance even without it. 
A JIRA would still be good, because once Sorted Map State exists, it can be 
leveraged here.
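
   The flushing strategy described above can be sketched in plain Java. This 
is a simplified model, not the `StatefulDoFnRunner` code: bare `long` 
timestamps stand in for elements, an in-memory list stands in for the bag 
state, and `SortBufferSketch` with its methods is a hypothetical name. It 
assumes zero allowed lateness and that everything strictly before the 
watermark is safe to emit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model: timestamps stand in for elements; names are not Beam API.
class SortBufferSketch {
  final List<Long> buffer = new ArrayList<>();
  final List<Long> output = new ArrayList<>();
  long watermark = Long.MIN_VALUE;
  int flushes = 0;

  /** Buffers an element; elements behind the watermark are dropped as late. */
  void add(long timestamp) {
    if (watermark > timestamp) {
      return;
    }
    buffer.add(timestamp); // cheap append, no sorting per element
  }

  /** Sorts once per watermark move and emits everything before the watermark. */
  void advanceWatermark(long newWatermark) {
    watermark = newWatermark;
    Collections.sort(buffer);
    flushes++;
    int ready = 0;
    while (ready < buffer.size() && buffer.get(ready) < watermark) {
      output.add(buffer.get(ready));
      ready++;
    }
    buffer.subList(0, ready).clear();
  }

  public static void main(String[] args) {
    SortBufferSketch sketch = new SortBufferSketch();
    for (long ts : new long[] {5, 3, 9, 1}) {
      sketch.add(ts);
    }
    sketch.advanceWatermark(6);
    sketch.add(2); // behind the watermark, dropped
    sketch.add(7);
    sketch.advanceWatermark(Long.MAX_VALUE);
    System.out.println(sketch.output + " after " + sketch.flushes + " flushes");
  }
}
```

   The sort cost is paid once per watermark move rather than once per element, 
which is one plausible reason a sorted state structure brings little extra 
benefit in this setup.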
 



Issue Time Tracking
---

Worklog Id: (was: 356537)
Time Spent: 5h  (was: 4h 50m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).





[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356535&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356535
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 09/Dec/19 20:44
Start Date: 09/Dec/19 20:44
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355674168
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
 @@ -85,45 +123,83 @@ public void startBundle() {
 doFnRunner.startBundle();
   }
 
+  @Override
+  public void finishBundle() {
+doFnRunner.finishBundle();
+  }
+
   @Override
   public void processElement(WindowedValue input) {
 
 // StatefulDoFnRunner always observes windows, so we need to explode
 for (WindowedValue value : input.explodeWindows()) {
-
   BoundedWindow window = value.getWindows().iterator().next();
-
   if (isLate(window)) {
 // The element is too late for this window.
-droppedDueToLateness.inc();
-WindowTracing.debug(
-"StatefulDoFnRunner.processElement: Dropping element at {}; 
window:{} "
-+ "since too far behind inputWatermark:{}",
-input.getTimestamp(),
-window,
-cleanupTimer.currentInputWatermarkTime());
+reportDroppedElement(value, window);
+  } else if (requiresTimeSortedInput) {
+processElementOrdered(window, value);
   } else {
-cleanupTimer.setForWindow(value.getValue(), window);
-doFnRunner.processElement(value);
+processElementUnordered(window, value);
+  }
+}
+  }
+
+  private void processElementUnordered(BoundedWindow window, WindowedValue value) {
+cleanupTimer.setForWindow(value.getValue(), window);
+doFnRunner.processElement(value);
+  }
+
+  private void processElementOrdered(BoundedWindow window, WindowedValue value) {
+
+StateInternals stateInternals = stepContext.stateInternals();
+TimerInternals timerInternals = stepContext.timerInternals();
+
+if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) {
+  StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+  BagState> sortBuffer = 
stateInternals.state(namespace, sortBufferTag);
+  ValueState minStampState = stateInternals.state(namespace, 
sortBufferMinStampTag);
+  sortBuffer.add(value);
+  Instant minStamp =
+  MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+  if (value.getTimestamp().isBefore(minStamp)) {
+minStamp = value.getTimestamp();
+minStampState.write(minStamp);
+setupFlushTimerAndWatermarkHold(namespace, minStamp);
   }
+} else {
+  reportDroppedElement(value, window);
 }
   }
 
   private boolean isLate(BoundedWindow window) {
 Instant gcTime = LateDataUtils.garbageCollectionTime(window, 
windowingStrategy);
-Instant inputWM = cleanupTimer.currentInputWatermarkTime();
+Instant inputWM = stepContext.timerInternals().currentInputWatermarkTime();
 return gcTime.isBefore(inputWM);
   }
 
+  private void reportDroppedElement(WindowedValue value, BoundedWindow 
window) {
+droppedDueToLateness.inc();
+WindowTracing.debug(
+"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
++ "since too far behind inputWatermark:{}",
 
 Review comment:
   Should be. But we need to modify the flushing behavior to respect allowed 
lateness. This is wrong in the current implementation. Will fix that.
 



Issue Time Tracking
---

Worklog Id: (was: 356535)
Time Spent: 4h 50m  (was: 4h 40m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - tim

[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356534&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356534
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 09/Dec/19 20:43
Start Date: 09/Dec/19 20:43
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355673447
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
 @@ -85,45 +123,83 @@ public void startBundle() {
 doFnRunner.startBundle();
   }
 
+  @Override
+  public void finishBundle() {
+doFnRunner.finishBundle();
+  }
+
   @Override
   public void processElement(WindowedValue input) {
 
 // StatefulDoFnRunner always observes windows, so we need to explode
 for (WindowedValue value : input.explodeWindows()) {
-
   BoundedWindow window = value.getWindows().iterator().next();
-
   if (isLate(window)) {
 // The element is too late for this window.
-droppedDueToLateness.inc();
-WindowTracing.debug(
-"StatefulDoFnRunner.processElement: Dropping element at {}; 
window:{} "
-+ "since too far behind inputWatermark:{}",
-input.getTimestamp(),
-window,
-cleanupTimer.currentInputWatermarkTime());
+reportDroppedElement(value, window);
+  } else if (requiresTimeSortedInput) {
+processElementOrdered(window, value);
 
 Review comment:
   This implies that we have to buffer elements up to watermark + allowed 
lateness.
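
   One way to read this: with a non-zero allowed lateness, an element may only 
be flushed once no acceptable element can still arrive before it. A minimal 
sketch of that boundary follows, assuming plain `long` milliseconds and 
hypothetical names (`flushBoundary`, `accept`, `readyToFlush`); it is not the 
PR's implementation, and it ignores overflow at extreme watermark values.

```java
// Hypothetical model of the drop/flush boundary with allowed lateness; not Beam API.
class AllowedLatenessSketch {

  /** Everything strictly before this instant can no longer receive accepted elements. */
  static long flushBoundary(long watermark, long allowedLateness) {
    return watermark - allowedLateness;
  }

  /** An arriving element is accepted unless it falls before the boundary. */
  static boolean accept(long timestamp, long watermark, long allowedLateness) {
    return timestamp >= flushBoundary(watermark, allowedLateness);
  }

  /** A buffered element may be emitted once the boundary has passed it. */
  static boolean readyToFlush(long timestamp, long watermark, long allowedLateness) {
    return timestamp < flushBoundary(watermark, allowedLateness);
  }

  public static void main(String[] args) {
    long watermark = 100;
    long allowedLateness = 10;
    System.out.println("accept 95: " + accept(95, watermark, allowedLateness));
    System.out.println("flush 95: " + readyToFlush(95, watermark, allowedLateness));
    System.out.println("flush 89: " + readyToFlush(89, watermark, allowedLateness));
  }
}
```

   In this model an element with timestamp 95 is still buffered at watermark 
100, because an element with timestamp 90 would still be accepted and has to 
be emitted first.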
 



Issue Time Tracking
---

Worklog Id: (was: 356534)
Time Spent: 4h 40m  (was: 4.5h)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).





[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356146&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356146
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 09/Dec/19 12:51
Start Date: 09/Dec/19 12:51
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355431292
 
 

 ##
 File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
 ##
 @@ -2363,6 +2367,136 @@ public int hashCode() {
 return getClass().hashCode();
   }
 }
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class
+})
+public void testRequiresTimeSortedInput() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps)));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithTestStream() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  TestStream.Builder stream = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+stream = stream.addElements(stamp);
+  }
+  testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithLateData() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  TestStream.Builder input = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+if (stamp == 100) {
+  // advance watermark when we have 100 remaining elements
+  // all the rest are going to be late elements
+  input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+}
+  }
+  testTimeSortedInput(
+  numElements - 100,
+  numElements - 1,
+  pipeline.apply(input.advanceWatermarkToInfinity()),
+  // Only direct runner is guaranteed to correctly drop elements after watermark coming
+  // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by
+  // class to avoid cyclic dependencies.
+  pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner"));
 
 Review comment:
   Other runners (Flink, to be specific) might handle the watermark differently. 
Only the direct runner advances the watermark synchronously, so what you mark as 
"late" in TestStream really ends up late. When the same TestStream runs on 
Flink, the watermark might be updated at different times, so the data might not 
be late and the output can indeed differ.
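
   To illustrate the point above, here is a minimal, hypothetical simulation in
plain Java (it does not use Beam). It assumes an element is accepted when its
timestamp is not before the current watermark, and it models a non-synchronous
runner with a made-up `watermarkDelay` parameter: the number of elements that
slip through between the watermark event in the stream and the moment the
runner applies it. The class and method names are illustrative only.

```java
/** Hypothetical simulation of synchronous vs. delayed watermark propagation; not Beam code. */
public class LatenessSketch {

  /**
   * Feeds timestamps numElements..1 in descending order, requests a watermark advance to
   * advanceAt right after the element with that timestamp, and returns how many elements
   * are accepted. watermarkDelay is the (assumed) number of elements processed before the
   * requested watermark actually takes effect; 0 models a synchronous runner.
   */
  public static int accepted(int numElements, long advanceAt, int watermarkDelay) {
    long watermark = Long.MIN_VALUE;
    int countdown = -1; // -1 means no watermark update is pending
    int accepted = 0;
    for (long stamp = numElements; stamp >= 1; stamp--) {
      if (countdown == 0) {
        watermark = advanceAt; // the pending update becomes visible now
      }
      if (countdown >= 0) {
        countdown--;
      }
      if (stamp >= watermark) {
        accepted++; // not behind the watermark, so not late
      }
      if (stamp == advanceAt) {
        countdown = watermarkDelay; // watermark event follows this element
      }
    }
    return accepted;
  }
}
```

   With a delay of zero every element after the watermark event is dropped; with
a larger delay some or all of them are still accepted, which is why the
expected output can only be pinned down for a runner that advances the
watermark synchronously.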
 



Issue Time Tracking
---

Worklog Id: (was: 356146)
Time Spent: 4.5h  (was: 4h 20m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTim

[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356138&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356138
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 09/Dec/19 12:47
Start Date: 09/Dec/19 12:47
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355429361
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 ##
 @@ -448,18 +458,152 @@ public String toNativeString() {
 JavaRDD>>> groupRDD =
 GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);
 
-return groupRDD
-.map(
-input -> {
-  final K key = input.getKey();
-  Iterable> value = input.getValue();
-  return FluentIterable.from(value)
-  .transform(
-  windowedValue ->
-  windowedValue.withValue(KV.of(key, windowedValue.getValue())))
-  .iterator();
-})
-.flatMapToPair(doFnFunction);
+if (!requiresSortedInput) {
+  return groupRDD
+  .map(
+  input -> {
+final K key = input.getKey();
+Iterable> value = input.getValue();
+return FluentIterable.from(value)
+.transform(
+windowedValue ->
+windowedValue.withValue(KV.of(key, windowedValue.getValue())))
+.iterator();
+  })
+  .flatMapToPair(doFnFunction);
+}
+
+JavaPairRDD pairRDD =
+kvInRDD
+.map(new ReifyTimestampsAndWindowsFunction<>())
+.mapToPair(TranslationUtils.toPairFunction())
+.mapToPair(
+CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp()));
+
+JavaPairRDD sorted =
+pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner));
+
+return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder));
+  }
+
+  private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) {
+return new Partitioner() {
+  @Override
+  public int numPartitions() {
+return partitioner.numPartitions();
+  }
+
+  @Override
+  public int getPartition(Object o) {
+ByteArray b = (ByteArray) o;
+return partitioner.getPartition(
+new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8)));
+  }
+};
+  }
+
+  private static 
+  PairFlatMapFunction>, TupleTag, 
WindowedValue>
+  wrapDoFnFromSortedRDD(
+  MultiDoFnFunction, OutputT> doFnFunction,
+  Coder keyCoder,
+  Coder> wvCoder) {
+
+return (Iterator> in) -> {
+  Iterator, WindowedValue>>> mappedGroups;
+  mappedGroups =
 
 Review comment:
   The reason is formatting. The latter implies an additional level of indentation.
 



Issue Time Tracking
---

Worklog Id: (was: 356138)
Time Spent: 4h 20m  (was: 4h 10m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).





[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356127&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356127
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 09/Dec/19 12:22
Start Date: 09/Dec/19 12:22
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355419523
 
 

 ##
 File path: 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
 ##
 @@ -135,25 +139,29 @@ public void windowCleanupScheduled() throws Exception {
 .apply(Window.into(FixedWindows.of(Duration.millis(10))));
 
 TupleTag mainOutput = new TupleTag<>();
-PCollection produced =
-input
-.apply(
-new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
-new DoFn, Integer>() {
-  @StateId(stateId)
-  private final StateSpec> spec =
-  StateSpecs.value(StringUtf8Coder.of());
+final ParDoMultiOverrideFactory.GbkThenStatefulParDo
+gbkThenStatefulParDo;
+gbkThenStatefulParDo =
 
 Review comment:
   That actually formats really badly, so I prefer this one.
 



Issue Time Tracking
---

Worklog Id: (was: 356127)
Time Spent: 4h 10m  (was: 4h)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).





[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356126&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356126
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 09/Dec/19 12:20
Start Date: 09/Dec/19 12:20
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355418278
 
 

 ##
 File path: 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
 ##
 @@ -135,25 +139,29 @@ public void windowCleanupScheduled() throws Exception {
 .apply(Window.into(FixedWindows.of(Duration.millis(10))));
 
 TupleTag mainOutput = new TupleTag<>();
-PCollection produced =
-input
-.apply(
-new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
-new DoFn, Integer>() {
-  @StateId(stateId)
-  private final StateSpec> spec =
-  StateSpecs.value(StringUtf8Coder.of());
+final ParDoMultiOverrideFactory.GbkThenStatefulParDo
+gbkThenStatefulParDo;
+gbkThenStatefulParDo =
+new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
+new DoFn, Integer>() {
+  @StateId(stateId)
+  private final StateSpec> spec =
+  StateSpecs.value(StringUtf8Coder.of());
+
+  @ProcessElement
+  public void process(ProcessContext c) {}
+},
+mainOutput,
+TupleTagList.empty(),
+Collections.emptyList(),
+DoFnSchemaInformation.create(),
+Collections.emptyMap());
+
+final PCollection>> grouped;
+grouped = gbkThenStatefulParDo.groupToKeyedWorkItem(input);
 
 Review comment:
   ```suggestion
   final PCollection>> grouped =
   gbkThenStatefulParDo.groupToKeyedWorkItem(input);
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 356126)
Time Spent: 4h  (was: 3h 50m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).





[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356125&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356125
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 09/Dec/19 12:19
Start Date: 09/Dec/19 12:19
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355418278
 
 

 ##
 File path: 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
 ##
 @@ -135,25 +139,29 @@ public void windowCleanupScheduled() throws Exception {
 .apply(Window.into(FixedWindows.of(Duration.millis(10))));
 
 TupleTag mainOutput = new TupleTag<>();
-PCollection produced =
-input
-.apply(
-new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
-new DoFn, Integer>() {
-  @StateId(stateId)
-  private final StateSpec> spec =
-  StateSpecs.value(StringUtf8Coder.of());
+final ParDoMultiOverrideFactory.GbkThenStatefulParDo
+gbkThenStatefulParDo;
+gbkThenStatefulParDo =
+new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
+new DoFn, Integer>() {
+  @StateId(stateId)
+  private final StateSpec> spec =
+  StateSpecs.value(StringUtf8Coder.of());
+
+  @ProcessElement
+  public void process(ProcessContext c) {}
+},
+mainOutput,
+TupleTagList.empty(),
+Collections.emptyList(),
+DoFnSchemaInformation.create(),
+Collections.emptyMap());
+
+final PCollection>> grouped;
+grouped = gbkThenStatefulParDo.groupToKeyedWorkItem(input);
 
 Review comment:
   ```suggestion
   final PCollection>> grouped =
   gbkThenStatefulParDo.groupToKeyedWorkItem(input);
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 356125)
Time Spent: 3h 50m  (was: 3h 40m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).





[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=356122&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356122
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 09/Dec/19 12:18
Start Date: 09/Dec/19 12:18
Worklog Time Spent: 10m 
  Work Description: je-ik commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355417745
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
 ##
 @@ -164,7 +164,7 @@ private void fireTimers() {
 transformTimers.getKey(),
 (PCollection)
 Iterables.getOnlyElement(
-transformTimers.getExecutable().getInputs().values()))
+transformTimers.getExecutable().getMainInputs().values()))
 
 Review comment:
   This actually fixes a bug. The original code fails when there are side 
inputs, because `getInputs` throws an exception.
 



Issue Time Tracking
---

Worklog Id: (was: 356122)
Time Spent: 3h 40m  (was: 3.5h)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).





[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355793&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355793
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355175226
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
 @@ -85,45 +123,83 @@ public void startBundle() {
 doFnRunner.startBundle();
   }
 
+  @Override
+  public void finishBundle() {
+doFnRunner.finishBundle();
+  }
+
   @Override
   public void processElement(WindowedValue input) {
 
 // StatefulDoFnRunner always observes windows, so we need to explode
 for (WindowedValue value : input.explodeWindows()) {
-
   BoundedWindow window = value.getWindows().iterator().next();
-
   if (isLate(window)) {
 // The element is too late for this window.
-droppedDueToLateness.inc();
-WindowTracing.debug(
-"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
-+ "since too far behind inputWatermark:{}",
-input.getTimestamp(),
-window,
-cleanupTimer.currentInputWatermarkTime());
+reportDroppedElement(value, window);
+  } else if (requiresTimeSortedInput) {
+processElementOrdered(window, value);
   } else {
-cleanupTimer.setForWindow(value.getValue(), window);
-doFnRunner.processElement(value);
+processElementUnordered(window, value);
+  }
+}
+  }
+
+  private void processElementUnordered(BoundedWindow window, WindowedValue value) {
+cleanupTimer.setForWindow(value.getValue(), window);
+doFnRunner.processElement(value);
+  }
+
+  private void processElementOrdered(BoundedWindow window, WindowedValue value) {
+
+StateInternals stateInternals = stepContext.stateInternals();
+TimerInternals timerInternals = stepContext.timerInternals();
+
+if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) {
+  StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+  BagState> sortBuffer = stateInternals.state(namespace, sortBufferTag);
+  ValueState minStampState = stateInternals.state(namespace, sortBufferMinStampTag);
+  sortBuffer.add(value);
+  Instant minStamp =
+  MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+  if (value.getTimestamp().isBefore(minStamp)) {
+minStamp = value.getTimestamp();
+minStampState.write(minStamp);
+setupFlushTimerAndWatermarkHold(namespace, minStamp);
   }
+} else {
+  reportDroppedElement(value, window);
 }
   }
 
   private boolean isLate(BoundedWindow window) {
 Instant gcTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
-Instant inputWM = cleanupTimer.currentInputWatermarkTime();
+Instant inputWM = stepContext.timerInternals().currentInputWatermarkTime();
 return gcTime.isBefore(inputWM);
   }
 
+  private void reportDroppedElement(WindowedValue value, BoundedWindow window) {
+droppedDueToLateness.inc();
+WindowTracing.debug(
+"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
++ "since too far behind inputWatermark:{}",
 
 Review comment:
   Is this message correct when an element is processed and dropped from the 
ordered code path?
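
   For context, the ordered path in the diff above buffers elements and drops
those whose timestamp is already behind the input watermark, which is a
different condition from the window having expired. A minimal sketch of that
idea in plain Java follows. It is not Beam code: the class name, the use of a
`PriorityQueue` instead of Beam state, and the flush rule (emit everything
strictly older than the watermark) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sort buffer keyed by timestamp; a stand-in for the state-backed buffer. */
public class SortBufferSketch {
  private final PriorityQueue<Long> buffer = new PriorityQueue<>();
  private final List<Long> emitted = new ArrayList<>();
  private long watermark = Long.MIN_VALUE;
  private int dropped = 0;

  /** Buffers the element unless the watermark has already passed its timestamp. */
  public void processElement(long timestamp) {
    if (timestamp >= watermark) {
      buffer.add(timestamp);
    } else {
      dropped++; // dropped because it is out of order, not because its window expired
    }
  }

  /** Moves the watermark forward and emits, in order, everything strictly older than it. */
  public void advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    while (!buffer.isEmpty() && buffer.peek() < watermark) {
      emitted.add(buffer.poll());
    }
  }

  public List<Long> emitted() {
    return emitted;
  }

  public int dropped() {
    return dropped;
  }
}
```

   In this sketch an element can be dropped while its window is still open, so a
log message phrased in terms of window lateness would describe a different
reason than the one that actually applied.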
 



Issue Time Tracking
---

Worklog Id: (was: 355793)
Time Spent: 3h 10m  (was: 3h)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - 

[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355788&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355788
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355168688
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 ##
 @@ -448,18 +458,152 @@ public String toNativeString() {
 JavaRDD>>> groupRDD =
 GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);
 
-return groupRDD
-.map(
-input -> {
-  final K key = input.getKey();
-  Iterable> value = input.getValue();
-  return FluentIterable.from(value)
-  .transform(
-  windowedValue ->
-  windowedValue.withValue(KV.of(key, windowedValue.getValue())))
-  .iterator();
-})
-.flatMapToPair(doFnFunction);
+if (!requiresSortedInput) {
+  return groupRDD
+  .map(
+  input -> {
+final K key = input.getKey();
+Iterable> value = input.getValue();
+return FluentIterable.from(value)
+.transform(
+windowedValue ->
+windowedValue.withValue(KV.of(key, windowedValue.getValue())))
+.iterator();
+  })
+  .flatMapToPair(doFnFunction);
+}
+
+JavaPairRDD pairRDD =
+kvInRDD
+.map(new ReifyTimestampsAndWindowsFunction<>())
+.mapToPair(TranslationUtils.toPairFunction())
+.mapToPair(
+CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> in._2().getTimestamp()));
+
+JavaPairRDD sorted =
+pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner));
+
+return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, keyCoder, wvCoder));
+  }
+
+  private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) {
+return new Partitioner() {
+  @Override
+  public int numPartitions() {
+return partitioner.numPartitions();
+  }
+
+  @Override
+  public int getPartition(Object o) {
+ByteArray b = (ByteArray) o;
+return partitioner.getPartition(
+new ByteArray(Arrays.copyOfRange(b.getValue(), 0, b.getValue().length - 8)));
 
 Review comment:
   Can we move this close to the utility that produces the value with the 
timestamp? That way we would have `toByteArrayWithTs` and e.g. 
`toByteArrayWithoutTs` (any naming is fine).
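
   A minimal sketch of what such a pair of helpers could look like, in plain
Java. The two method names come from the suggestion above; the signatures, the
class name, the extra `timestampOf` helper and the choice of an 8-byte
big-endian suffix are assumptions for illustration and not the actual
`CoderHelpers` API. The only thing taken from the diff is that the partitioner
strips the last 8 bytes of the key.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helpers that keep the "append timestamp" and "strip timestamp" logic together. */
public class TimestampSuffixSketch {

  /** Appends the timestamp as an 8-byte big-endian suffix to the encoded key. */
  public static byte[] toByteArrayWithTs(byte[] key, long timestampMillis) {
    return ByteBuffer.allocate(key.length + Long.BYTES)
        .put(key)
        .putLong(timestampMillis)
        .array();
  }

  /** Removes the 8-byte timestamp suffix, returning only the encoded key. */
  public static byte[] toByteArrayWithoutTs(byte[] keyWithTs) {
    return Arrays.copyOfRange(keyWithTs, 0, keyWithTs.length - Long.BYTES);
  }

  /** Reads the timestamp back from the suffix. */
  public static long timestampOf(byte[] keyWithTs) {
    return ByteBuffer.wrap(keyWithTs, keyWithTs.length - Long.BYTES, Long.BYTES).getLong();
  }
}
```

   Keeping both directions next to each other means the suffix length is defined
in one place, so the partitioner would not need its own hard-coded `- 8`.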
 



Issue Time Tracking
---

Worklog Id: (was: 355788)
Time Spent: 2h 50m  (was: 2h 40m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).





[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355794&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355794
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355172757
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
 ##
 @@ -656,6 +656,24 @@ public Duration getAllowedTimestampSkew() {
   @Target(ElementType.METHOD)
   public @interface RequiresStableInput {}
 
+  /**
+   * Experimental - no backwards compatibility guarantees. The exact name or usage of this
+   * feature may change.
+   *
+   * Annotation that may be added to a {@link ProcessElement} method to indicate that the runner
+   * must ensure that the observable contents of the input {@link PCollection} is sorted by time, in
+   * ascending order. The time ordering is generally defined by element's timestamp, but an
+   * alternative user supplied ordering function can be supplied.
+   *
+   * Note that this annotation makes sense only for stateful {@code ParDo}s, because outcome of
+   * stateless functions cannot depend on the ordering.
+   */
+  @Documented
+  @Experimental
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  public @interface RequiresTimeSortedInput {}
 
 Review comment:
   I think the doc should be enriched with more details and the tradeoffs of this 
annotation. The impact on handling late data definitely needs to be mentioned.
 



Issue Time Tracking
---

Worklog Id: (was: 355794)
Time Spent: 3h 10m  (was: 3h)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).





[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355797&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355797
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355173711
 
 

 ##
 File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
 ##
 @@ -2363,6 +2367,136 @@ public int hashCode() {
 return getClass().hashCode();
   }
 }
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class
+})
+public void testRequiresTimeSortedInput() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps)));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithTestStream() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List eventStamps =
+  LongStream.range(0, numElements)
+  .mapToObj(i -> numElements - i)
+  .collect(Collectors.toList());
+  TestStream.Builder stream = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+stream = stream.addElements(stamp);
+  }
+  testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithLateData() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List<Long> eventStamps =
+      LongStream.range(0, numElements)
+          .mapToObj(i -> numElements - i)
+          .collect(Collectors.toList());
+  TestStream.Builder<Long> input = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+    input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+    if (stamp == 100) {
+      // advance watermark when we have 100 remaining elements
+      // all the rest are going to be late elements
+      input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+    }
+  }
+  testTimeSortedInput(
 
 Review comment:
   If I am correct, this test does not verify which elements (keys) exactly are 
dropped as late. Can it be made more explicit?
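
One way to make this explicit would be to compute the expected late set inside the test. A minimal plain-Java sketch, not Beam code: `LateElementSimulation` and its methods are hypothetical names, and it assumes an element counts as late only when its timestamp is strictly before the current watermark (which mirrors the `!currentInputWatermarkTime().isAfter(value.getTimestamp())` check in `StatefulDoFnRunner`).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: simulates which elements a zero-lateness watermark would drop. */
final class LateElementSimulation {

  /** Elements kept and dropped, both in arrival order. */
  static final class Result {
    final List<Long> kept = new ArrayList<>();
    final List<Long> dropped = new ArrayList<>();
  }

  /**
   * Feeds timestamps numElements..1 in descending order and advances the watermark to
   * advanceAt right after the element carrying that timestamp has been added.
   */
  static Result simulate(int numElements, long advanceAt) {
    Result result = new Result();
    long watermark = Long.MIN_VALUE;
    for (long stamp = numElements; stamp >= 1; stamp--) {
      if (stamp < watermark) {
        // Assumption: strictly behind the watermark means late, so the element is dropped.
        result.dropped.add(stamp);
      } else {
        result.kept.add(stamp);
      }
      if (stamp == advanceAt) {
        watermark = stamp;
      }
    }
    return result;
  }
}
```

Under these assumptions `simulate(1000, 100)` drops exactly the timestamps 99 down to 1, so a test could assert on that set instead of only on an element count.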
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 355797)
Time Spent: 3.5h  (was: 3h 20m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355790&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355790
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355168771
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 ##
 @@ -448,18 +458,152 @@ public String toNativeString() {
     JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupRDD =
         GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);
 
-    return groupRDD
-        .map(
-            input -> {
-              final K key = input.getKey();
-              Iterable<WindowedValue<V>> value = input.getValue();
-              return FluentIterable.from(value)
-                  .transform(
-                      windowedValue ->
-                          windowedValue.withValue(KV.of(key, windowedValue.getValue())))
-                  .iterator();
-            })
-        .flatMapToPair(doFnFunction);
+    if (!requiresSortedInput) {
+      return groupRDD
+          .map(
+              input -> {
+                final K key = input.getKey();
+                Iterable<WindowedValue<V>> value = input.getValue();
+                return FluentIterable.from(value)
+                    .transform(
+                        windowedValue ->
+                            windowedValue.withValue(KV.of(key, windowedValue.getValue())))
+                    .iterator();
+              })
+          .flatMapToPair(doFnFunction);
+    }
+
+JavaPairRDD pairRDD =
+kvInRDD
+.map(new ReifyTimestampsAndWindowsFunction<>())
+.mapToPair(TranslationUtils.toPairFunction())
+.mapToPair(
+CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> 
in._2().getTimestamp()));
+
+JavaPairRDD sorted =
+
pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner));
+
+return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, 
keyCoder, wvCoder));
+  }
+
+  private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) 
{
+return new Partitioner() {
+  @Override
+  public int numPartitions() {
+return partitioner.numPartitions();
+  }
+
+  @Override
+  public int getPartition(Object o) {
+ByteArray b = (ByteArray) o;
+return partitioner.getPartition(
+new ByteArray(Arrays.copyOfRange(b.getValue(), 0, 
b.getValue().length - 8)));
+  }
+};
+  }
+
+  private static 
+  PairFlatMapFunction>, TupleTag, 
WindowedValue>
+  wrapDoFnFromSortedRDD(
+  MultiDoFnFunction, OutputT> doFnFunction,
+  Coder keyCoder,
+  Coder> wvCoder) {
+
+return (Iterator> in) -> {
+  Iterator, WindowedValue>>> mappedGroups;
+  mappedGroups =
+  Iterators.transform(
+  splitBySameKey(in, keyCoder, wvCoder),
+  group -> {
+try {
+  return doFnFunction.call(group);
+} catch (Exception ex) {
+  throw new RuntimeException(ex);
+}
+  });
+  return flatten(mappedGroups);
+};
+  }
+
+  @VisibleForTesting
+  static  Iterator flatten(final Iterator> toFlatten) {
+
+return new AbstractIterator() {
+
+  @Nullable Iterator current = null;
+
+  @Override
+  protected T computeNext() {
+while (true) {
+  if (current == null) {
+if (toFlatten.hasNext()) {
+  current = toFlatten.next();
+} else {
+  return endOfData();
+}
+  }
+  if (current.hasNext()) {
+return current.next();
+  }
+  current = null;
+}
+  }
+};
+  }
+
+  @VisibleForTesting
+  static  Iterator>>> splitBySameKey(
+  Iterator> in, Coder keyCoder, 
Coder> wvCoder) {
+
+return new AbstractIterator>>>() {
+
+  @Nullable Tuple2 read = null;
+
+  @Override
+  protected Iterator>> computeNext() {
+readNext();
+if (read != null) {
+  byte[] value = read._1().getValue();
+  byte[] keyPart = Arrays.copyOfRange(value, 0, value.length - 8);
+  K key = CoderHelpers.fromByteArray(keyPart, keyCoder);
+  return createIteratorForKey(keyPart, key);
+}
+return endOfData();
+  }
+
+  private void readNext() {
+if (read == null) {
+  if (in.hasNext()) {
+read = in.next();
+  }
+}
+  }
+
+  private void consumed() {
+read = null;
+  }
+
+  privat

[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355784&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355784
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355132678
 
 

 ##
 File path: 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
 ##
 @@ -135,25 +139,29 @@ public void windowCleanupScheduled() throws Exception {
 .apply(Window.into(FixedWindows.of(Duration.millis(10))));
 
 TupleTag mainOutput = new TupleTag<>();
-PCollection produced =
-input
-.apply(
-new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
-new DoFn, Integer>() {
-  @StateId(stateId)
-  private final StateSpec> spec =
-  StateSpecs.value(StringUtf8Coder.of());
+final ParDoMultiOverrideFactory.GbkThenStatefulParDo
+gbkThenStatefulParDo;
+gbkThenStatefulParDo =
+new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
+new DoFn, Integer>() {
+  @StateId(stateId)
+  private final StateSpec> spec =
+  StateSpecs.value(StringUtf8Coder.of());
+
+  @ProcessElement
+  public void process(ProcessContext c) {}
+},
+mainOutput,
+TupleTagList.empty(),
+Collections.emptyList(),
+DoFnSchemaInformation.create(),
+Collections.emptyMap());
+
+final PCollection>> grouped;
+grouped = gbkThenStatefulParDo.groupToKeyedWorkItem(input);
 
 Review comment:
   We can do a direct assignment here
 



Issue Time Tracking
---

Worklog Id: (was: 355784)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355791&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355791
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355173620
 
 

 ##
 File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
 ##
 @@ -2363,6 +2367,136 @@ public int hashCode() {
 return getClass().hashCode();
   }
 }
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class
+})
+public void testRequiresTimeSortedInput() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List<Long> eventStamps =
+      LongStream.range(0, numElements)
+          .mapToObj(i -> numElements - i)
+          .collect(Collectors.toList());
+  testTimeSortedInput(numElements, pipeline.apply(Create.of(eventStamps)));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithTestStream() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List<Long> eventStamps =
+      LongStream.range(0, numElements)
+          .mapToObj(i -> numElements - i)
+          .collect(Collectors.toList());
+  TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+    stream = stream.addElements(stamp);
+  }
+  testTimeSortedInput(numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+}
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithLateData() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+  List<Long> eventStamps =
+      LongStream.range(0, numElements)
+          .mapToObj(i -> numElements - i)
+          .collect(Collectors.toList());
+  TestStream.Builder<Long> input = TestStream.create(VarLongCoder.of());
+  for (Long stamp : eventStamps) {
+    input = input.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+    if (stamp == 100) {
+      // advance watermark when we have 100 remaining elements
+      // all the rest are going to be late elements
+      input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
+    }
+  }
+  testTimeSortedInput(
+  numElements - 100,
+  numElements - 1,
+  pipeline.apply(input.advanceWatermarkToInfinity()),
+  // Only direct runner is guaranteed to correctly drop elements after watermark coming
+  // from different (sub-)partitions of TestStream. Cannot reference DirectRunner by
+  // class to avoid cyclic dependencies.
+  pipeline.getOptions().getRunner().getSimpleName().equals("DirectRunner"));
 
 Review comment:
   What does this mean exactly? Do other runners not provide the same guarantees 
about dropping late data, and therefore produce different results?
 



Issue Time Tracking
---

Worklog Id: (was: 355791)
Time Spent: 3h  (was: 2h 50m)


[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355785&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355785
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355132294
 
 

 ##
 File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
 ##
 @@ -164,7 +164,7 @@ private void fireTimers() {
 transformTimers.getKey(),
 (PCollection)
 Iterables.getOnlyElement(
-
transformTimers.getExecutable().getInputs().values()))
+
transformTimers.getExecutable().getMainInputs().values()))
 
 Review comment:
   I do not follow what this change is about. Can you elaborate?
 



Issue Time Tracking
---

Worklog Id: (was: 355785)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355787&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355787
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355168488
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 ##
 @@ -448,18 +458,152 @@ public String toNativeString() {
     JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupRDD =
         GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);
 
-    return groupRDD
-        .map(
-            input -> {
-              final K key = input.getKey();
-              Iterable<WindowedValue<V>> value = input.getValue();
-              return FluentIterable.from(value)
-                  .transform(
-                      windowedValue ->
-                          windowedValue.withValue(KV.of(key, windowedValue.getValue())))
-                  .iterator();
-            })
-        .flatMapToPair(doFnFunction);
+    if (!requiresSortedInput) {
+      return groupRDD
+          .map(
+              input -> {
+                final K key = input.getKey();
+                Iterable<WindowedValue<V>> value = input.getValue();
+                return FluentIterable.from(value)
+                    .transform(
+                        windowedValue ->
+                            windowedValue.withValue(KV.of(key, windowedValue.getValue())))
+                    .iterator();
+              })
+          .flatMapToPair(doFnFunction);
+    }
+
+JavaPairRDD pairRDD =
+kvInRDD
+.map(new ReifyTimestampsAndWindowsFunction<>())
+.mapToPair(TranslationUtils.toPairFunction())
+.mapToPair(
+CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> 
in._2().getTimestamp()));
+
+JavaPairRDD sorted =
+
pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner));
+
+return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, 
keyCoder, wvCoder));
+  }
+
+  private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) 
{
+return new Partitioner() {
+  @Override
+  public int numPartitions() {
+return partitioner.numPartitions();
+  }
+
+  @Override
+  public int getPartition(Object o) {
+ByteArray b = (ByteArray) o;
+return partitioner.getPartition(
+new ByteArray(Arrays.copyOfRange(b.getValue(), 0, 
b.getValue().length - 8)));
+  }
+};
+  }
+
+  private static 
+  PairFlatMapFunction>, TupleTag, 
WindowedValue>
+  wrapDoFnFromSortedRDD(
+  MultiDoFnFunction, OutputT> doFnFunction,
+  Coder keyCoder,
+  Coder> wvCoder) {
+
+return (Iterator> in) -> {
+  Iterator, WindowedValue>>> mappedGroups;
+  mappedGroups =
 
 Review comment:
   I wonder why you sometimes prefer:
   ```
   T myVariable;
   myVariable = value();
   ```
   rather than directly:
   ```
   T myVariable = value();
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 355787)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355786&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355786
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355164129
 
 

 ##
 File path: 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
 ##
 @@ -188,15 +196,35 @@ public void process(ProcessContext c) {}
 // A single bundle with some elements in the global window; it should 
register cleanup for the
 // global window state merely by having the evaluator created. The cleanup 
logic does not
 // depend on the window.
-CommittedBundle> inputBundle =
+CommittedBundle>> inputBundle =
 
 Review comment:
   What is the benefit of expanding KV -> KeyedWorkItem? I do not see the 
connection to requiring time-sorted input.
 



Issue Time Tracking
---

Worklog Id: (was: 355786)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355795&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355795
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355175519
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
 @@ -132,16 +208,56 @@ public void onTimer(
 + "since window is too far behind inputWatermark:{}",
 timestamp,
 window,
-cleanupTimer.currentInputWatermarkTime());
+stepContext.timerInternals().currentInputWatermarkTime());
   } else {
 doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
   }
 }
   }
 
-  @Override
-  public void finishBundle() {
-doFnRunner.finishBundle();
+  // this needs to be optimized (Sorted Map State)
 
 Review comment:
   Is this a performance implication worth mentioning to users of the annotation? 
   
   Can it be linked to a JIRA issue?
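
To illustrate the concern: with a bag-style buffer, each flush has to read and sort everything that is buffered, whereas a sorted structure can hand out only the prefix that is ready. A minimal plain-Java sketch that uses `TreeMap` as a stand-in for a sorted map state. `SortBufferSketch` and its methods are hypothetical and are not Beam's state API, and the inclusive watermark bound is an assumption made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for a sort buffer backed by a sorted map instead of a bag. */
final class SortBufferSketch {

  // Buffered values grouped by timestamp; TreeMap keeps the keys ordered.
  private final TreeMap<Long, List<String>> buffer = new TreeMap<>();

  void add(long timestamp, String value) {
    buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
  }

  /** Removes and returns, in timestamp order, all values with timestamp <= watermark. */
  List<String> flushUpTo(long watermark) {
    List<String> out = new ArrayList<>();
    // headMap returns a live view, so clearing it removes the flushed entries from the buffer.
    NavigableMap<Long, List<String>> ready = buffer.headMap(watermark, true);
    for (List<String> values : ready.values()) {
      out.addAll(values);
    }
    ready.clear();
    return out;
  }

  int size() {
    return buffer.size();
  }
}
```

Only the entries at or below the watermark are touched on a flush; the rest of the buffer is never re-read or re-sorted, which is the cost the in-code comment seems to refer to.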
 



Issue Time Tracking
---

Worklog Id: (was: 355795)
Time Spent: 3h 20m  (was: 3h 10m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355783&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355783
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355163869
 
 

 ##
 File path: 
runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
 ##
 @@ -135,25 +139,29 @@ public void windowCleanupScheduled() throws Exception {
 .apply(Window.into(FixedWindows.of(Duration.millis(10))));
 
 TupleTag mainOutput = new TupleTag<>();
-PCollection produced =
-input
-.apply(
-new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
-new DoFn, Integer>() {
-  @StateId(stateId)
-  private final StateSpec> spec =
-  StateSpecs.value(StringUtf8Coder.of());
+final ParDoMultiOverrideFactory.GbkThenStatefulParDo
+gbkThenStatefulParDo;
+gbkThenStatefulParDo =
 
 Review comment:
   We can do a direct assignment here
 



Issue Time Tracking
---

Worklog Id: (was: 355783)
Time Spent: 2h 10m  (was: 2h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355789&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355789
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355168726
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 ##
 @@ -448,18 +458,152 @@ public String toNativeString() {
     JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupRDD =
         GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, partitioner);
 
-    return groupRDD
-        .map(
-            input -> {
-              final K key = input.getKey();
-              Iterable<WindowedValue<V>> value = input.getValue();
-              return FluentIterable.from(value)
-                  .transform(
-                      windowedValue ->
-                          windowedValue.withValue(KV.of(key, windowedValue.getValue())))
-                  .iterator();
-            })
-        .flatMapToPair(doFnFunction);
+    if (!requiresSortedInput) {
+      return groupRDD
+          .map(
+              input -> {
+                final K key = input.getKey();
+                Iterable<WindowedValue<V>> value = input.getValue();
+                return FluentIterable.from(value)
+                    .transform(
+                        windowedValue ->
+                            windowedValue.withValue(KV.of(key, windowedValue.getValue())))
+                    .iterator();
+              })
+          .flatMapToPair(doFnFunction);
+    }
+
+JavaPairRDD pairRDD =
+kvInRDD
+.map(new ReifyTimestampsAndWindowsFunction<>())
+.mapToPair(TranslationUtils.toPairFunction())
+.mapToPair(
+CoderHelpers.toByteFunctionWithTs(keyCoder, wvCoder, in -> 
in._2().getTimestamp()));
+
+JavaPairRDD sorted =
+
pairRDD.repartitionAndSortWithinPartitions(keyPrefixPartitionerFrom(partitioner));
+
+return sorted.mapPartitionsToPair(wrapDoFnFromSortedRDD(doFnFunction, 
keyCoder, wvCoder));
+  }
+
+  private static Partitioner keyPrefixPartitionerFrom(Partitioner partitioner) 
{
+return new Partitioner() {
+  @Override
+  public int numPartitions() {
+return partitioner.numPartitions();
+  }
+
+  @Override
+  public int getPartition(Object o) {
+ByteArray b = (ByteArray) o;
+return partitioner.getPartition(
+new ByteArray(Arrays.copyOfRange(b.getValue(), 0, 
b.getValue().length - 8)));
+  }
+};
+  }
+
+  private static 
+  PairFlatMapFunction>, TupleTag, 
WindowedValue>
+  wrapDoFnFromSortedRDD(
+  MultiDoFnFunction, OutputT> doFnFunction,
+  Coder keyCoder,
+  Coder> wvCoder) {
+
+return (Iterator> in) -> {
+  Iterator, WindowedValue>>> mappedGroups;
+  mappedGroups =
+  Iterators.transform(
+  splitBySameKey(in, keyCoder, wvCoder),
+  group -> {
+try {
+  return doFnFunction.call(group);
+} catch (Exception ex) {
+  throw new RuntimeException(ex);
+}
+  });
+  return flatten(mappedGroups);
+};
+  }
+
+  @VisibleForTesting
+  static  Iterator flatten(final Iterator> toFlatten) {
+
+return new AbstractIterator() {
+
+  @Nullable Iterator current = null;
+
+  @Override
+  protected T computeNext() {
+while (true) {
+  if (current == null) {
+if (toFlatten.hasNext()) {
+  current = toFlatten.next();
+} else {
+  return endOfData();
+}
+  }
+  if (current.hasNext()) {
+return current.next();
+  }
+  current = null;
+}
+  }
+};
+  }
+
+  @VisibleForTesting
+  static  Iterator>>> splitBySameKey(
+  Iterator> in, Coder keyCoder, 
Coder> wvCoder) {
+
+return new AbstractIterator>>>() {
+
+  @Nullable Tuple2 read = null;
+
+  @Override
+  protected Iterator>> computeNext() {
+readNext();
+if (read != null) {
+  byte[] value = read._1().getValue();
+  byte[] keyPart = Arrays.copyOfRange(value, 0, value.length - 8);
 
 Review comment:
   Same as above. Move this close to `toByteArrayWithTs`?
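
A sketch of what such co-location could look like: one helper owns both the encoding and the `length - 8` split, so callers never repeat the offset arithmetic. `KeyWithTimestamp` and its methods are hypothetical names, not existing Beam APIs, and the big-endian encoding of the 8-byte timestamp suffix is an assumption about the layout.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper that keeps the key-plus-timestamp byte layout in a single place. */
final class KeyWithTimestamp {

  // The timestamp occupies the trailing 8 bytes of the encoded array.
  static final int TS_BYTES = Long.BYTES;

  /** Appends the timestamp (big-endian, ByteBuffer's default order) to the encoded key. */
  static byte[] encode(byte[] key, long timestampMillis) {
    return ByteBuffer.allocate(key.length + TS_BYTES).put(key).putLong(timestampMillis).array();
  }

  /** Returns the key bytes, i.e. everything except the timestamp suffix. */
  static byte[] keyPart(byte[] encoded) {
    return Arrays.copyOfRange(encoded, 0, encoded.length - TS_BYTES);
  }

  /** Reads the timestamp back from the trailing 8 bytes. */
  static long timestamp(byte[] encoded) {
    return ByteBuffer.wrap(encoded, encoded.length - TS_BYTES, TS_BYTES).getLong();
  }
}
```

With a helper of this shape, both the partitioner and `splitBySameKey` could call `keyPart(...)` instead of each repeating `Arrays.copyOfRange(value, 0, value.length - 8)`.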
 



Issue Time

[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355796&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355796
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355174975
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 ##
 @@ -85,45 +123,83 @@ public void startBundle() {
 doFnRunner.startBundle();
   }
 
+  @Override
+  public void finishBundle() {
+doFnRunner.finishBundle();
+  }
+
   @Override
   public void processElement(WindowedValue<InputT> input) {
 
 // StatefulDoFnRunner always observes windows, so we need to explode
 for (WindowedValue<InputT> value : input.explodeWindows()) {
-
   BoundedWindow window = value.getWindows().iterator().next();
-
   if (isLate(window)) {
 // The element is too late for this window.
-droppedDueToLateness.inc();
-WindowTracing.debug(
-"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
-+ "since too far behind inputWatermark:{}",
-input.getTimestamp(),
-window,
-cleanupTimer.currentInputWatermarkTime());
+reportDroppedElement(value, window);
+  } else if (requiresTimeSortedInput) {
+processElementOrdered(window, value);
   } else {
-cleanupTimer.setForWindow(value.getValue(), window);
-doFnRunner.processElement(value);
+processElementUnordered(window, value);
+  }
+}
+  }
+
+  private void processElementUnordered(BoundedWindow window, WindowedValue<InputT> value) {
+cleanupTimer.setForWindow(value.getValue(), window);
+doFnRunner.processElement(value);
+  }
+
+  private void processElementOrdered(BoundedWindow window, WindowedValue<InputT> value) {
+
+StateInternals stateInternals = stepContext.stateInternals();
+TimerInternals timerInternals = stepContext.timerInternals();
+
+if (!timerInternals.currentInputWatermarkTime().isAfter(value.getTimestamp())) {
+  StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+  BagState<WindowedValue<InputT>> sortBuffer = stateInternals.state(namespace, sortBufferTag);
+  ValueState<Instant> minStampState = stateInternals.state(namespace, sortBufferMinStampTag);
+  sortBuffer.add(value);
+  Instant minStamp =
+  MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE);
+  if (value.getTimestamp().isBefore(minStamp)) {
+minStamp = value.getTimestamp();
+minStampState.write(minStamp);
+setupFlushTimerAndWatermarkHold(namespace, minStamp);
   }
+} else {
+  reportDroppedElement(value, window);
 
 Review comment:
   Such an internal detail needs to be mentioned more explicitly in the 
javadoc for the annotation.
   
   What if I am processing a stream and my out-of-order spread can be quite 
large? Does it make sense for me to use this annotation, or is it better not 
to? Some recommendations / hints for users would be great.
   
   This makes me hesitant to use this annotation for stream processing (not 
that I have a use case for it right now) unless I can be sure about the timing 
of my data stream and how the watermark moves.
   I wonder if it would make sense to describe the tradeoffs between latency, 
buffer size, and lateness, and to introduce a possibility to hold the 
watermark back by some delta smaller than the max allowed lateness. 
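
   To make the tradeoff concrete, here is a toy model of the buffering rule visible in the hunk above: an element is buffered while the watermark is not after its timestamp, and dropped otherwise. This is only a sketch, not Beam's `StatefulDoFnRunner`. The class, the `long` watermark, and the flush rule (emit buffered elements whose timestamp is strictly below the watermark) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of a time-sorted buffer; all names here are hypothetical. */
final class SortBufferSketch {

  private static final class Element {
    final long timestamp;
    final String value;

    Element(long timestamp, String value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  private final PriorityQueue<Element> buffer =
      new PriorityQueue<>(Comparator.comparingLong((Element e) -> e.timestamp));
  private final List<String> emitted = new ArrayList<>();
  private long watermark = Long.MIN_VALUE;
  private int dropped = 0;

  /** Buffers the element unless the watermark has already passed its timestamp. */
  void onElement(long timestamp, String value) {
    if (watermark > timestamp) {
      dropped++; // late with zero allowed lateness: dropped
    } else {
      buffer.add(new Element(timestamp, value));
    }
  }

  /** Emits, in timestamp order, every buffered element older than the new watermark. */
  void advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    while (!buffer.isEmpty() && buffer.peek().timestamp < watermark) {
      emitted.add(buffer.poll().value);
    }
  }

  List<String> emitted() { return emitted; }
  int dropped() { return dropped; }
  int buffered() { return buffer.size(); }

  public static void main(String[] args) {
    SortBufferSketch s = new SortBufferSketch();
    s.onElement(5, "a");
    s.onElement(3, "b");
    s.advanceWatermark(5);
    s.onElement(2, "late");
    System.out.println(s.emitted() + " dropped=" + s.dropped() + " buffered=" + s.buffered());
  }
}
```

   In this model the buffer holds everything between the watermark and the newest element seen, so a large out-of-order spread means a large buffer and higher latency, while a fast-moving watermark means more dropped elements.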
 



Issue Time Tracking
---

Worklog Id: (was: 355796)
Time Spent: 3.5h  (was: 3h 20m)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed latene

[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=355792&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355792
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 08/Dec/19 10:24
Start Date: 08/Dec/19 10:24
Worklog Time Spent: 10m 
  Work Description: JozoVilcek commented on pull request #8774: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#discussion_r355167752
 
 

 ##
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 ##
 @@ -448,18 +458,152 @@ public String toNativeString() {
 JavaRDD>>> groupRDD =
 GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, 
partitioner);
 
-return groupRDD
-.map(
-input -> {
-  final K key = input.getKey();
-  Iterable> value = input.getValue();
-  return FluentIterable.from(value)
-  .transform(
-  windowedValue ->
-  windowedValue.withValue(KV.of(key, 
windowedValue.getValue(
-  .iterator();
-})
-.flatMapToPair(doFnFunction);
+if (!requiresSortedInput) {
+  return groupRDD
 
 Review comment:
   Can we make `groupRDD` local to this `if-else` branch?
   Also, it would be a bit more intuitive if it were organized as 
`if (requiresSortedInput) { ... } else { ... }`.
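
   A rough sketch of the suggested shape, using plain lists in place of RDDs. Everything here is hypothetical (the class, the method, and the `grouped` variable standing in for `groupRDD`). It only illustrates putting the positive condition first and declaring the intermediate value inside the branch that uses it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the translator; plain lists replace RDDs. */
final class BranchScopingSketch {

  static List<Long> translate(List<Long> timestamps, boolean requiresSortedInput) {
    if (requiresSortedInput) {
      // Sorted path: positive condition first, with its own locals.
      List<Long> sorted = new ArrayList<>(timestamps);
      Collections.sort(sorted);
      return sorted;
    } else {
      // Unsorted path: `grouped` exists only in the branch that needs it.
      List<Long> grouped = new ArrayList<>(timestamps);
      return grouped;
    }
  }

  public static void main(String[] args) {
    System.out.println(translate(Arrays.asList(3L, 1L, 2L), true));
    System.out.println(translate(Arrays.asList(3L, 1L, 2L), false));
  }
}
```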
 



Issue Time Tracking
---

Worklog Id: (was: 355792)

> @RequiresTimeSortedInput DoFn annotation
> 
>
> Key: BEAM-8550
> URL: https://issues.apache.org/jira/browse/BEAM-8550
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Jan Lukavský
>Assignee: Jan Lukavský
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> Implement new annotation {{@RequiresTimeSortedInput}} for stateful DoFn as 
> described in [design 
> document|https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing].
>  First implementation will assume that:
>   - time is defined by timestamp in associated WindowedValue
>   - allowed lateness is explicitly zero and all late elements are dropped 
> (due to being out of order)
> The above properties are considered temporary and will be resolved by 
> subsequent extensions (backwards compatible).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-11-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338791&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338791
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 05/Nov/19 14:44
Start Date: 05/Nov/19 14:44
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549852980
 
 
   Run Direct ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 338791)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-11-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338734&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338734
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 05/Nov/19 14:16
Start Date: 05/Nov/19 14:16
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549841187
 
 
   Run Flink ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 338734)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-11-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338735&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338735
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 05/Nov/19 14:16
Start Date: 05/Nov/19 14:16
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549841255
 
 
   Run Direct ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 338735)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-11-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338736&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338736
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 05/Nov/19 14:16
Start Date: 05/Nov/19 14:16
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549841386
 
 
   Run Java Flink PortableValidatesRunner Batch
 



Issue Time Tracking
---

Worklog Id: (was: 338736)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-11-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338737&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338737
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 05/Nov/19 14:16
Start Date: 05/Nov/19 14:16
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549841324
 
 
   Run Java Flink PortableValidatesRunner Streaming
 



Issue Time Tracking
---

Worklog Id: (was: 338737)
Time Spent: 1h 50m  (was: 1h 40m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-11-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338691
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 05/Nov/19 12:29
Start Date: 05/Nov/19 12:29
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549802817
 
 
   Run Direct ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 338691)
Time Spent: 50m  (was: 40m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-11-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338693
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 05/Nov/19 12:29
Start Date: 05/Nov/19 12:29
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549802952
 
 
   Run Java Flink PortableValidatesRunner Batch
 



Issue Time Tracking
---

Worklog Id: (was: 338693)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-11-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338692
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 05/Nov/19 12:29
Start Date: 05/Nov/19 12:29
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549802918
 
 
   Run Java Flink PortableValidatesRunner Streaming
 



Issue Time Tracking
---

Worklog Id: (was: 338692)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-11-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338690&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338690
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 05/Nov/19 12:28
Start Date: 05/Nov/19 12:28
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549802764
 
 
   Run Flink ValidatesRunner
 



Issue Time Tracking
---

Worklog Id: (was: 338690)
Time Spent: 40m  (was: 0.5h)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-11-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338122&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338122
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 04/Nov/19 14:20
Start Date: 04/Nov/19 14:20
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549375080
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 338122)
Time Spent: 0.5h  (was: 20m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-11-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338121&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338121
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 04/Nov/19 14:20
Start Date: 04/Nov/19 14:20
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549375013
 
 
   Run Java PreCommi
 



Issue Time Tracking
---

Worklog Id: (was: 338121)
Time Spent: 20m  (was: 10m)



[jira] [Work logged] (BEAM-8550) @RequiresTimeSortedInput DoFn annotation

2019-11-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8550?focusedWorklogId=338109&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338109
 ]

ASF GitHub Bot logged work on BEAM-8550:


Author: ASF GitHub Bot
Created on: 04/Nov/19 13:53
Start Date: 04/Nov/19 13:53
Worklog Time Spent: 10m 
  Work Description: je-ik commented on issue #8774: Proposal: [BEAM-8550] 
Requires time sorted input
URL: https://github.com/apache/beam/pull/8774#issuecomment-549362888
 
 
   Run Java_Examples_Dataflow PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 338109)
Remaining Estimate: 0h
Time Spent: 10m
