[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-05-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=103493&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103493
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 18/May/18 17:35
Start Date: 18/May/18 17:35
Worklog Time Spent: 10m 
  Work Description: tgroh closed pull request #4793: [BEAM-3776] Fix issue 
with merging late windows where a watermark hold could be added behind the 
input watermark.
URL: https://github.com/apache/beam/pull/4793
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/pom.xml b/pom.xml
index 987291c261a..37101ca90f0 100644
--- a/pom.xml
+++ b/pom.xml
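@@ -1450,6 +1450,13 @@
         <scope>test</scope>
       </dependency>
 
+      <dependency>
+        <groupId>org.slf4j</groupId>
+        <artifactId>slf4j-simple</artifactId>
+        <version>${slf4j.version}</version>
+        <scope>test</scope>
+      </dependency>
+
       <dependency>
         <groupId>org.mockito</groupId>
         <artifactId>mockito-core</artifactId>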
diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle
index 11d606372ea..3d77c40c103 100644
--- a/runners/core-java/build.gradle
+++ b/runners/core-java/build.gradle
@@ -46,6 +46,6 @@ dependencies {
   shadowTest library.java.hamcrest_core
   shadowTest library.java.mockito_core
   shadowTest library.java.junit
-  shadowTest library.java.slf4j_jdk14
+  shadowTest library.java.slf4j_simple
   shadowTest library.java.jackson_dataformat_yaml
 }
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index f5215ca3239..6e0aa03d259 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
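@@ -100,6 +100,11 @@
       <artifactId>guava</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
@@ -141,7 +146,7 @@
 
     <dependency>
       <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-jdk14</artifactId>
+      <artifactId>slf4j-simple</artifactId>
       <scope>test</scope>
     </dependency>
 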
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index 8285c7245d2..78fc60e8ea4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -29,9 +27,7 @@
 import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.state.State;
-import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
 
 /**
  * Helpers for merging state.
@@ -212,98 +208,7 @@
     result.addAccum(merged);
   }
 
-  /**
-   * Prefetch all watermark state for {@code address} across all merging windows in
-   * {@code context}.
-   */
-  public static <K, W extends BoundedWindow> void prefetchWatermarks(
-      MergingStateAccessor<K, W> context,
-      StateTag<WatermarkHoldState> address) {
-    Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(address);
-    WatermarkHoldState result = context.access(address);
-    if (map.isEmpty()) {
-      // Nothing to prefetch.
-      return;
-    }
-    if (map.size() == 1 && map.values().contains(result)
-        && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
-      // Nothing to change.
-      return;
-    }
-    if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
-      // No need to read existing holds.
-      return;
-    }
-    // Prefetch.
-    for (WatermarkHoldState source : map.values()) {
-      prefetchRead(source);
-    }
-  }
-
   private static void prefetchRead(ReadableState<?> source) {
     source.readLater();
   }
-
-  /**
-   * Merge all watermark state in {@code address} across all merging windows in {@code context},
-   * where the final merge result window is {@code mergeResult}.
-   */
-  public static <K, W extends BoundedWindow> void mergeWatermarks(
-      MergingStateAccessor<K, W> context,
-      StateTag<WatermarkHoldState> address,
-      W mergeResult) {
-    mergeWatermarks(
-        context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult);
-  }
-
-  /**
-   * Merge all watermark state in {@code sources} (which must include {@code result} if non-empty)
-   * into {@code result}, where the final merge result window is {@code mergeResult}.
-   */
-  public static <W extends BoundedWindow> void mergeWatermarks(
-      Collection<WatermarkHoldState> sources, WatermarkHoldState result,
-      W resultWindow) {
-    if (sources.isEmpty()) {
-      // Nothing to merge.
-      return;
-    }
-    if (sources.size() == 1 && sources.contains(result)
-

[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-05-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=103101&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103101
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 17/May/18 20:38
Start Date: 17/May/18 20:38
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on issue #4793: [BEAM-3776] Fix 
issue with merging late windows where a watermark hold could be added behind 
the input watermark.
URL: https://github.com/apache/beam/pull/4793#issuecomment-390003249
 
 
   @tgroh PTAL, now passing


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 103101)
Time Spent: 5h 50m  (was: 5h 40m)

> StateMerging.mergeWatermarks sets a late watermark hold for late merging 
> windows that depend only on the window
> ---
>
> Key: BEAM-3776
> URL: https://issues.apache.org/jira/browse/BEAM-3776
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Affects Versions: 2.1.0, 2.2.0, 2.3.0
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: Critical
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> WatermarkHold.addElementHold and WatermarkHold.addGarbageCollectionHold take 
> care not to add holds that would be before the input watermark.
> However, WatermarkHold.onMerge calls StateMerging.mergeWatermarks, which, if 
> the timestamp combiner depends only on the window, sets a hold for the end of 
> the window regardless of the input watermark.
> Thus, if you have a WindowingStrategy such as:
> WindowingStrategy.of(Sessions.withGapDuration(gapDuration))
>  .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
>  .withTrigger(
>  Repeatedly.forever(
>  AfterWatermark.pastEndOfWindow()
>  .withLateFirings(AfterPane.elementCountAtLeast(10))))
>  .withAllowedLateness(allowedLateness)
> and you merge windows that are late, you might end up holding the watermark 
> until the allowedLateness has passed.
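
To make the failure mode described above concrete, here is a minimal sketch in plain Java with no Beam dependency. The class HoldSketch and its two methods are hypothetical names invented for illustration; they are not Beam APIs and not the code from PR #4793. Timestamps are plain epoch milliseconds, and the sketch assumes that "before the input watermark" means strictly earlier than it.

```java
import java.util.OptionalLong;

/** Hypothetical illustration only; these are not Beam classes or methods. */
public class HoldSketch {

  /** Behaviour described in the issue: the hold is placed at the end of the merged window. */
  public static OptionalLong holdIgnoringWatermark(long windowEndMillis, long inputWatermarkMillis) {
    // The input watermark is deliberately unused, which is the problem being illustrated.
    return OptionalLong.of(windowEndMillis);
  }

  /** Guarded variant: no hold is placed if it would fall before the input watermark. */
  public static OptionalLong holdRespectingWatermark(long windowEndMillis, long inputWatermarkMillis) {
    if (windowEndMillis < inputWatermarkMillis) {
      // The merged window is already late, so skip the hold.
      return OptionalLong.empty();
    }
    return OptionalLong.of(windowEndMillis);
  }

  public static void main(String[] args) {
    long lateWindowEnd = 1_000L;  // end of a merged session window
    long inputWatermark = 5_000L; // the watermark has already passed the window end
    System.out.println(holdIgnoringWatermark(lateWindowEnd, inputWatermark).isPresent());   // true
    System.out.println(holdRespectingWatermark(lateWindowEnd, inputWatermark).isPresent()); // false
  }
}
```

In the unguarded case a hold exists at a timestamp the input watermark has already passed, which matches the symptom in the report: the output watermark can stay held until the window is garbage collected after allowedLateness.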



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-05-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=102212&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-102212
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 15/May/18 17:21
Start Date: 15/May/18 17:21
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on issue #4793: [BEAM-3776] Fix 
issue with merging late windows where a watermark hold could be added behind 
the input watermark.
URL: https://github.com/apache/beam/pull/4793#issuecomment-389246274
 
 
I am back from leave and will rebase and make sure tests pass.
   
   
   On Tue, Apr 17, 2018 at 9:09 AM Thomas Groh wrote:
   
   > Additionally, running flink:testCompileJava, I get:
   >
   > > Task :runners:flink:compileTestJava FAILED
   > /usr/local/google/home/tgroh/git/beam/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java:115: error: method does not override or implement a method from a supertype
   >   @Override
   >   ^
   > /usr/local/google/home/tgroh/git/beam/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java:119: error: method does not override or implement a method from a supertype
   >   @Override
   >   ^
   > /usr/local/google/home/tgroh/git/beam/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java:87: error: method does not override or implement a method from a supertype
   >   @Override
   >   ^
   > /usr/local/google/home/tgroh/git/beam/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java:91: error: method does not override or implement a method from a supertype
   >   @Override
   >   ^
   > /usr/local/google/home/tgroh/git/beam/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java:133: error: method does not override or implement a method from a supertype
   > @Override
   > ^
   > /usr/local/google/home/tgroh/git/beam/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkKeyGroupStateInternalsTest.java:137: error: method does not override or implement a method from a supertype
   > @Override
   > ^
   > Note: Some input files use unchecked or unsafe operations.
   > Note: Recompile with -Xlint:unchecked for details.
   > 6 errors
   




Issue Time Tracking
---

Worklog Id: (was: 102212)
Time Spent: 5h 40m  (was: 5.5h)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-04-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=91795&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91795
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 17/Apr/18 16:09
Start Date: 17/Apr/18 16:09
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4793: [BEAM-3776] Fix issue 
with merging late windows where a watermark hold could be added behind the 
input watermark.
URL: https://github.com/apache/beam/pull/4793#issuecomment-382050089
 
 
   Sorry to come back so late with requests. Can you rebase on top of master? 
The precommit that is running seems to be looking for targets that don't exist 
on your revision, if I read some of the issues properly.




Issue Time Tracking
---

Worklog Id: (was: 91795)
Time Spent: 5h 20m  (was: 5h 10m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-04-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=90922&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90922
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 13/Apr/18 18:20
Start Date: 13/Apr/18 18:20
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4793: [BEAM-3776] Fix issue 
with merging late windows where a watermark hold could be added behind the 
input watermark.
URL: https://github.com/apache/beam/pull/4793#issuecomment-381220818
 
 
   retest this please




Issue Time Tracking
---

Worklog Id: (was: 90922)
Time Spent: 5h 10m  (was: 5h)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-04-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=90629&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90629
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 12/Apr/18 22:59
Start Date: 12/Apr/18 22:59
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4793: [BEAM-3776] Fix issue 
with merging late windows where a watermark hold could be added behind the 
input watermark.
URL: https://github.com/apache/beam/pull/4793#issuecomment-380969648
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 90629)
Time Spent: 5h  (was: 4h 50m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=85302&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85302
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 28/Mar/18 15:49
Start Date: 28/Mar/18 15:49
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4793: [BEAM-3776] Fix issue 
with merging late windows where a watermark hold could be added behind the 
input watermark.
URL: https://github.com/apache/beam/pull/4793#issuecomment-376936050
 
 
   Precommit failures?
   
   Ho hum, never mind; these look like quota issues or are unrelated.




Issue Time Tracking
---

Worklog Id: (was: 85302)
Time Spent: 4.5h  (was: 4h 20m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=85303&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85303
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 28/Mar/18 15:49
Start Date: 28/Mar/18 15:49
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4793: [BEAM-3776] Fix issue 
with merging late windows where a watermark hold could be added behind the 
input watermark.
URL: https://github.com/apache/beam/pull/4793#issuecomment-376936457
 
 
   retest this please




Issue Time Tracking
---

Worklog Id: (was: 85303)
Time Spent: 4h 40m  (was: 4.5h)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=85304&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85304
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 28/Mar/18 15:49
Start Date: 28/Mar/18 15:49
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4793: [BEAM-3776] Fix issue 
with merging late windows where a watermark hold could be added behind the 
input watermark.
URL: https://github.com/apache/beam/pull/4793#issuecomment-376936457
 
 
   retest this please




Issue Time Tracking
---

Worklog Id: (was: 85304)
Time Spent: 4h 50m  (was: 4h 40m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-28 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=85301&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85301
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 28/Mar/18 15:47
Start Date: 28/Mar/18 15:47
Worklog Time Spent: 10m 
  Work Description: tgroh commented on issue #4793: [BEAM-3776] Fix issue 
with merging late windows where a watermark hold could be added behind the 
input watermark.
URL: https://github.com/apache/beam/pull/4793#issuecomment-376936050
 
 
   Precommit failures?




Issue Time Tracking
---

Worklog Id: (was: 85301)
Time Spent: 4h 20m  (was: 4h 10m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=83846&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83846
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 23/Mar/18 22:27
Start Date: 23/Mar/18 22:27
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r176878673
 
 

 ##
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
 ##
 @@ -533,28 +534,33 @@ public void advanceSynchronizedProcessingTime(
     */
    @SafeVarargs
    public final void injectElements(TimestampedValue<InputT>... values) throws Exception {
 +    injectElements(Arrays.asList(values));
 +  }
 +
 +  public final void injectElements(Iterable<TimestampedValue<InputT>> values) throws Exception {
      for (TimestampedValue<InputT> value : values) {
        WindowTracing.trace("TriggerTester.injectElements: {}", value);
      }
  
      Iterable<WindowedValue<InputT>> inputs =
 -        Arrays.asList(values)
 -            .stream()
 -            .map(
 -                input -> {
 -                  try {
 -                    InputT value = input.getValue();
 -                    Instant timestamp = input.getTimestamp();
 -                    Collection<W> windows =
 -                        windowFn.assignWindows(
 -                            new TestAssignContext<W>(
 -                                windowFn, value, timestamp, GlobalWindow.INSTANCE));
 -                    return WindowedValue.of(value, timestamp, windows, PaneInfo.NO_FIRING);
 -                  } catch (Exception e) {
 -                    throw new RuntimeException(e);
 -                  }
 -                })
 -            .collect(Collectors.toList());
 +        Iterables.transform(
 
 Review comment:
   Reworked to keep streams without needing StreamSupport
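
One way to keep a stream pipeline without StreamSupport, sketched here as an assumption and not necessarily what the reworked PR does, is to have the non-varargs overload accept a Collection instead of an Iterable, since Collection exposes stream() directly. The class InjectSketch and the method describeAll are hypothetical names used only for this stand-alone example; they are not part of ReduceFnTester.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-alone sketch; not the ReduceFnTester code. */
public class InjectSketch {

  /** Varargs convenience overload that delegates to the Collection overload. */
  @SafeVarargs
  public static <T> List<String> describeAll(T... values) {
    return describeAll(Arrays.asList(values));
  }

  /** Collection provides stream() directly, so StreamSupport is not needed here. */
  public static <T> List<String> describeAll(Collection<T> values) {
    return values.stream().map(v -> "element:" + v).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(describeAll("a", "b")); // [element:a, element:b]
  }
}
```

With an Iterable parameter the same pipeline would have to start from StreamSupport.stream(values.spliterator(), false), which is the extra step the review comment refers to avoiding.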




Issue Time Tracking
---

Worklog Id: (was: 83846)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=83847&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83847
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 23/Mar/18 22:27
Start Date: 23/Mar/18 22:27
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on issue #4793: [BEAM-3776] Fix 
issue with merging late windows where a watermark hold could be added behind 
the input watermark.
URL: https://github.com/apache/beam/pull/4793#issuecomment-375814669
 
 
   @tgroh Can you PTAL? Thanks!




Issue Time Tracking
---

Worklog Id: (was: 83847)
Time Spent: 4h 10m  (was: 4h)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=83844&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83844
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 23/Mar/18 22:25
Start Date: 23/Mar/18 22:25
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r176878350
 
 

 ##
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -873,6 +911,288 @@ public void testWatermarkHoldAndLateData() throws Exception {
     tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  // Performs the specified actions and verifies that the watermark hold is set correctly.
+  public void mergingWatermarkHoldTestHelper(List<Action> configuration) throws Exception {
+    LOG.info("Running config {}",  configuration);
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
+    // Test handling of late data. Specifically, ensure the watermark hold is correct.
+    Duration allowedLateness = Duration.standardMinutes(1);
+    Duration gapDuration = Duration.millis(10);
+    LOG.info("Gap duration {}", gapDuration);
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(Sessions.withGapDuration(gapDuration))
+                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+                .withTrigger(
+                    Repeatedly.forever(
+                        AfterWatermark.pastEndOfWindow()
+                            .withLateFirings(AfterPane.elementCountAtLeast(1))))
+                .withAllowedLateness(allowedLateness));
+    tester.setAutoAdvanceOutputWatermark(false);
+
+    // Input watermark -> null
+    assertEquals(null, tester.getWatermarkHold());
+    assertEquals(null, tester.getOutputWatermark());
+
+    int maxTs = 0;
+    long watermark = 0;
+    for (Action action : configuration) {
+      if (action.times != null) {
+        LOG.info("Injecting {}", action.times);
+        injectElements(tester, action.times);
+        int maxLocalTs = Ordering.natural().max(action.times);
+        if (maxLocalTs > maxTs) {
+          maxTs = maxLocalTs;
+        }
+      }
+      if (action.inputWatermark > watermark) {
+        watermark = action.inputWatermark;
+        LOG.info("Advancing watermark to {}", new Instant(watermark));
+        tester.advanceInputWatermark(new Instant(watermark));
+      }
+      Instant hold = tester.getWatermarkHold();
+      if (hold != null) {
+        assertThat(hold, greaterThanOrEqualTo(new Instant(watermark)));
+        assertThat(watermark, lessThan(maxTs + gapDuration.getMillis()));
+      }
+    }
+    if (gapDuration.getMillis() + maxTs > watermark) {
+      watermark = gapDuration.getMillis() + maxTs;
+      tester.advanceInputWatermark(new Instant(watermark));
+    }
+    LOG.info("Output {}", tester.extractOutput());
+    assertThat(tester.getWatermarkHold(), nullValue());
+    tester.advanceInputWatermark(new Instant(watermark).plus(allowedLateness));
+    assertThat(tester.getWatermarkHold(), nullValue());
+
+    // Nothing dropped.
+    long droppedElements =
+        container
+            .getCounter(
+                MetricName.named(ReduceFnRunner.class,
+                    ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+            .getCumulative()
+            .longValue();
+    assertEquals(0, droppedElements);
+  }
+
+  @Test
+  public void testMergingWatermarkHoldLateNewWindow() throws Exception {
+    LinkedList<Action> actions = new LinkedList<>();
+    actions.add(Action.inputWatermark(40));
+    actions.add(Action.times(1));
+    mergingWatermarkHoldTestHelper(actions);
+  }
+
+  @Test
+  public void testMergingWatermarkHoldLateNewWindowExtended() throws Exception {
+    LinkedList<Action> actions = new LinkedList<>();
+    actions.add(Action.inputWatermark(40));
+    actions.add(Action.times(1));
+    actions.add(Action.times(10));
+    mergingWatermarkHoldTestHelper(actions);
+  }
+
+  @Test
+  public void testMergingWatermarkHoldLateNewWindowMerged() throws Exception {
+    LinkedList<Action> actions = new LinkedList<>();
+    actions.add(Action.inputWatermark(40));
+    actions.add(Action.times(1));
+    actions.add(Action.times(14));
+    actions.add(Action.times(6));
+    mergingWatermarkHoldTestHelper(actions);
+  }
+
+  @Test
+  public void testMergingWatermarkHoldLateNewWindowExtendedPastInputWatermark() throws Exception {
+    LinkedList<Action> actions = new LinkedList<>();
+    actions.add(Action.inputWatermark(40));
+    actions.add(Action.times(25));
+

[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=83842&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83842
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 23/Mar/18 22:24
Start Date: 23/Mar/18 22:24
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r176878187
 
 

 ##
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -873,6 +911,288 @@ public void testWatermarkHoldAndLateData() throws Exception {
     tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  // Performs the specified actions and verifies that the watermark hold is set correctly.
+  public void mergingWatermarkHoldTestHelper(List<Action> configuration) throws Exception {
+    LOG.info("Running config {}",  configuration);
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
+    // Test handling of late data. Specifically, ensure the watermark hold is correct.
+    Duration allowedLateness = Duration.standardMinutes(1);
+    Duration gapDuration = Duration.millis(10);
+    LOG.info("Gap duration {}", gapDuration);
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(Sessions.withGapDuration(gapDuration))
+                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+                .withTrigger(
+                    Repeatedly.forever(
+                        AfterWatermark.pastEndOfWindow()
+                            .withLateFirings(AfterPane.elementCountAtLeast(1))))
+                .withAllowedLateness(allowedLateness));
+    tester.setAutoAdvanceOutputWatermark(false);
+
+    // Input watermark -> null
+    assertEquals(null, tester.getWatermarkHold());
+    assertEquals(null, tester.getOutputWatermark());
+
+    int maxTs = 0;
+    long watermark = 0;
+    for (Action action : configuration) {
+      if (action.times != null) {
+        LOG.info("Injecting {}", action.times);
+        injectElements(tester, action.times);
+        int maxLocalTs = Ordering.natural().max(action.times);
+        if (maxLocalTs > maxTs) {
+          maxTs = maxLocalTs;
+        }
+      }
+      if (action.inputWatermark > watermark) {
+        watermark = action.inputWatermark;
+        LOG.info("Advancing watermark to {}", new Instant(watermark));
+        tester.advanceInputWatermark(new Instant(watermark));
+      }
+      Instant hold = tester.getWatermarkHold();
+      if (hold != null) {
+        assertThat(hold, greaterThanOrEqualTo(new Instant(watermark)));
+        assertThat(watermark, lessThan(maxTs + gapDuration.getMillis()));
+      }
+    }
+    if (gapDuration.getMillis() + maxTs > watermark) {
+      watermark = gapDuration.getMillis() + maxTs;
+      tester.advanceInputWatermark(new Instant(watermark));
+    }
+    LOG.info("Output {}", tester.extractOutput());
+    assertThat(tester.getWatermarkHold(), nullValue());
+    tester.advanceInputWatermark(new Instant(watermark).plus(allowedLateness));
+    assertThat(tester.getWatermarkHold(), nullValue());
+
+    // Nothing dropped.
+    long droppedElements =
+        container
+            .getCounter(
+                MetricName.named(ReduceFnRunner.class,
+                    ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+            .getCumulative()
+            .longValue();
+    assertEquals(0, droppedElements);
+  }
+
+  @Test
+  public void testMergingWatermarkHoldLateNewWindow() throws Exception {
+    LinkedList<Action> actions = new LinkedList<>();
+    actions.add(Action.inputWatermark(40));
+    actions.add(Action.times(1));
+    mergingWatermarkHoldTestHelper(actions);
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 83842)
Time Spent: 3.5h  (was: 3h 20m)


[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=82067&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82067
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 19/Mar/18 22:15
Start Date: 19/Mar/18 22:15
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r175587347
 
 

 ##
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
 ##
 @@ -533,28 +534,33 @@ public void advanceSynchronizedProcessingTime(
    */
   @SafeVarargs
   public final void injectElements(TimestampedValue<InputT>... values) throws Exception {
+    injectElements(Arrays.asList(values));
+  }
+
+  public final void injectElements(Iterable<TimestampedValue<InputT>> values) throws Exception {
     for (TimestampedValue<InputT> value : values) {
       WindowTracing.trace("TriggerTester.injectElements: {}", value);
     }
 
     Iterable<WindowedValue<InputT>> inputs =
-        Arrays.asList(values)
-            .stream()
-            .map(
-                input -> {
-                  try {
-                    InputT value = input.getValue();
-                    Instant timestamp = input.getTimestamp();
-                    Collection<W> windows =
-                        windowFn.assignWindows(
-                            new TestAssignContext<W>(
-                                windowFn, value, timestamp, GlobalWindow.INSTANCE));
-                    return WindowedValue.of(value, timestamp, windows, PaneInfo.NO_FIRING);
-                  } catch (Exception e) {
-                    throw new RuntimeException(e);
-                  }
-                })
-            .collect(Collectors.toList());
+        Iterables.transform(
 
 Review comment:
   this seems like the wrong direction for this change - certainly the 
lambda-to-function.
   
   You can use `StreamSupport` to keep using streams
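
For reference, a minimal sketch of the `StreamSupport` approach the comment points to. It is not the PR's code: the class name, the `describe` method and the string mapping are made up for illustration; `StreamSupport.stream` and `Iterable.spliterator` are the JDK calls that let an `Iterable` be processed as a stream with a lambda.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/** Illustrative only: streams over an Iterable without copying it into a List first. */
final class StreamSupportSketch {

  /** Maps every timestamp of the Iterable through a lambda and collects the results. */
  static List<String> describe(Iterable<Integer> timestamps) {
    return StreamSupport.stream(timestamps.spliterator(), false) // false = sequential stream
        .map(ts -> "element@" + ts)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(describe(Arrays.asList(1, 14, 6)));
  }
}
```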




Issue Time Tracking
---

Worklog Id: (was: 82067)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=82065&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82065
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 19/Mar/18 22:15
Start Date: 19/Mar/18 22:15
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r175581212
 
 

 ##
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -110,6 +119,24 @@
     return Mockito.any();
   }
 
+  static class Action {
+
+    static Action times(Integer... times) {
+      Action a = new Action();
+      a.times = Arrays.asList(times);
+      return a;
+    }
+
+    static Action inputWatermark(long watermark) {
+      Action a = new Action();
+      a.inputWatermark = watermark;
+      return a;
+    }
+
+    List<Integer> times;
+    long inputWatermark = 0;
 
 Review comment:
   `Instant`?




Issue Time Tracking
---

Worklog Id: (was: 82065)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=82063&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82063
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 19/Mar/18 22:15
Start Date: 19/Mar/18 22:15
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r175585280
 
 

 ##
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -873,6 +911,288 @@ public void testWatermarkHoldAndLateData() throws Exception {
     tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  // Performs the specified actions and verifies that the watermark hold is set correctly.
+  public void mergingWatermarkHoldTestHelper(List<Action> configuration) throws Exception {
+    LOG.info("Running config {}",  configuration);
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
+    // Test handling of late data. Specifically, ensure the watermark hold is correct.
+    Duration allowedLateness = Duration.standardMinutes(1);
+    Duration gapDuration = Duration.millis(10);
+    LOG.info("Gap duration {}", gapDuration);
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(Sessions.withGapDuration(gapDuration))
+                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+                .withTrigger(
+                    Repeatedly.forever(
+                        AfterWatermark.pastEndOfWindow()
+                            .withLateFirings(AfterPane.elementCountAtLeast(1))))
+                .withAllowedLateness(allowedLateness));
+    tester.setAutoAdvanceOutputWatermark(false);
+
+    // Input watermark -> null
+    assertEquals(null, tester.getWatermarkHold());
+    assertEquals(null, tester.getOutputWatermark());
+
+    int maxTs = 0;
+    long watermark = 0;
+    for (Action action : configuration) {
+      if (action.times != null) {
+        LOG.info("Injecting {}", action.times);
+        injectElements(tester, action.times);
+        int maxLocalTs = Ordering.natural().max(action.times);
+        if (maxLocalTs > maxTs) {
+          maxTs = maxLocalTs;
+        }
+      }
+      if (action.inputWatermark > watermark) {
+        watermark = action.inputWatermark;
+        LOG.info("Advancing watermark to {}", new Instant(watermark));
+        tester.advanceInputWatermark(new Instant(watermark));
+      }
+      Instant hold = tester.getWatermarkHold();
+      if (hold != null) {
+        assertThat(hold, greaterThanOrEqualTo(new Instant(watermark)));
+        assertThat(watermark, lessThan(maxTs + gapDuration.getMillis()));
+      }
+    }
+    if (gapDuration.getMillis() + maxTs > watermark) {
+      watermark = gapDuration.getMillis() + maxTs;
+      tester.advanceInputWatermark(new Instant(watermark));
+    }
+    LOG.info("Output {}", tester.extractOutput());
+    assertThat(tester.getWatermarkHold(), nullValue());
+    tester.advanceInputWatermark(new Instant(watermark).plus(allowedLateness));
+    assertThat(tester.getWatermarkHold(), nullValue());
+
+    // Nothing dropped.
+    long droppedElements =
+        container
+            .getCounter(
+                MetricName.named(ReduceFnRunner.class,
+                    ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+            .getCumulative()
+            .longValue();
+    assertEquals(0, droppedElements);
+  }
+
+  @Test
+  public void testMergingWatermarkHoldLateNewWindow() throws Exception {
+    LinkedList<Action> actions = new LinkedList<>();
+    actions.add(Action.inputWatermark(40));
+    actions.add(Action.times(1));
+    mergingWatermarkHoldTestHelper(actions);
 
 Review comment:
   I'd love to separate out some of the assertions about what actually happened.




Issue Time Tracking
---

Worklog Id: (was: 82063)
Time Spent: 2h 50m  (was: 2h 40m)


[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=82064&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82064
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 19/Mar/18 22:15
Start Date: 19/Mar/18 22:15
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r175586984
 
 

 ##
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -873,6 +911,288 @@ public void testWatermarkHoldAndLateData() throws Exception {
     tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  // Performs the specified actions and verifies that the watermark hold is set correctly.
+  public void mergingWatermarkHoldTestHelper(List<Action> configuration) throws Exception {
+    LOG.info("Running config {}",  configuration);
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
+    // Test handling of late data. Specifically, ensure the watermark hold is correct.
+    Duration allowedLateness = Duration.standardMinutes(1);
+    Duration gapDuration = Duration.millis(10);
+    LOG.info("Gap duration {}", gapDuration);
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(
+            WindowingStrategy.of(Sessions.withGapDuration(gapDuration))
+                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+                .withTrigger(
+                    Repeatedly.forever(
+                        AfterWatermark.pastEndOfWindow()
+                            .withLateFirings(AfterPane.elementCountAtLeast(1))))
+                .withAllowedLateness(allowedLateness));
+    tester.setAutoAdvanceOutputWatermark(false);
+
+    // Input watermark -> null
+    assertEquals(null, tester.getWatermarkHold());
+    assertEquals(null, tester.getOutputWatermark());
+
+    int maxTs = 0;
+    long watermark = 0;
+    for (Action action : configuration) {
+      if (action.times != null) {
+        LOG.info("Injecting {}", action.times);
+        injectElements(tester, action.times);
+        int maxLocalTs = Ordering.natural().max(action.times);
+        if (maxLocalTs > maxTs) {
+          maxTs = maxLocalTs;
+        }
+      }
+      if (action.inputWatermark > watermark) {
+        watermark = action.inputWatermark;
+        LOG.info("Advancing watermark to {}", new Instant(watermark));
+        tester.advanceInputWatermark(new Instant(watermark));
+      }
+      Instant hold = tester.getWatermarkHold();
+      if (hold != null) {
+        assertThat(hold, greaterThanOrEqualTo(new Instant(watermark)));
+        assertThat(watermark, lessThan(maxTs + gapDuration.getMillis()));
+      }
+    }
+    if (gapDuration.getMillis() + maxTs > watermark) {
+      watermark = gapDuration.getMillis() + maxTs;
+      tester.advanceInputWatermark(new Instant(watermark));
+    }
+    LOG.info("Output {}", tester.extractOutput());
+    assertThat(tester.getWatermarkHold(), nullValue());
+    tester.advanceInputWatermark(new Instant(watermark).plus(allowedLateness));
+    assertThat(tester.getWatermarkHold(), nullValue());
+
+    // Nothing dropped.
+    long droppedElements =
+        container
+            .getCounter(
+                MetricName.named(ReduceFnRunner.class,
+                    ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
+            .getCumulative()
+            .longValue();
+    assertEquals(0, droppedElements);
+  }
+
+  @Test
+  public void testMergingWatermarkHoldLateNewWindow() throws Exception {
+    LinkedList<Action> actions = new LinkedList<>();
+    actions.add(Action.inputWatermark(40));
+    actions.add(Action.times(1));
+    mergingWatermarkHoldTestHelper(actions);
+  }
+
+  @Test
+  public void testMergingWatermarkHoldLateNewWindowExtended() throws Exception {
+    LinkedList<Action> actions = new LinkedList<>();
+    actions.add(Action.inputWatermark(40));
+    actions.add(Action.times(1));
+    actions.add(Action.times(10));
+    mergingWatermarkHoldTestHelper(actions);
+  }
+
+  @Test
+  public void testMergingWatermarkHoldLateNewWindowMerged() throws Exception {
+    LinkedList<Action> actions = new LinkedList<>();
+    actions.add(Action.inputWatermark(40));
+    actions.add(Action.times(1));
+    actions.add(Action.times(14));
+    actions.add(Action.times(6));
+    mergingWatermarkHoldTestHelper(actions);
+  }
+
+  @Test
+  public void testMergingWatermarkHoldLateNewWindowExtendedPastInputWatermark() throws Exception {
+    LinkedList<Action> actions = new LinkedList<>();
+    actions.add(Action.inputWatermark(40));
+    actions.add(Action.times(25));
+

[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=82066&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82066
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 19/Mar/18 22:15
Start Date: 19/Mar/18 22:15
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r175579336
 
 

 ##
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -110,6 +119,24 @@
     return Mockito.any();
   }
 
+  static class Action {
 
 Review comment:
   `AutoValue` with `@Nullable` fields, and a comment about what the field 
means in terms of the action that should be taken (I don't, for example, really 
understand what `times` means - multiply something? add an element with a 
timestamp? add something x times?)
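
One way to read this suggestion, as a hedged sketch: the class below is a hand-written stand-in for what an `AutoValue` class with `@Nullable` fields would provide (immutable fields, static factories, documented meaning). It uses only the JDK so that it runs without the AutoValue annotation processor. All names (`ActionSketch`, `injectElementsAt`, `advanceInputWatermarkTo`) are hypothetical, and the field meanings are inferred from how the test helper in this PR uses `times` and `inputWatermark`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hand-written stand-in for an AutoValue-style Action; names are hypothetical. */
final class ActionSketch {
  /** Timestamps (in millis) of elements to inject; null if this action injects nothing. */
  private final List<Integer> elementTimestamps;
  /** Input watermark (in millis) to advance to; null if this action leaves it unchanged. */
  private final Long inputWatermarkMillis;

  private ActionSketch(List<Integer> elementTimestamps, Long inputWatermarkMillis) {
    this.elementTimestamps = elementTimestamps;
    this.inputWatermarkMillis = inputWatermarkMillis;
  }

  /** Action that injects one element per given timestamp. */
  static ActionSketch injectElementsAt(Integer... timestamps) {
    return new ActionSketch(Collections.unmodifiableList(Arrays.asList(timestamps)), null);
  }

  /** Action that advances the input watermark to the given instant. */
  static ActionSketch advanceInputWatermarkTo(long millis) {
    return new ActionSketch(null, millis);
  }

  List<Integer> elementTimestamps() {
    return elementTimestamps;
  }

  Long inputWatermarkMillis() {
    return inputWatermarkMillis;
  }

  public static void main(String[] args) {
    System.out.println(injectElementsAt(1, 14, 6).elementTimestamps());
    System.out.println(advanceInputWatermarkTo(40).inputWatermarkMillis());
  }
}
```

Naming the factories after the action taken, rather than `times`, is one possible answer to the ambiguity raised in the comment.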




Issue Time Tracking
---

Worklog Id: (was: 82066)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=81995&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81995
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 19/Mar/18 19:58
Start Date: 19/Mar/18 19:58
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r175565468
 
 

 ##
 File path: runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -873,6 +907,288 @@ public void testWatermarkHoldAndLateData() throws Exception {
     tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  @Test
+  public void testMergingWatermarkHoldAndLateDataSpecific() throws Exception {
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 81995)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=81967&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81967
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 19/Mar/18 18:42
Start Date: 19/Mar/18 18:42
Worklog Time Spent: 10m 
  Work Description: huygaa11 commented on issue #4793: [BEAM-3776] Fix 
issue with merging late windows where a watermark hold could be added behind 
the input watermark.
URL: https://github.com/apache/beam/pull/4793#issuecomment-374321997
 
 
   @tgroh Could you take another look, and merge it afterwards? Thanks.




Issue Time Tracking
---

Worklog Id: (was: 81967)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=81965&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81965
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 19/Mar/18 18:41
Start Date: 19/Mar/18 18:41
Worklog Time Spent: 10m 
  Work Description: huygaa11 commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r175543476
 
 

 ##
 File path: 
runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -873,6 +907,288 @@ public void testWatermarkHoldAndLateData() throws Exception {
 tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  @Test
+  public void testMergingWatermarkHoldAndLateDataSpecific() throws Exception {
 
 Review comment:
   I like the first option better. If a test fails, we will know exactly which 
configuration failed, and it will be easier to fix. I also saw a Testing on the 
Toilet (TotT) post arguing that loops in tests are generally a bad idea.




Issue Time Tracking
---

Worklog Id: (was: 81965)
Time Spent: 2h 10m  (was: 2h)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=81966&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81966
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 19/Mar/18 18:41
Start Date: 19/Mar/18 18:41
Worklog Time Spent: 10m 
  Work Description: huygaa11 commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r175544048
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 ##
 @@ -247,8 +249,26 @@ private Instant addGarbageCollectionHold(
   /**
    * Prefetch watermark holds in preparation for merging.
    */
-  public void prefetchOnMerge(MergingStateAccessor state) {
-    StateMerging.prefetchWatermarks(state, elementHoldTag);
+  public void prefetchOnMerge(MergingStateAccessor context) {
 
 Review comment:
   As I am not familiar with StateMerging, I will leave this to the committer 
who merges this PR.




Issue Time Tracking
---

Worklog Id: (was: 81966)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=80100&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80100
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 13/Mar/18 22:32
Start Date: 13/Mar/18 22:32
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r174305824
 
 

 ##
 File path: 
runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -873,6 +907,288 @@ public void testWatermarkHoldAndLateData() throws Exception {
 tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  @Test
+  public void testMergingWatermarkHoldAndLateDataSpecific() throws Exception {
 
 Review comment:
   Would you prefer:
   - a test helper function that takes configuration objects, with a separate 
test for each configuration
   - removing most of these and keeping just one complicated test
   - many separate tests with no configuration object, each duplicating the 
test setup
   
   These tests were written at the same time as I detected this issue but are 
unrelated to it. They seem like useful additional coverage, but I could also 
move them into a separate change if you'd prefer.
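
The first option in the list above (one shared helper, one named test per configuration) could look roughly like the following. This is a hypothetical sketch rather than the PR's test code: the Action class here is a simplified stand-in modelled on the Action.inputWatermark and Action.times calls seen in the diff, the helper only classifies timestamps as late instead of driving a real ReduceFnRunner, and JUnit annotations are omitted so the sketch compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of "one helper, one test per configuration". */
final class ScenarioTestsSketch {
  /** One step of a scenario: advance the watermark or inject element timestamps. */
  static final class Action {
    final boolean isWatermark;
    final long[] values;

    private Action(boolean isWatermark, long... values) {
      this.isWatermark = isWatermark;
      this.values = values;
    }

    static Action inputWatermark(long watermark) {
      return new Action(true, watermark);
    }

    static Action times(long... timestamps) {
      return new Action(false, timestamps);
    }
  }

  /** Shared helper: replays the actions and returns the timestamps that arrived late. */
  static List<Long> lateTimestamps(Action... actions) {
    long watermark = Long.MIN_VALUE;
    List<Long> late = new ArrayList<>();
    for (Action action : actions) {
      if (action.isWatermark) {
        watermark = action.values[0];
      } else {
        for (long t : action.values) {
          if (t < watermark) {
            late.add(t);
          }
        }
      }
    }
    return late;
  }

  // Each configuration gets its own named test, so a failure points at one scenario.
  static void lateNewWindow() {
    check(lateTimestamps(Action.inputWatermark(40), Action.times(1))
        .equals(Arrays.asList(1L)));
  }

  static void lateWindowMergesWithNewWindow() {
    check(lateTimestamps(Action.inputWatermark(40), Action.times(29), Action.times(41))
        .equals(Arrays.asList(29L)));
  }

  private static void check(boolean condition) {
    if (!condition) {
      throw new AssertionError("scenario expectation failed");
    }
  }
}
```

In a JUnit suite, lateNewWindow and lateWindowMergesWithNewWindow would each be a separate test method, so the test report names the failing configuration directly instead of requiring console output from a loop.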




Issue Time Tracking
---

Worklog Id: (was: 80100)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=80099&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80099
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 13/Mar/18 22:31
Start Date: 13/Mar/18 22:31
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r174305640
 
 

 ##
 File path: 
runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -873,6 +907,288 @@ public void testWatermarkHoldAndLateData() throws Exception {
 tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  @Test
+  public void testMergingWatermarkHoldAndLateDataSpecific() throws Exception {
+LinkedList<LinkedList<Action>> configurations = new LinkedList<>();
+
+// Simple: late new window
+LinkedList<Action> actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(1));
+configurations.add(actions);
+
+// Simple: late new window, closed and extended.
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(1));
+actions.add(Action.times(10));
+configurations.add(actions);
+
+// Simple: late new window, closed and merged
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(1));
+actions.add(Action.times(14));
+actions.add(Action.times(6));
+configurations.add(actions);
+
+// Simple: late new window, extended past watermark
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(33));
+configurations.add(actions);
+
+// Simple: late new window, extended past watermark, extend more
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(33));
+actions.add(Action.times(43));
+configurations.add(actions);
+
+// Simple: late new window, extended past watermark, extend more
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(33));
+actions.add(Action.times(11));
+configurations.add(actions);
+
+// Simple: new window closes, then extended
+actions = new LinkedList<>();
+actions.add(Action.times(11));
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(18));
+configurations.add(actions);
+
+// Merging: new window closes, then extended then merged with new window
+actions = new LinkedList<>();
+actions.add(Action.times(11));
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(18));
+actions.add(Action.times(41));
+actions.add(Action.times(27, 33));
+configurations.add(actions);
+
+// Merging: late window, merges with new window
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(29));
+actions.add(Action.times(41));
+configurations.add(actions);
+
+// Merging: late window, new window joined
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(29));
+actions.add(Action.times(45));
+actions.add(Action.times(36));
+configurations.add(actions);
+
+// Merging: late window, new window all at once
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(29, 45, 36));
+configurations.add(actions);
+
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(42));
+actions.add(Action.times(33));
+configurations.add(actions);
+
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(42));
+actions.add(Action.times(33, 21));
+actions.add(Action.inputWatermark(50));
+actions.add(Action.times(12));
+configurations.add(actions);
+
+for (LinkedList<Action> configuration : configurations) {
+  System.out.println("Running config " + configuration.toString());
 
 Review comment:
   Replaced with LOG
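
A minimal sketch of swapping the println call for a logger follows. The PR's build changes point at SLF4J, but this sketch uses java.util.logging instead so that it compiles without extra dependencies; that substitution, the ConfigLoggingSketch class and its method names are all assumptions made for illustration.

```java
import java.util.List;
import java.util.logging.Logger;

/** Hypothetical example of logging a test configuration instead of printing it. */
final class ConfigLoggingSketch {
  private static final Logger LOG = Logger.getLogger(ConfigLoggingSketch.class.getName());

  private ConfigLoggingSketch() {}

  /** Builds the same message the println call produced. */
  static String describe(List<?> configuration) {
    return "Running config " + configuration;
  }

  /** Routes the message through the logger so test output can be filtered by level. */
  static void logConfiguration(List<?> configuration) {
    LOG.info(describe(configuration));
  }
}
```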




Issue Time Tracking
---


[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-13 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=80098&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80098
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 13/Mar/18 22:28
Start Date: 13/Mar/18 22:28
Worklog Time Spent: 10m 
  Work Description: scwhittle commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r174305062
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 ##
 @@ -247,8 +249,26 @@ private Instant addGarbageCollectionHold(
   /**
    * Prefetch watermark holds in preparation for merging.
    */
-  public void prefetchOnMerge(MergingStateAccessor state) {
-    StateMerging.prefetchWatermarks(state, elementHoldTag);
+  public void prefetchOnMerge(MergingStateAccessor context) {
 
 Review comment:
   I moved the merging code out of StateMerging.java so that I could reuse 
WatermarkHold.addElementHold instead of passing the watermarks in to 
StateMerging and duplicating the late-detection logic. To keep everything 
together, I moved the other uses of WatermarkHoldState from StateMerging 
to WatermarkHold. AFAIK the StateMerging class is a leftover from lazy state 
merging and no longer provides much benefit.
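
The design choice described in this comment, keeping late detection in one method and having the merge path call it, can be sketched as follows. HoldTrackerSketch is a hypothetical, heavily simplified stand-in and not Beam's WatermarkHold: timestamps are plain longs, state is an in-memory list, and the late check only compares against the input watermark.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a watermark-hold tracker; not Beam code. */
final class HoldTrackerSketch {
  private final List<Long> holds = new ArrayList<>();
  private long inputWatermark = Long.MIN_VALUE;

  void advanceInputWatermark(long newWatermark) {
    inputWatermark = Math.max(inputWatermark, newWatermark);
  }

  /** Single home for late detection: returns true only if a hold was actually added. */
  boolean addElementHold(long holdTimestamp) {
    if (holdTimestamp < inputWatermark) {
      return false; // too late: a hold here would sit behind the input watermark
    }
    holds.add(holdTimestamp);
    return true;
  }

  /** Merging drops the per-window holds and re-adds one through the same late check. */
  boolean onMerge(List<Long> holdsOfMergingWindows, long mergedHoldTimestamp) {
    holds.removeAll(holdsOfMergingWindows);
    return addElementHold(mergedHoldTimestamp);
  }

  List<Long> currentHolds() {
    return new ArrayList<>(holds);
  }
}
```

Because onMerge delegates to addElementHold, a merged window whose hold would land behind the input watermark gets no hold at all, and the rule lives in exactly one place.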




Issue Time Tracking
---

Worklog Id: (was: 80098)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=79571&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79571
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 12/Mar/18 18:46
Start Date: 12/Mar/18 18:46
Worklog Time Spent: 10m 
  Work Description: huygaa11 commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r173895435
 
 

 ##
 File path: 
runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -873,6 +907,288 @@ public void testWatermarkHoldAndLateData() throws Exception {
 tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  @Test
+  public void testMergingWatermarkHoldAndLateDataSpecific() throws Exception {
 
 Review comment:
   This test should be simpler and shorter.




Issue Time Tracking
---

Worklog Id: (was: 79571)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=79572&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79572
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 12/Mar/18 18:46
Start Date: 12/Mar/18 18:46
Worklog Time Spent: 10m 
  Work Description: huygaa11 commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r173896102
 
 

 ##
 File path: 
runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -873,6 +907,288 @@ public void testWatermarkHoldAndLateData() throws Exception {
 tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  @Test
+  public void testMergingWatermarkHoldAndLateDataSpecific() throws Exception {
+LinkedList<LinkedList<Action>> configurations = new LinkedList<>();
+
+// Simple: late new window
+LinkedList<Action> actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(1));
+configurations.add(actions);
+
+// Simple: late new window, closed and extended.
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(1));
+actions.add(Action.times(10));
+configurations.add(actions);
+
+// Simple: late new window, closed and merged
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(1));
+actions.add(Action.times(14));
+actions.add(Action.times(6));
+configurations.add(actions);
+
+// Simple: late new window, extended past watermark
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(33));
+configurations.add(actions);
+
+// Simple: late new window, extended past watermark, extend more
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(33));
+actions.add(Action.times(43));
+configurations.add(actions);
+
+// Simple: late new window, extended past watermark, extend more
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(33));
+actions.add(Action.times(11));
+configurations.add(actions);
+
+// Simple: new window closes, then extended
+actions = new LinkedList<>();
+actions.add(Action.times(11));
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(18));
+configurations.add(actions);
+
+// Merging: new window closes, then extended then merged with new window
+actions = new LinkedList<>();
+actions.add(Action.times(11));
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(18));
+actions.add(Action.times(41));
+actions.add(Action.times(27, 33));
+configurations.add(actions);
+
+// Merging: late window, merges with new window
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(29));
+actions.add(Action.times(41));
+configurations.add(actions);
+
+// Merging: late window, new window joined
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(29));
+actions.add(Action.times(45));
+actions.add(Action.times(36));
+configurations.add(actions);
+
+// Merging: late window, new window all at once
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(29, 45, 36));
+configurations.add(actions);
+
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(42));
+actions.add(Action.times(33));
+configurations.add(actions);
+
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(42));
+actions.add(Action.times(33, 21));
+actions.add(Action.inputWatermark(50));
+actions.add(Action.times(12));
+configurations.add(actions);
+
+for (LinkedList<Action> configuration : configurations) {
+  System.out.println("Running config " + configuration.toString());
+  MetricsContainerImpl container = new MetricsContainerImpl("any");
+  MetricsEnvironment.setCurrentContainer(container);
+  // Test handling of late data. Specifically, ensure the watermark hold is correct.
+  Duration allowedLateness = Duration.standardMinutes(1);
+  Duration gapDuration = Duration.millis(10);
+  System.out.printf("Gap duration %s\n", gapDuration);
+  

[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=79570&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79570
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 12/Mar/18 18:46
Start Date: 12/Mar/18 18:46
Worklog Time Spent: 10m 
  Work Description: huygaa11 commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r173894355
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 ##
 @@ -247,8 +249,26 @@ private Instant addGarbageCollectionHold(
   /**
    * Prefetch watermark holds in preparation for merging.
    */
-  public void prefetchOnMerge(MergingStateAccessor state) {
-    StateMerging.prefetchWatermarks(state, elementHoldTag);
+  public void prefetchOnMerge(MergingStateAccessor context) {
 
 Review comment:
   Is there a specific reason why the code was moved out of StateMerging.java?




Issue Time Tracking
---

Worklog Id: (was: 79570)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=79555&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79555
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 12/Mar/18 18:06
Start Date: 12/Mar/18 18:06
Worklog Time Spent: 10m 
  Work Description: huygaa11 commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r173894122
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 ##
 @@ -247,8 +249,26 @@ private Instant addGarbageCollectionHold(
   /**
    * Prefetch watermark holds in preparation for merging.
    */
-  public void prefetchOnMerge(MergingStateAccessor state) {
-    StateMerging.prefetchWatermarks(state, elementHoldTag);
+  public void prefetchOnMerge(MergingStateAccessor context) {
+    Map map = context.accessInEachMergingWindow(elementHoldTag);
 
 Review comment:
   Is there a specific reason why the code was moved out of StateMerging.java?




Issue Time Tracking
---

Worklog Id: (was: 79555)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=79554&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79554
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 12/Mar/18 18:05
Start Date: 12/Mar/18 18:05
Worklog Time Spent: 10m 
  Work Description: huygaa11 commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r173894122
 
 

 ##
 File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 ##
 @@ -247,8 +249,26 @@ private Instant addGarbageCollectionHold(
   /**
* Prefetch watermark holds in preparation for merging.
*/
-  public void prefetchOnMerge(MergingStateAccessor state) {
-StateMerging.prefetchWatermarks(state, elementHoldTag);
+  public void prefetchOnMerge(MergingStateAccessor context) {
+Map map = 
context.accessInEachMergingWindow(elementHoldTag);
 
 Review comment:
   Is there a specific reason why this code was moved out of StateMerging.java?




Issue Time Tracking
---

Worklog Id: (was: 79554)
Time Spent: 50m  (was: 40m)

> StateMerging.mergeWatermarks sets a late watermark hold for late merging 
> windows that depend only on the window
> ---
>
> Key: BEAM-3776
> URL: https://issues.apache.org/jira/browse/BEAM-3776
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Affects Versions: 2.1.0, 2.2.0, 2.3.0
>Reporter: Sam Whittle
>Assignee: Sam Whittle
>Priority: Critical
>  Time Spent: 50m
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-3776) StateMerging.mergeWatermarks sets a late watermark hold for late merging windows that depend only on the window

2018-03-12 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3776?focusedWorklogId=79553&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79553
 ]

ASF GitHub Bot logged work on BEAM-3776:


Author: ASF GitHub Bot
Created on: 12/Mar/18 18:05
Start Date: 12/Mar/18 18:05
Worklog Time Spent: 10m 
  Work Description: huygaa11 commented on a change in pull request #4793: 
[BEAM-3776] Fix issue with merging late windows where a watermark hold could be 
added behind the input watermark.
URL: https://github.com/apache/beam/pull/4793#discussion_r173888558
 
 

 ##
 File path: 
runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 ##
 @@ -873,6 +907,288 @@ public void testWatermarkHoldAndLateData() throws Exception {
 tester.assertHasOnlyGlobalAndFinishedSetsFor();
   }
 
+  @Test
+  public void testMergingWatermarkHoldAndLateDataSpecific() throws Exception {
+LinkedList<LinkedList<Action>> configurations = new LinkedList<>();
+
+// Simple: late new window
+LinkedList<Action> actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(1));
+configurations.add(actions);
+
+// Simple: late new window, closed and extended.
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(1));
+actions.add(Action.times(10));
+configurations.add(actions);
+
+// Simple: late new window, closed and merged
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(1));
+actions.add(Action.times(14));
+actions.add(Action.times(6));
+configurations.add(actions);
+
+// Simple: late new window, extended past watermark
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(33));
+configurations.add(actions);
+
+// Simple: late new window, extended past watermark, extend more
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(33));
+actions.add(Action.times(43));
+configurations.add(actions);
+
+// Simple: late new window, extended past watermark, extend more
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(33));
+actions.add(Action.times(11));
+configurations.add(actions);
+
+// Simple: new window closes, then extended
+actions = new LinkedList<>();
+actions.add(Action.times(11));
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(18));
+configurations.add(actions);
+
+// Merging: new window closes, then extended then merged with new window
+actions = new LinkedList<>();
+actions.add(Action.times(11));
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(18));
+actions.add(Action.times(41));
+actions.add(Action.times(27, 33));
+configurations.add(actions);
+
+// Merging: late window, merges with new window
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(29));
+actions.add(Action.times(41));
+configurations.add(actions);
+
+// Merging: late window, new window joined
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(29));
+actions.add(Action.times(45));
+actions.add(Action.times(36));
+configurations.add(actions);
+
+// Merging: late window, new window all at once
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(29, 45, 36));
+configurations.add(actions);
+
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(42));
+actions.add(Action.times(33));
+configurations.add(actions);
+
+actions = new LinkedList<>();
+actions.add(Action.inputWatermark(40));
+actions.add(Action.times(25));
+actions.add(Action.times(42));
+actions.add(Action.times(33, 21));
+actions.add(Action.inputWatermark(50));
+actions.add(Action.times(12));
+configurations.add(actions);
+
+for (LinkedList<Action> configuration : configurations) {
+  System.out.println("Running config " + configuration.toString());
 
 Review comment:
   Remove all the print statements.
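
For readers following the scenarios in the test above, the sketch below shows how session windows merge for a given set of element timestamps. It is a hypothetical, simplified stand-in for session merging, not Beam's `Sessions` code: `SessionSketch` and `mergeSessions` are made-up names, and the gap of 10 is an assumption inferred from the scenario comments, since the test's setup is not shown in this excerpt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of session-window merging; not Beam's Sessions implementation. */
final class SessionSketch {

  /**
   * Gives each timestamp t a proto-window [t, t + gap) and merges overlapping
   * windows. Returns the merged windows as {start, end} pairs, end exclusive.
   */
  static List<long[]> mergeSessions(long gap, long... times) {
    long[] sorted = times.clone();
    Arrays.sort(sorted);
    List<long[]> sessions = new ArrayList<>();
    for (long t : sorted) {
      long[] last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
      if (last != null && t < last[1]) {
        // The new proto-window overlaps the current session: extend it.
        last[1] = Math.max(last[1], t + gap);
      } else {
        sessions.add(new long[] {t, t + gap});
      }
    }
    return sessions;
  }

  public static void main(String[] args) {
    // "Merging: late window, new window all at once" with an assumed gap of 10.
    for (long[] s : mergeSessions(10, 29, 45, 36)) {
      System.out.println("[" + s[0] + ", " + s[1] + ")"); // prints "[29, 55)"
    }
  }
}
```

With an input watermark of 40, the window created at 29 on its own would end at 39 and be late, but after merging with the elements at 36 and 45 the combined window ends at 55, past the watermark. That is the kind of transition the merging scenarios exercise.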




Issue Time Tracking