This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ff7964c [BEAM-8006] Add retracting to windowing strategy translation. ff7964c is described below commit ff7964c7252c8a0c670f69bb4291230ca6136afd Author: amaliujia <amaliu...@users.noreply.github.com> AuthorDate: Tue Aug 20 15:29:29 2019 -0700 [BEAM-8006] Add retracting to windowing strategy translation. --- .../core/construction/WindowingStrategyTranslation.java | 4 ++++ .../core/construction/WindowingStrategyTranslationTest.java | 7 +++++++ .../java/org/apache/beam/sdk/values/WindowingStrategy.java | 12 ++++++++++-- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index 5fd0f33..a57aa9b 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -57,6 +57,8 @@ public class WindowingStrategyTranslation implements Serializable { return AccumulationMode.DISCARDING_FIRED_PANES; case ACCUMULATING: return AccumulationMode.ACCUMULATING_FIRED_PANES; + case RETRACTING: + return AccumulationMode.RETRACTING_FIRED_PANES; case UNRECOGNIZED: default: // Whether or not it is proto that cannot recognize it (due to the version of the @@ -77,6 +79,8 @@ public class WindowingStrategyTranslation implements Serializable { return RunnerApi.AccumulationMode.Enum.DISCARDING; case ACCUMULATING_FIRED_PANES: return RunnerApi.AccumulationMode.Enum.ACCUMULATING; + case RETRACTING_FIRED_PANES: + return RunnerApi.AccumulationMode.Enum.RETRACTING; default: throw new IllegalArgumentException( String.format( diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java index eee9c3f..9f50c0b 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java @@ -85,6 +85,13 @@ public class WindowingStrategyTranslationTest { .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withTrigger(REPRESENTATIVE_TRIGGER) .withAllowedLateness(Duration.millis(93)) + .withTimestampCombiner(TimestampCombiner.LATEST)), + toProtoAndBackSpec( + WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN) + .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY) + .withMode(AccumulationMode.RETRACTING_FIRED_PANES) + .withTrigger(REPRESENTATIVE_TRIGGER) + .withAllowedLateness(Duration.millis(100)) .withTimestampCombiner(TimestampCombiner.LATEST))); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java index dfd9562..6b2c4d6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java @@ -44,10 +44,18 @@ import org.joda.time.Duration; */ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializable { - /** The accumulation modes that can be used with windowing. */ + /** + * The accumulation modes that can be used with windowing. + * + * <p>Experimental {@link AccumulationMode#RETRACTING_FIRED_PANES} for enabling retractions in + * pipelines. There are no backwards-compatibility guarantees. 
+ */ public enum AccumulationMode { DISCARDING_FIRED_PANES, - ACCUMULATING_FIRED_PANES + ACCUMULATING_FIRED_PANES, + // RETRACTING_FIRED_PANES is experimental. There are no backwards-compatibility guarantees. + @Experimental + RETRACTING_FIRED_PANES, } private static final Duration DEFAULT_ALLOWED_LATENESS = Duration.ZERO;