This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b6bc9040954 Merge pull request #37715: Disable combiner lifting only for count triggers
b6bc9040954 is described below
commit b6bc90409543987c9f5dff2bbb6fbfca207ada7a
Author: Reuven Lax <[email protected]>
AuthorDate: Fri Mar 6 13:33:27 2026 -0800
Merge pull request #37715: Disable combiner lifting only for count triggers
---
.../dataflow/DataflowPipelineTranslator.java | 90 ++++++++++-
.../dataflow/DataflowPipelineTranslatorTest.java | 138 +++++++++++++++++
.../beam/sdk/transforms/windowing/AfterAll.java | 5 +
.../beam/sdk/transforms/windowing/AfterEach.java | 7 +-
.../beam/sdk/transforms/windowing/AfterFirst.java | 5 +
.../beam/sdk/transforms/windowing/AfterPane.java | 5 +
.../transforms/windowing/AfterProcessingTime.java | 5 +
.../windowing/AfterSynchronizedProcessingTime.java | 5 +
.../sdk/transforms/windowing/AfterWatermark.java | 10 ++
.../sdk/transforms/windowing/DefaultTrigger.java | 5 +
.../beam/sdk/transforms/windowing/Never.java | 5 +
.../sdk/transforms/windowing/OrFinallyTrigger.java | 5 +
.../beam/sdk/transforms/windowing/Repeatedly.java | 5 +
.../sdk/transforms/windowing/ReshuffleTrigger.java | 5 +
.../beam/sdk/transforms/windowing/Trigger.java | 3 +
.../sdk/transforms/windowing/TriggerVisitor.java} | 61 ++++----
.../sdk/util/construction/TriggerTranslation.java | 165 ++++++++-------------
.../beam/sdk/transforms/windowing/StubTrigger.java | 5 +
.../beam/sdk/transforms/windowing/TriggerTest.java | 10 ++
19 files changed, 405 insertions(+), 134 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 08d84705c5c..c57b5e3b1a0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -88,7 +88,19 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHint;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
+import org.apache.beam.sdk.transforms.windowing.TriggerVisitor;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.CoderUtils;
@@ -134,6 +146,78 @@ public class DataflowPipelineTranslator {
private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
+ /** Checks to see whether the Trigger tree is compatible with combiner lifting. */
+ private static class TriggerCombinerLiftingCompatibility implements TriggerVisitor<Boolean> {
+ static final TriggerCombinerLiftingCompatibility INSTANCE =
+ new TriggerCombinerLiftingCompatibility();
+
+ @Override
+ public Boolean visit(DefaultTrigger trigger) {
+ return true;
+ }
+
+ @Override
+ public Boolean visit(AfterWatermark.FromEndOfWindow trigger) {
+ return true;
+ }
+
+ @Override
+ public Boolean visit(AfterWatermark.AfterWatermarkEarlyAndLate trigger) {
+ return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
+ }
+
+ @Override
+ public Boolean visit(Never.NeverTrigger trigger) {
+ return true;
+ }
+
+ @Override
+ public Boolean visit(ReshuffleTrigger<?> trigger) {
+ return false;
+ }
+
+ @Override
+ public Boolean visit(AfterProcessingTime trigger) {
+ return true;
+ }
+
+ @Override
+ public Boolean visit(AfterSynchronizedProcessingTime trigger) {
+ return true;
+ }
+
+ @Override
+ public Boolean visit(AfterFirst trigger) {
+ return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
+ }
+
+ @Override
+ public Boolean visit(AfterAll trigger) {
+ return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
+ }
+
+ @Override
+ public Boolean visit(AfterEach trigger) {
+ return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
+ }
+
+ @Override
+ public Boolean visit(AfterPane trigger) {
+ // Combiner lifting not supported for count triggers.
+ return false;
+ }
+
+ @Override
+ public Boolean visit(Repeatedly trigger) {
+ return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
+ }
+
+ @Override
+ public Boolean visit(OrFinallyTrigger trigger) {
+ return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
+ }
+ }
+
private static byte[] serializeWindowingStrategy(
WindowingStrategy<?, ?> windowingStrategy, PipelineOptions options) {
try {
@@ -970,8 +1054,10 @@ public class DataflowPipelineTranslator {
&& windowingStrategy.getWindowFn().assignsToOneWindow();
if (isStreaming) {
allowCombinerLifting &= transform.fewKeys();
- // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
- allowCombinerLifting &= (windowingStrategy.getTrigger() instanceof DefaultTrigger);
+ allowCombinerLifting &=
+ windowingStrategy
+ .getTrigger()
+ .accept(TriggerCombinerLiftingCompatibility.INSTANCE);
}
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, !allowCombinerLifting);
stepContext.addInput(
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 8226dc2c727..52b460302c1 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow;
+import static org.apache.beam.runners.dataflow.util.Structs.getBoolean;
import static org.apache.beam.runners.dataflow.util.Structs.getString;
import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
@@ -89,11 +90,13 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -110,7 +113,17 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHintsOptions;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.DoFnInfo;
@@ -235,6 +248,131 @@ public class DataflowPipelineTranslatorTest implements Serializable {
return options;
}
+ private void testTriggerCombinerLiftingDisabled(Trigger trigger) throws Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ options.as(StreamingOptions.class).setStreaming(true);
+ Pipeline p = Pipeline.create(options);
+
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+
+ p.apply("create", Create.of(1, 2, 3, 4).withCoder(VarIntCoder.of()))
+ .setIsBoundedInternal(IsBounded.UNBOUNDED)
+ .apply("window", Window.<Integer>configure().triggering(trigger).discardingFiredPanes())
+ .apply("count", Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
+ DataflowPipelineOptions translatorOptions =
+ PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ translatorOptions.setStreaming(true);
+ DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(translatorOptions);
+
+ JobSpecification jobSpecification =
+ t.translate(
+ p,
+ pipelineProto,
+ sdkComponents,
+ DataflowRunner.fromOptions(options),
+ Collections.emptyList());
+
+ boolean foundDisable = false;
+ for (Step step : jobSpecification.getJob().getSteps()) {
+ if (getBoolean(step.getProperties(), PropertyNames.DISALLOW_COMBINER_LIFTING, false)) {
+ foundDisable = true;
+ }
+ }
+ assertTrue(foundDisable);
+ }
+
+ @Test
+ public void testRepeatedCountTriggerDisablesCombinerLifting() throws IOException, Exception {
+ testTriggerCombinerLiftingDisabled(Repeatedly.forever((AfterPane.elementCountAtLeast(1))));
+ }
+
+ @Test
+ public void testEarlyCountTriggerDisablesCombinerLifting() throws IOException, Exception {
+ testTriggerCombinerLiftingDisabled(
+ AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)));
+ }
+
+ @Test
+ public void testAfterFirstCountTriggerDisablesCombinerLifting() throws IOException, Exception {
+ testTriggerCombinerLiftingDisabled(
+ Repeatedly.forever(AfterFirst.of(Never.ever(), AfterPane.elementCountAtLeast(1))));
+ }
+
+ @Test
+ public void testAfterAllCountTriggerDisablesCombinerLifting() throws IOException, Exception {
+ testTriggerCombinerLiftingDisabled(
+ Repeatedly.forever(AfterAll.of(Never.ever(), AfterPane.elementCountAtLeast(1))));
+ }
+
+ @Test
+ public void testCombinerLiftingEnabled() throws IOException, Exception {
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setRunner(DataflowRunner.class);
+ options.as(StreamingOptions.class).setStreaming(true);
+ Pipeline p = Pipeline.create(options);
+
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ SdkComponents sdkComponents = createSdkComponents(options);
+
+ PCollection<Integer> input =
+ p.apply("create", Create.of(1, 2, 3, 4).withCoder(VarIntCoder.of()));
+
+ input
+ .setIsBoundedInternal(IsBounded.UNBOUNDED)
+ .apply(
+ "window1",
+ Window.<Integer>into(FixedWindows.of(Duration.millis(1)))
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes())
+ .apply("count", Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
+
+ input
+ .apply(
+ "window2",
+ Window.<Integer>configure()
+ .triggering(AfterWatermark.pastEndOfWindow())
+ .discardingFiredPanes())
+ .apply("count2", Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
+
+ input
+ .apply(
+ "window3",
+ Window.<Integer>configure()
+ .triggering(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO))
+ .withLateFirings(AfterSynchronizedProcessingTime.ofFirstElement()))
+ .discardingFiredPanes())
+ .apply("count3", Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
+ DataflowPipelineOptions translatorOptions =
+ PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ translatorOptions.setStreaming(true);
+ DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(translatorOptions);
+
+ JobSpecification jobSpecification =
+ t.translate(
+ p,
+ pipelineProto,
+ sdkComponents,
+ DataflowRunner.fromOptions(options),
+ Collections.emptyList());
+
+ boolean foundDisable = false;
+ for (Step step : jobSpecification.getJob().getSteps()) {
+ if (getBoolean(step.getProperties(), PropertyNames.DISALLOW_COMBINER_LIFTING, false)) {
+ foundDisable = true;
+ }
+ }
+ assertFalse(foundDisable);
+ }
+
// Test that the transform names for Storage Write API for streaming pipelines are what we expect
// them to be. This is required since the Windmill backend expects the step to contain that name.
// For a more stable solution, we should use URN, but that is not currently used in the legacy
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
index b5838b77521..930848605f0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
@@ -58,6 +58,11 @@ public class AfterAll extends OnceTrigger {
return deadline;
}
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
protected OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return new AfterAll(continuationTriggers);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
index 654c7a46647..48118fafa2d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
@@ -67,7 +67,7 @@ public class AfterEach extends Trigger {
@Override
public boolean mayFinish() {
- return subTriggers.stream().allMatch(trigger -> trigger.mayFinish());
+ return subTriggers.stream().allMatch(Trigger::mayFinish);
}
@Override
@@ -75,6 +75,11 @@ public class AfterEach extends Trigger {
return Repeatedly.forever(new AfterFirst(continuationTriggers));
}
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
index b8118848c96..17743a1006d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
@@ -58,6 +58,11 @@ public class AfterFirst extends OnceTrigger {
return deadline;
}
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
protected OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return new AfterFirst(continuationTriggers);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
index 3544a38b69a..b01bf91135e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
@@ -59,6 +59,11 @@ public class AfterPane extends OnceTrigger {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
protected OnceTrigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return AfterPane.elementCountAtLeast(1);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
index c9612c06920..d974183c112 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
@@ -107,6 +107,11 @@ public class AfterProcessingTime extends OnceTrigger {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return AfterSynchronizedProcessingTime.ofFirstElement();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
index b3f25c6b14c..641bc1198f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
@@ -49,6 +49,11 @@ public class AfterSynchronizedProcessingTime extends OnceTrigger {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return this;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
index 2a82fa5ab68..771dbf80c71 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
@@ -101,6 +101,11 @@ public class AfterWatermark {
return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
}
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
public Trigger getContinuationTrigger() {
return new AfterWatermarkEarlyAndLate(
@@ -177,6 +182,11 @@ public class AfterWatermark {
return this;
}
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
public String toString() {
return TO_STRING;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
index 7060936c2fb..ba572f1f692 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
@@ -54,6 +54,11 @@ public class DefaultTrigger extends Trigger {
return other instanceof DefaultTrigger;
}
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return this;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
index 5f810430cdf..bf12f99e83f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -56,5 +56,10 @@ public final class Never {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
+
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
index 5a16c818da7..34d2409ee2f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
@@ -64,6 +64,11 @@ public class OrFinallyTrigger extends Trigger {
return subTriggers.get(ACTUAL).mayFinish() || subTriggers.get(UNTIL).mayFinish();
}
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
// Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
index 9c54d75da3e..2e26c6c6250 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
@@ -73,6 +73,11 @@ public class Repeatedly extends Trigger {
return false;
}
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
return new Repeatedly(continuationTriggers.get(REPEATED));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java
index bf84466ccdb..e2ef114a486 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java
@@ -53,6 +53,11 @@ public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger {
return false;
}
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return visitor.visit(this);
+ }
+
@Override
public String toString() {
return "ReshuffleTrigger()";
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
index 16daa453021..2ddded2af8f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
@@ -109,6 +109,9 @@ public abstract class Trigger implements Serializable {
return getContinuationTrigger(subTriggerContinuations);
}
+ @Internal
+ public abstract <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor);
+
/**
* Subclasses should override this to return the {@link #getContinuationTrigger} of this {@link
* Trigger}. For convenience, this is provided the continuation trigger of each of the
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TriggerVisitor.java
similarity index 51%
copy from sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
copy to sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TriggerVisitor.java
index bdcb82a3863..752b9260f7a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TriggerVisitor.java
@@ -17,35 +17,34 @@
*/
package org.apache.beam.sdk.transforms.windowing;
-import java.util.List;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
-import org.joda.time.Instant;
-
-/** No-op {@link OnceTrigger} implementation for testing. */
-abstract class StubTrigger extends Trigger.OnceTrigger {
- /**
- * Create a stub {@link Trigger} instance which returns the specified name on {@link #toString()}.
- */
- static StubTrigger named(final String name) {
- return new StubTrigger() {
- @Override
- public String toString() {
- return name;
- }
- };
- }
-
- protected StubTrigger() {
- super(Lists.newArrayList());
- }
-
- @Override
- protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return null;
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return null;
- }
+import org.apache.beam.sdk.annotations.Internal;
+
+/** A TriggerVisitor. */
+@Internal
+public interface TriggerVisitor<OutputT> {
+ OutputT visit(DefaultTrigger trigger);
+
+ OutputT visit(AfterWatermark.FromEndOfWindow trigger);
+
+ OutputT visit(AfterWatermark.AfterWatermarkEarlyAndLate trigger);
+
+ OutputT visit(Never.NeverTrigger trigger);
+
+ OutputT visit(ReshuffleTrigger<?> trigger);
+
+ OutputT visit(AfterProcessingTime trigger);
+
+ OutputT visit(AfterSynchronizedProcessingTime trigger);
+
+ OutputT visit(AfterFirst trigger);
+
+ OutputT visit(AfterAll trigger);
+
+ OutputT visit(AfterEach trigger);
+
+ OutputT visit(AfterPane trigger);
+
+ OutputT visit(Repeatedly trigger);
+
+ OutputT visit(OrFinallyTrigger trigger);
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TriggerTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TriggerTranslation.java
index d4ead4ce894..b4b104ace08 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TriggerTranslation.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TriggerTranslation.java
@@ -18,8 +18,6 @@
package org.apache.beam.sdk.util.construction;
import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.List;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.transforms.windowing.AfterAll;
@@ -30,16 +28,15 @@ import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark.FromEndOfWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Never;
-import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
import org.apache.beam.sdk.transforms.windowing.TimestampTransform;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.transforms.windowing.TriggerVisitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
@@ -52,167 +49,135 @@ import org.joda.time.Instant;
})
public class TriggerTranslation implements Serializable {
- @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter();
+ @VisibleForTesting static final ConversionVisitor CONVERTER = new ConversionVisitor();
public static RunnerApi.Trigger toProto(Trigger trigger) {
- return CONVERTER.convertTrigger(trigger);
+ return trigger.accept(CONVERTER);
}
- @VisibleForTesting
- static class ProtoConverter {
-
- public RunnerApi.Trigger convertTrigger(Trigger trigger) {
- Method evaluationMethod = getEvaluationMethod(trigger.getClass());
- return tryConvert(evaluationMethod, trigger);
- }
-
- private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger trigger) {
- try {
- return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger);
- } catch (InvocationTargetException exc) {
- if (exc.getCause() instanceof RuntimeException) {
- throw (RuntimeException) exc.getCause();
- } else {
- throw new RuntimeException(exc.getCause());
- }
- } catch (IllegalAccessException exc) {
- throw new IllegalStateException(
- String.format("Internal error: could not invoke %s", evaluationMethod));
- }
- }
-
- private Method getEvaluationMethod(Class<?> clazz) {
- try {
- return getClass().getDeclaredMethod("convertSpecific", clazz);
- } catch (NoSuchMethodException exc) {
- throw new IllegalArgumentException(
- String.format(
- "Cannot translate trigger class %s to a runner-API proto.",
- clazz.getCanonicalName()),
- exc);
- }
- }
-
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(DefaultTrigger v) {
+ private static class ConversionVisitor implements TriggerVisitor<RunnerApi.Trigger> {
+ @Override
+ public RunnerApi.Trigger visit(DefaultTrigger trigger) {
return RunnerApi.Trigger.newBuilder()
.setDefault(RunnerApi.Trigger.Default.getDefaultInstance())
.build();
}
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(FromEndOfWindow v) {
+ @Override
+ public RunnerApi.Trigger visit(AfterWatermark.FromEndOfWindow trigger) {
return RunnerApi.Trigger.newBuilder()
.setAfterEndOfWindow(RunnerApi.Trigger.AfterEndOfWindow.getDefaultInstance())
.build();
}
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(NeverTrigger v) {
+ @Override
+ public RunnerApi.Trigger visit(AfterWatermark.AfterWatermarkEarlyAndLate trigger) {
+ RunnerApi.Trigger.AfterEndOfWindow.Builder builder =
+ RunnerApi.Trigger.AfterEndOfWindow.newBuilder();
+
+ builder.setEarlyFirings(toProto(trigger.getEarlyTrigger()));
+ if (trigger.getLateTrigger() != null) {
+ builder.setLateFirings(toProto(trigger.getLateTrigger()));
+ }
+
+ return RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(builder).build();
+ }
+
+ @Override
+ public RunnerApi.Trigger visit(Never.NeverTrigger trigger) {
return RunnerApi.Trigger.newBuilder()
.setNever(RunnerApi.Trigger.Never.getDefaultInstance())
.build();
}
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(ReshuffleTrigger v) {
+ @Override
+ public RunnerApi.Trigger visit(ReshuffleTrigger<?> trigger) {
return RunnerApi.Trigger.newBuilder()
.setAlways(RunnerApi.Trigger.Always.getDefaultInstance())
.build();
}
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime v) {
+ @Override
+ public RunnerApi.Trigger visit(AfterProcessingTime trigger) {
+ RunnerApi.Trigger.AfterProcessingTime.Builder builder =
+ RunnerApi.Trigger.AfterProcessingTime.newBuilder();
+
+ for (TimestampTransform transform : trigger.getTimestampTransforms()) {
+ builder.addTimestampTransforms(convertTimestampTransform(transform));
+ }
+
+ return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build();
+ }
+
+ @Override
+ public RunnerApi.Trigger visit(AfterSynchronizedProcessingTime trigger) {
return RunnerApi.Trigger.newBuilder()
.setAfterSynchronizedProcessingTime(
RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance())
.build();
}
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(AfterFirst v) {
+ @Override
+ public RunnerApi.Trigger visit(AfterFirst trigger) {
RunnerApi.Trigger.AfterAny.Builder builder =
RunnerApi.Trigger.AfterAny.newBuilder();
- for (Trigger subtrigger : v.subTriggers()) {
+ for (Trigger subtrigger : trigger.subTriggers()) {
builder.addSubtriggers(toProto(subtrigger));
}
return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build();
}
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(AfterAll v) {
+ @Override
+ public RunnerApi.Trigger visit(AfterAll trigger) {
RunnerApi.Trigger.AfterAll.Builder builder =
RunnerApi.Trigger.AfterAll.newBuilder();
- for (Trigger subtrigger : v.subTriggers()) {
+ for (Trigger subtrigger : trigger.subTriggers()) {
builder.addSubtriggers(toProto(subtrigger));
}
return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build();
}
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(AfterPane v) {
- return RunnerApi.Trigger.newBuilder()
- .setElementCount(
- RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount()))
- .build();
- }
-
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(AfterWatermarkEarlyAndLate v) {
- RunnerApi.Trigger.AfterEndOfWindow.Builder builder =
- RunnerApi.Trigger.AfterEndOfWindow.newBuilder();
-
- builder.setEarlyFirings(toProto(v.getEarlyTrigger()));
- if (v.getLateTrigger() != null) {
- builder.setLateFirings(toProto(v.getLateTrigger()));
- }
-
- return RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(builder).build();
- }
-
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(AfterEach v) {
+ @Override
+ public RunnerApi.Trigger visit(AfterEach trigger) {
RunnerApi.Trigger.AfterEach.Builder builder =
RunnerApi.Trigger.AfterEach.newBuilder();
- for (Trigger subtrigger : v.subTriggers()) {
+ for (Trigger subtrigger : trigger.subTriggers()) {
builder.addSubtriggers(toProto(subtrigger));
}
return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build();
}
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(Repeatedly v) {
+ @Override
+ public RunnerApi.Trigger visit(AfterPane trigger) {
+ return RunnerApi.Trigger.newBuilder()
+ .setElementCount(
+ RunnerApi.Trigger.ElementCount.newBuilder()
+ .setElementCount(trigger.getElementCount()))
+ .build();
+ }
+
+ @Override
+ public RunnerApi.Trigger visit(Repeatedly trigger) {
return RunnerApi.Trigger.newBuilder()
.setRepeat(
- RunnerApi.Trigger.Repeat.newBuilder().setSubtrigger(toProto(v.getRepeatedTrigger())))
+ RunnerApi.Trigger.Repeat.newBuilder()
+ .setSubtrigger(toProto(trigger.getRepeatedTrigger())))
.build();
}
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) {
+ @Override
+ public RunnerApi.Trigger visit(OrFinallyTrigger trigger) {
return RunnerApi.Trigger.newBuilder()
.setOrFinally(
RunnerApi.Trigger.OrFinally.newBuilder()
- .setMain(toProto(v.getMainTrigger()))
- .setFinally(toProto(v.getUntilTrigger())))
+ .setMain(toProto(trigger.getMainTrigger()))
+ .setFinally(toProto(trigger.getUntilTrigger())))
.build();
}
- @SuppressWarnings("unused")
- private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) {
- RunnerApi.Trigger.AfterProcessingTime.Builder builder =
- RunnerApi.Trigger.AfterProcessingTime.newBuilder();
-
- for (TimestampTransform transform : v.getTimestampTransforms()) {
- builder.addTimestampTransforms(convertTimestampTransform(transform));
- }
-
- return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build();
- }
-
private RunnerApi.TimestampTransform
convertTimestampTransform(TimestampTransform transform) {
if (transform instanceof TimestampTransform.Delay) {
return RunnerApi.TimestampTransform.newBuilder()
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
index bdcb82a3863..952064a182c 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
@@ -48,4 +48,9 @@ abstract class StubTrigger extends Trigger.OnceTrigger {
public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
return null;
}
+
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return null;
+ }
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
index 335d9670625..bc062281e5d 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
@@ -73,6 +73,11 @@ public class TriggerTest {
public boolean mayFinish() {
return false;
}
+
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return null;
+ }
}
private static class Trigger2 extends Trigger {
@@ -95,5 +100,10 @@ public class TriggerTest {
public boolean mayFinish() {
return false;
}
+
+ @Override
+ public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+ return null;
+ }
}
}