This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ff7964c  [BEAM-8006] Add retracting to windowing strategy translation.
ff7964c is described below

commit ff7964c7252c8a0c670f69bb4291230ca6136afd
Author: amaliujia <amaliu...@users.noreply.github.com>
AuthorDate: Tue Aug 20 15:29:29 2019 -0700

    [BEAM-8006] Add retracting to windowing strategy translation.
---
 .../core/construction/WindowingStrategyTranslation.java      |  4 ++++
 .../core/construction/WindowingStrategyTranslationTest.java  |  7 +++++++
 .../java/org/apache/beam/sdk/values/WindowingStrategy.java   | 12 ++++++++++--
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 5fd0f33..a57aa9b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -57,6 +57,8 @@ public class WindowingStrategyTranslation implements Serializable {
         return AccumulationMode.DISCARDING_FIRED_PANES;
       case ACCUMULATING:
         return AccumulationMode.ACCUMULATING_FIRED_PANES;
+      case RETRACTING:
+        return AccumulationMode.RETRACTING_FIRED_PANES;
       case UNRECOGNIZED:
       default:
         // Whether or not it is proto that cannot recognize it (due to the version of the
@@ -77,6 +79,8 @@ public class WindowingStrategyTranslation implements Serializable {
         return RunnerApi.AccumulationMode.Enum.DISCARDING;
       case ACCUMULATING_FIRED_PANES:
         return RunnerApi.AccumulationMode.Enum.ACCUMULATING;
+      case RETRACTING_FIRED_PANES:
+        return RunnerApi.AccumulationMode.Enum.RETRACTING;
       default:
         throw new IllegalArgumentException(
             String.format(
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
index eee9c3f..9f50c0b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -85,6 +85,13 @@ public class WindowingStrategyTranslationTest {
                 .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
                 .withTrigger(REPRESENTATIVE_TRIGGER)
                 .withAllowedLateness(Duration.millis(93))
+                .withTimestampCombiner(TimestampCombiner.LATEST)),
+        toProtoAndBackSpec(
+            WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
+                .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
+                .withMode(AccumulationMode.RETRACTING_FIRED_PANES)
+                .withTrigger(REPRESENTATIVE_TRIGGER)
+                .withAllowedLateness(Duration.millis(100))
                 .withTimestampCombiner(TimestampCombiner.LATEST)));
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
index dfd9562..6b2c4d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
@@ -44,10 +44,18 @@ import org.joda.time.Duration;
  */
 public class WindowingStrategy<T, W extends BoundedWindow> implements Serializable {
 
-  /** The accumulation modes that can be used with windowing. */
+  /**
+   * The accumulation modes that can be used with windowing.
+   *
+   * <p>Experimental {@link AccumulationMode.RETRACTING_FIRED_PANES} for enabling retractions in
+   * pipelines. There is no backwards-compatibility guarantees.
+   */
   public enum AccumulationMode {
     DISCARDING_FIRED_PANES,
-    ACCUMULATING_FIRED_PANES
+    ACCUMULATING_FIRED_PANES,
+    // RETRACTING_FIRED_PANES is experimental. There is no backwards-compatibility guarantees.
+    @Experimental
+    RETRACTING_FIRED_PANES,
   }
 
   private static final Duration DEFAULT_ALLOWED_LATENESS = Duration.ZERO;

Reply via email to