This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b6bc9040954 Merge pull request #37715: Disable combiner lifting only 
for count triggers
b6bc9040954 is described below

commit b6bc90409543987c9f5dff2bbb6fbfca207ada7a
Author: Reuven Lax <[email protected]>
AuthorDate: Fri Mar 6 13:33:27 2026 -0800

    Merge pull request #37715: Disable combiner lifting only for count triggers
---
 .../dataflow/DataflowPipelineTranslator.java       |  90 ++++++++++-
 .../dataflow/DataflowPipelineTranslatorTest.java   | 138 +++++++++++++++++
 .../beam/sdk/transforms/windowing/AfterAll.java    |   5 +
 .../beam/sdk/transforms/windowing/AfterEach.java   |   7 +-
 .../beam/sdk/transforms/windowing/AfterFirst.java  |   5 +
 .../beam/sdk/transforms/windowing/AfterPane.java   |   5 +
 .../transforms/windowing/AfterProcessingTime.java  |   5 +
 .../windowing/AfterSynchronizedProcessingTime.java |   5 +
 .../sdk/transforms/windowing/AfterWatermark.java   |  10 ++
 .../sdk/transforms/windowing/DefaultTrigger.java   |   5 +
 .../beam/sdk/transforms/windowing/Never.java       |   5 +
 .../sdk/transforms/windowing/OrFinallyTrigger.java |   5 +
 .../beam/sdk/transforms/windowing/Repeatedly.java  |   5 +
 .../sdk/transforms/windowing/ReshuffleTrigger.java |   5 +
 .../beam/sdk/transforms/windowing/Trigger.java     |   3 +
 .../sdk/transforms/windowing/TriggerVisitor.java}  |  61 ++++----
 .../sdk/util/construction/TriggerTranslation.java  | 165 ++++++++-------------
 .../beam/sdk/transforms/windowing/StubTrigger.java |   5 +
 .../beam/sdk/transforms/windowing/TriggerTest.java |  10 ++
 19 files changed, 405 insertions(+), 134 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 08d84705c5c..c57b5e3b1a0 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -88,7 +88,19 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.resourcehints.ResourceHint;
 import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import 
org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
+import org.apache.beam.sdk.transforms.windowing.TriggerVisitor;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -134,6 +146,78 @@ public class DataflowPipelineTranslator {
   private static final Logger LOG = 
LoggerFactory.getLogger(DataflowPipelineTranslator.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
+  /** Checks whether every trigger in the tree is compatible with combiner lifting. */
+  private static class TriggerCombinerLiftingCompatibility implements 
TriggerVisitor<Boolean> {
+    static final TriggerCombinerLiftingCompatibility INSTANCE =
+        new TriggerCombinerLiftingCompatibility();
+
+    @Override
+    public Boolean visit(DefaultTrigger trigger) {
+      return true;
+    }
+
+    @Override
+    public Boolean visit(AfterWatermark.FromEndOfWindow trigger) {
+      return true;
+    }
+
+    @Override
+    public Boolean visit(AfterWatermark.AfterWatermarkEarlyAndLate trigger) {
+      return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
+    }
+
+    @Override
+    public Boolean visit(Never.NeverTrigger trigger) {
+      return true;
+    }
+
+    @Override
+    public Boolean visit(ReshuffleTrigger<?> trigger) {
+      return false;
+    }
+
+    @Override
+    public Boolean visit(AfterProcessingTime trigger) {
+      return true;
+    }
+
+    @Override
+    public Boolean visit(AfterSynchronizedProcessingTime trigger) {
+      return true;
+    }
+
+    @Override
+    public Boolean visit(AfterFirst trigger) {
+      return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
+    }
+
+    @Override
+    public Boolean visit(AfterAll trigger) {
+      return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
+    }
+
+    @Override
+    public Boolean visit(AfterEach trigger) {
+      return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
+    }
+
+    @Override
+    public Boolean visit(AfterPane trigger) {
+      // Combiner lifting is not supported for count triggers.
+      return false;
+    }
+
+    @Override
+    public Boolean visit(Repeatedly trigger) {
+      return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
+    }
+
+    @Override
+    public Boolean visit(OrFinallyTrigger trigger) {
+      return trigger.subTriggers().stream().allMatch(t -> t.accept(this));
+    }
+  }
+
   private static byte[] serializeWindowingStrategy(
       WindowingStrategy<?, ?> windowingStrategy, PipelineOptions options) {
     try {
@@ -970,8 +1054,10 @@ public class DataflowPipelineTranslator {
                     && windowingStrategy.getWindowFn().assignsToOneWindow();
             if (isStreaming) {
               allowCombinerLifting &= transform.fewKeys();
-              // TODO: Allow combiner lifting on the non-default trigger, as 
appropriate.
-              allowCombinerLifting &= (windowingStrategy.getTrigger() 
instanceof DefaultTrigger);
+              allowCombinerLifting &=
+                  windowingStrategy
+                      .getTrigger()
+                      .accept(TriggerCombinerLiftingCompatibility.INSTANCE);
             }
             stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, 
!allowCombinerLifting);
             stepContext.addInput(
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 8226dc2c727..52b460302c1 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.dataflow;
 
+import static org.apache.beam.runners.dataflow.util.Structs.getBoolean;
 import static org.apache.beam.runners.dataflow.util.Structs.getString;
 import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
@@ -89,11 +90,13 @@ import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -110,7 +113,17 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
 import org.apache.beam.sdk.transforms.resourcehints.ResourceHintsOptions;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import 
org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.DoFnInfo;
@@ -235,6 +248,131 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
     return options;
   }
 
+  private void testTriggerCombinerLiftingDisabled(Trigger trigger) throws 
Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    options.as(StreamingOptions.class).setStreaming(true);
+    Pipeline p = Pipeline.create(options);
+
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    SdkComponents sdkComponents = createSdkComponents(options);
+
+    p.apply("create", Create.of(1, 2, 3, 4).withCoder(VarIntCoder.of()))
+        .setIsBoundedInternal(IsBounded.UNBOUNDED)
+        .apply("window", 
Window.<Integer>configure().triggering(trigger).discardingFiredPanes())
+        .apply("count", 
Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, 
sdkComponents, true);
+    DataflowPipelineOptions translatorOptions =
+        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    translatorOptions.setStreaming(true);
+    DataflowPipelineTranslator t = 
DataflowPipelineTranslator.fromOptions(translatorOptions);
+
+    JobSpecification jobSpecification =
+        t.translate(
+            p,
+            pipelineProto,
+            sdkComponents,
+            DataflowRunner.fromOptions(options),
+            Collections.emptyList());
+
+    boolean foundDisable = false;
+    for (Step step : jobSpecification.getJob().getSteps()) {
+      if (getBoolean(step.getProperties(), 
PropertyNames.DISALLOW_COMBINER_LIFTING, false)) {
+        foundDisable = true;
+      }
+    }
+    assertTrue(foundDisable);
+  }
+
+  @Test
+  public void testRepeatedCountTriggerDisablesCombinerLifting() throws 
IOException, Exception {
+    
testTriggerCombinerLiftingDisabled(Repeatedly.forever((AfterPane.elementCountAtLeast(1))));
+  }
+
+  @Test
+  public void testEarlyCountTriggerDisablesCombinerLifting() throws 
IOException, Exception {
+    testTriggerCombinerLiftingDisabled(
+        
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)));
+  }
+
+  @Test
+  public void testAfterFirstCountTriggerDisablesCombinerLifting() throws 
IOException, Exception {
+    testTriggerCombinerLiftingDisabled(
+        Repeatedly.forever(AfterFirst.of(Never.ever(), 
AfterPane.elementCountAtLeast(1))));
+  }
+
+  @Test
+  public void testAfterAllCountTriggerDisablesCombinerLifting() throws 
IOException, Exception {
+    testTriggerCombinerLiftingDisabled(
+        Repeatedly.forever(AfterAll.of(Never.ever(), 
AfterPane.elementCountAtLeast(1))));
+  }
+
+  @Test
+  public void testCombinerLiftingEnabled() throws IOException, Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    options.as(StreamingOptions.class).setStreaming(true);
+    Pipeline p = Pipeline.create(options);
+
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    SdkComponents sdkComponents = createSdkComponents(options);
+
+    PCollection<Integer> input =
+        p.apply("create", Create.of(1, 2, 3, 4).withCoder(VarIntCoder.of()));
+
+    input
+        .setIsBoundedInternal(IsBounded.UNBOUNDED)
+        .apply(
+            "window1",
+            Window.<Integer>into(FixedWindows.of(Duration.millis(1)))
+                .triggering(DefaultTrigger.of())
+                .discardingFiredPanes())
+        .apply("count", 
Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
+
+    input
+        .apply(
+            "window2",
+            Window.<Integer>configure()
+                .triggering(AfterWatermark.pastEndOfWindow())
+                .discardingFiredPanes())
+        .apply("count2", 
Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
+
+    input
+        .apply(
+            "window3",
+            Window.<Integer>configure()
+                .triggering(
+                    AfterWatermark.pastEndOfWindow()
+                        .withEarlyFirings(
+                            
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO))
+                        
.withLateFirings(AfterSynchronizedProcessingTime.ofFirstElement()))
+                .discardingFiredPanes())
+        .apply("count3", 
Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, 
sdkComponents, true);
+    DataflowPipelineOptions translatorOptions =
+        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    translatorOptions.setStreaming(true);
+    DataflowPipelineTranslator t = 
DataflowPipelineTranslator.fromOptions(translatorOptions);
+
+    JobSpecification jobSpecification =
+        t.translate(
+            p,
+            pipelineProto,
+            sdkComponents,
+            DataflowRunner.fromOptions(options),
+            Collections.emptyList());
+
+    boolean foundDisable = false;
+    for (Step step : jobSpecification.getJob().getSteps()) {
+      if (getBoolean(step.getProperties(), 
PropertyNames.DISALLOW_COMBINER_LIFTING, false)) {
+        foundDisable = true;
+      }
+    }
+    assertFalse(foundDisable);
+  }
+
   // Test that the transform names for Storage Write API for streaming 
pipelines are what we expect
   // them to be. This is required since the Windmill backend expects the step 
to contain that name.
   // For a more stable solution, we should use URN, but that is not currently 
used in the legacy
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
index b5838b77521..930848605f0 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
@@ -58,6 +58,11 @@ public class AfterAll extends OnceTrigger {
     return deadline;
   }
 
+  @Override
+  public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+    return visitor.visit(this);
+  }
+
   @Override
   protected OnceTrigger getContinuationTrigger(List<Trigger> 
continuationTriggers) {
     return new AfterAll(continuationTriggers);
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
index 654c7a46647..48118fafa2d 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
@@ -67,7 +67,7 @@ public class AfterEach extends Trigger {
 
   @Override
   public boolean mayFinish() {
-    return subTriggers.stream().allMatch(trigger -> trigger.mayFinish());
+    return subTriggers.stream().allMatch(Trigger::mayFinish);
   }
 
   @Override
@@ -75,6 +75,11 @@ public class AfterEach extends Trigger {
     return Repeatedly.forever(new AfterFirst(continuationTriggers));
   }
 
+  @Override
+  public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+    return visitor.visit(this);
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder("AfterEach.inOrder(");
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
index b8118848c96..17743a1006d 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
@@ -58,6 +58,11 @@ public class AfterFirst extends OnceTrigger {
     return deadline;
   }
 
+  @Override
+  public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+    return visitor.visit(this);
+  }
+
   @Override
   protected OnceTrigger getContinuationTrigger(List<Trigger> 
continuationTriggers) {
     return new AfterFirst(continuationTriggers);
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
index 3544a38b69a..b01bf91135e 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
@@ -59,6 +59,11 @@ public class AfterPane extends OnceTrigger {
     return BoundedWindow.TIMESTAMP_MAX_VALUE;
   }
 
+  @Override
+  public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+    return visitor.visit(this);
+  }
+
   @Override
   protected OnceTrigger getContinuationTrigger(List<Trigger> 
continuationTriggers) {
     return AfterPane.elementCountAtLeast(1);
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
index c9612c06920..d974183c112 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
@@ -107,6 +107,11 @@ public class AfterProcessingTime extends OnceTrigger {
     return BoundedWindow.TIMESTAMP_MAX_VALUE;
   }
 
+  @Override
+  public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+    return visitor.visit(this);
+  }
+
   @Override
   protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
     return AfterSynchronizedProcessingTime.ofFirstElement();
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
index b3f25c6b14c..641bc1198f2 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
@@ -49,6 +49,11 @@ public class AfterSynchronizedProcessingTime extends 
OnceTrigger {
     return BoundedWindow.TIMESTAMP_MAX_VALUE;
   }
 
+  @Override
+  public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+    return visitor.visit(this);
+  }
+
   @Override
   protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
     return this;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
index 2a82fa5ab68..771dbf80c71 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
@@ -101,6 +101,11 @@ public class AfterWatermark {
       return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
     }
 
+    @Override
+    public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+      return visitor.visit(this);
+    }
+
     @Override
     public Trigger getContinuationTrigger() {
       return new AfterWatermarkEarlyAndLate(
@@ -177,6 +182,11 @@ public class AfterWatermark {
       return this;
     }
 
+    @Override
+    public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+      return visitor.visit(this);
+    }
+
     @Override
     public String toString() {
       return TO_STRING;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
index 7060936c2fb..ba572f1f692 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
@@ -54,6 +54,11 @@ public class DefaultTrigger extends Trigger {
     return other instanceof DefaultTrigger;
   }
 
+  @Override
+  public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+    return visitor.visit(this);
+  }
+
   @Override
   protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
     return this;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
index 5f810430cdf..bf12f99e83f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -56,5 +56,10 @@ public final class Never {
     public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
       return BoundedWindow.TIMESTAMP_MAX_VALUE;
     }
+
+    @Override
+    public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+      return visitor.visit(this);
+    }
   }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
index 5a16c818da7..34d2409ee2f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
@@ -64,6 +64,11 @@ public class OrFinallyTrigger extends Trigger {
     return subTriggers.get(ACTUAL).mayFinish() || 
subTriggers.get(UNTIL).mayFinish();
   }
 
+  @Override
+  public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+    return visitor.visit(this);
+  }
+
   @Override
   protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
     // Use OrFinallyTrigger instead of AfterFirst because the continuation of 
ACTUAL
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
index 9c54d75da3e..2e26c6c6250 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
@@ -73,6 +73,11 @@ public class Repeatedly extends Trigger {
     return false;
   }
 
+  @Override
+  public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+    return visitor.visit(this);
+  }
+
   @Override
   protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
     return new Repeatedly(continuationTriggers.get(REPEATED));
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java
index bf84466ccdb..e2ef114a486 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java
@@ -53,6 +53,11 @@ public class ReshuffleTrigger<W extends BoundedWindow> 
extends Trigger {
     return false;
   }
 
+  @Override
+  public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+    return visitor.visit(this);
+  }
+
   @Override
   public String toString() {
     return "ReshuffleTrigger()";
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
index 16daa453021..2ddded2af8f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
@@ -109,6 +109,9 @@ public abstract class Trigger implements Serializable {
     return getContinuationTrigger(subTriggerContinuations);
   }
 
+  @Internal
+  public abstract <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor);
+
   /**
    * Subclasses should override this to return the {@link 
#getContinuationTrigger} of this {@link
    * Trigger}. For convenience, this is provided the continuation trigger of 
each of the
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TriggerVisitor.java
similarity index 51%
copy from 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
copy to 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TriggerVisitor.java
index bdcb82a3863..752b9260f7a 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TriggerVisitor.java
@@ -17,35 +17,34 @@
  */
 package org.apache.beam.sdk.transforms.windowing;
 
-import java.util.List;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
-import org.joda.time.Instant;
-
-/** No-op {@link OnceTrigger} implementation for testing. */
-abstract class StubTrigger extends Trigger.OnceTrigger {
-  /**
-   * Create a stub {@link Trigger} instance which returns the specified name 
on {@link #toString()}.
-   */
-  static StubTrigger named(final String name) {
-    return new StubTrigger() {
-      @Override
-      public String toString() {
-        return name;
-      }
-    };
-  }
-
-  protected StubTrigger() {
-    super(Lists.newArrayList());
-  }
-
-  @Override
-  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
-    return null;
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    return null;
-  }
+import org.apache.beam.sdk.annotations.Internal;
+
+/** Visitor over the concrete {@link Trigger} types; {@link Trigger#accept} dispatches to it. */
+@Internal
+public interface TriggerVisitor<OutputT> {
+  OutputT visit(DefaultTrigger trigger);
+
+  OutputT visit(AfterWatermark.FromEndOfWindow trigger);
+
+  OutputT visit(AfterWatermark.AfterWatermarkEarlyAndLate trigger);
+
+  OutputT visit(Never.NeverTrigger trigger);
+
+  OutputT visit(ReshuffleTrigger<?> trigger);
+
+  OutputT visit(AfterProcessingTime trigger);
+
+  OutputT visit(AfterSynchronizedProcessingTime trigger);
+
+  OutputT visit(AfterFirst trigger);
+
+  OutputT visit(AfterAll trigger);
+
+  OutputT visit(AfterEach trigger);
+
+  OutputT visit(AfterPane trigger);
+
+  OutputT visit(Repeatedly trigger);
+
+  OutputT visit(OrFinallyTrigger trigger);
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TriggerTranslation.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TriggerTranslation.java
index d4ead4ce894..b4b104ace08 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TriggerTranslation.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TriggerTranslation.java
@@ -18,8 +18,6 @@
 package org.apache.beam.sdk.util.construction;
 
 import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.List;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.sdk.transforms.windowing.AfterAll;
@@ -30,16 +28,15 @@ import 
org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import 
org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import 
org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark.FromEndOfWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Never;
-import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
 import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
 import org.apache.beam.sdk.transforms.windowing.TimestampTransform;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.transforms.windowing.TriggerVisitor;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.joda.time.Duration;
@@ -52,167 +49,135 @@ import org.joda.time.Instant;
 })
 public class TriggerTranslation implements Serializable {
 
-  @VisibleForTesting static final ProtoConverter CONVERTER = new 
ProtoConverter();
+  @VisibleForTesting static final ConversionVisitor CONVERTER = new 
ConversionVisitor();
 
   public static RunnerApi.Trigger toProto(Trigger trigger) {
-    return CONVERTER.convertTrigger(trigger);
+    return trigger.accept(CONVERTER);
   }
 
-  @VisibleForTesting
-  static class ProtoConverter {
-
-    public RunnerApi.Trigger convertTrigger(Trigger trigger) {
-      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
-      return tryConvert(evaluationMethod, trigger);
-    }
-
-    private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger 
trigger) {
-      try {
-        return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger);
-      } catch (InvocationTargetException exc) {
-        if (exc.getCause() instanceof RuntimeException) {
-          throw (RuntimeException) exc.getCause();
-        } else {
-          throw new RuntimeException(exc.getCause());
-        }
-      } catch (IllegalAccessException exc) {
-        throw new IllegalStateException(
-            String.format("Internal error: could not invoke %s", 
evaluationMethod));
-      }
-    }
-
-    private Method getEvaluationMethod(Class<?> clazz) {
-      try {
-        return getClass().getDeclaredMethod("convertSpecific", clazz);
-      } catch (NoSuchMethodException exc) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot translate trigger class %s to a runner-API proto.",
-                clazz.getCanonicalName()),
-            exc);
-      }
-    }
-
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(DefaultTrigger v) {
+  private static class ConversionVisitor implements 
TriggerVisitor<RunnerApi.Trigger> {
+    @Override
+    public RunnerApi.Trigger visit(DefaultTrigger trigger) {
       return RunnerApi.Trigger.newBuilder()
           .setDefault(RunnerApi.Trigger.Default.getDefaultInstance())
           .build();
     }
 
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(FromEndOfWindow v) {
+    @Override
+    public RunnerApi.Trigger visit(AfterWatermark.FromEndOfWindow trigger) {
       return RunnerApi.Trigger.newBuilder()
           
.setAfterEndOfWindow(RunnerApi.Trigger.AfterEndOfWindow.getDefaultInstance())
           .build();
     }
 
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(NeverTrigger v) {
+    @Override
+    public RunnerApi.Trigger visit(AfterWatermark.AfterWatermarkEarlyAndLate 
trigger) {
+      RunnerApi.Trigger.AfterEndOfWindow.Builder builder =
+          RunnerApi.Trigger.AfterEndOfWindow.newBuilder();
+
+      builder.setEarlyFirings(toProto(trigger.getEarlyTrigger()));
+      if (trigger.getLateTrigger() != null) {
+        builder.setLateFirings(toProto(trigger.getLateTrigger()));
+      }
+
+      return 
RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(builder).build();
+    }
+
+    @Override
+    public RunnerApi.Trigger visit(Never.NeverTrigger trigger) {
       return RunnerApi.Trigger.newBuilder()
           .setNever(RunnerApi.Trigger.Never.getDefaultInstance())
           .build();
     }
 
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(ReshuffleTrigger v) {
+    @Override
+    public RunnerApi.Trigger visit(ReshuffleTrigger<?> trigger) {
       return RunnerApi.Trigger.newBuilder()
           .setAlways(RunnerApi.Trigger.Always.getDefaultInstance())
           .build();
     }
 
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime 
v) {
+    @Override
+    public RunnerApi.Trigger visit(AfterProcessingTime trigger) {
+      RunnerApi.Trigger.AfterProcessingTime.Builder builder =
+          RunnerApi.Trigger.AfterProcessingTime.newBuilder();
+
+      for (TimestampTransform transform : trigger.getTimestampTransforms()) {
+        builder.addTimestampTransforms(convertTimestampTransform(transform));
+      }
+
+      return 
RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build();
+    }
+
+    @Override
+    public RunnerApi.Trigger visit(AfterSynchronizedProcessingTime trigger) {
       return RunnerApi.Trigger.newBuilder()
           .setAfterSynchronizedProcessingTime(
               
RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance())
           .build();
     }
 
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(AfterFirst v) {
+    @Override
+    public RunnerApi.Trigger visit(AfterFirst trigger) {
       RunnerApi.Trigger.AfterAny.Builder builder = 
RunnerApi.Trigger.AfterAny.newBuilder();
 
-      for (Trigger subtrigger : v.subTriggers()) {
+      for (Trigger subtrigger : trigger.subTriggers()) {
         builder.addSubtriggers(toProto(subtrigger));
       }
 
       return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build();
     }
 
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(AfterAll v) {
+    @Override
+    public RunnerApi.Trigger visit(AfterAll trigger) {
       RunnerApi.Trigger.AfterAll.Builder builder = 
RunnerApi.Trigger.AfterAll.newBuilder();
 
-      for (Trigger subtrigger : v.subTriggers()) {
+      for (Trigger subtrigger : trigger.subTriggers()) {
         builder.addSubtriggers(toProto(subtrigger));
       }
 
       return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build();
     }
 
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(AfterPane v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setElementCount(
-              
RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount()))
-          .build();
-    }
-
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(AfterWatermarkEarlyAndLate v) {
-      RunnerApi.Trigger.AfterEndOfWindow.Builder builder =
-          RunnerApi.Trigger.AfterEndOfWindow.newBuilder();
-
-      builder.setEarlyFirings(toProto(v.getEarlyTrigger()));
-      if (v.getLateTrigger() != null) {
-        builder.setLateFirings(toProto(v.getLateTrigger()));
-      }
-
-      return 
RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(builder).build();
-    }
-
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(AfterEach v) {
+    @Override
+    public RunnerApi.Trigger visit(AfterEach trigger) {
       RunnerApi.Trigger.AfterEach.Builder builder = 
RunnerApi.Trigger.AfterEach.newBuilder();
 
-      for (Trigger subtrigger : v.subTriggers()) {
+      for (Trigger subtrigger : trigger.subTriggers()) {
         builder.addSubtriggers(toProto(subtrigger));
       }
 
       return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build();
     }
 
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(Repeatedly v) {
+    @Override
+    public RunnerApi.Trigger visit(AfterPane trigger) {
+      return RunnerApi.Trigger.newBuilder()
+          .setElementCount(
+              RunnerApi.Trigger.ElementCount.newBuilder()
+                  .setElementCount(trigger.getElementCount()))
+          .build();
+    }
+
+    @Override
+    public RunnerApi.Trigger visit(Repeatedly trigger) {
       return RunnerApi.Trigger.newBuilder()
           .setRepeat(
-              
RunnerApi.Trigger.Repeat.newBuilder().setSubtrigger(toProto(v.getRepeatedTrigger())))
+              RunnerApi.Trigger.Repeat.newBuilder()
+                  .setSubtrigger(toProto(trigger.getRepeatedTrigger())))
           .build();
     }
 
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) {
+    @Override
+    public RunnerApi.Trigger visit(OrFinallyTrigger trigger) {
       return RunnerApi.Trigger.newBuilder()
           .setOrFinally(
               RunnerApi.Trigger.OrFinally.newBuilder()
-                  .setMain(toProto(v.getMainTrigger()))
-                  .setFinally(toProto(v.getUntilTrigger())))
+                  .setMain(toProto(trigger.getMainTrigger()))
+                  .setFinally(toProto(trigger.getUntilTrigger())))
           .build();
     }
 
-    @SuppressWarnings("unused")
-    private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) {
-      RunnerApi.Trigger.AfterProcessingTime.Builder builder =
-          RunnerApi.Trigger.AfterProcessingTime.newBuilder();
-
-      for (TimestampTransform transform : v.getTimestampTransforms()) {
-        builder.addTimestampTransforms(convertTimestampTransform(transform));
-      }
-
-      return 
RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build();
-    }
-
     private RunnerApi.TimestampTransform 
convertTimestampTransform(TimestampTransform transform) {
       if (transform instanceof TimestampTransform.Delay) {
         return RunnerApi.TimestampTransform.newBuilder()
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
index bdcb82a3863..952064a182c 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
@@ -48,4 +48,9 @@ abstract class StubTrigger extends Trigger.OnceTrigger {
   public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
     return null;
   }
+
+  @Override
+  public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+    return null;
+  }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
index 335d9670625..bc062281e5d 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
@@ -73,6 +73,11 @@ public class TriggerTest {
     public boolean mayFinish() {
       return false;
     }
+
+    @Override
+    public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+      return null;
+    }
   }
 
   private static class Trigger2 extends Trigger {
@@ -95,5 +100,10 @@ public class TriggerTest {
     public boolean mayFinish() {
       return false;
     }
+
+    @Override
+    public <OutputT> OutputT accept(TriggerVisitor<OutputT> visitor) {
+      return null;
+    }
   }
 }


Reply via email to