This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 146ea697bdf [FLINK-38891][table] Migrate `MiniBatchIntervalInferRule` 
to java
146ea697bdf is described below

commit 146ea697bdfde78f4ef154ed2ebe60204cd37e55
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Jan 13 22:27:29 2026 +0100

    [FLINK-38891][table] Migrate `MiniBatchIntervalInferRule` to java
---
 .../stream/MiniBatchIntervalInferRule.java         | 207 +++++++++++++++++++++
 .../stream/MiniBatchIntervalInferRule.scala        | 150 ---------------
 2 files changed, 207 insertions(+), 150 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.java
new file mode 100644
index 00000000000..b7717accc99
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalGroupWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.trait.MiniBatchInterval;
+import org.apache.flink.table.planner.plan.trait.MiniBatchIntervalTrait;
+import org.apache.flink.table.planner.plan.trait.MiniBatchIntervalTraitDef;
+import org.apache.flink.table.planner.plan.trait.MiniBatchMode;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Planner rule that infers the mini-batch interval of minibatch assigner.
+ *
+ * <p>This rule could handle the following two kinds of operator: 1. supports 
operators which
+ * supports mini-batch and does not require watermark, e.g. group aggregate. 
In this case, {@link
+ * StreamPhysicalMiniBatchAssigner} with Proctime mode will be created if not 
exist, and the
+ * interval value will be set as {@link 
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY}.
+ * 2. supports operators which requires watermark, e.g. window join, window 
aggregate. In this case,
+ * {@link StreamPhysicalWatermarkAssigner} already exists, and its 
MiniBatchIntervalTrait will be
+ * updated as the merged intervals from its outputs. Currently, mini-batched 
window aggregate is not
+ * supported, and will be introduced later.
+ *
+ * <p>NOTES: This rule only supports HepPlanner with TOP_DOWN match order.
+ */
[email protected]
+public class MiniBatchIntervalInferRule
+        extends 
RelRule<MiniBatchIntervalInferRule.MiniBatchIntervalInferRuleConfig> {
+
+    public static final MiniBatchIntervalInferRule INSTANCE =
+            
MiniBatchIntervalInferRule.MiniBatchIntervalInferRuleConfig.DEFAULT.toRule();
+
+    protected MiniBatchIntervalInferRule(
+            MiniBatchIntervalInferRule.MiniBatchIntervalInferRuleConfig 
config) {
+        super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        StreamPhysicalRel rel = call.rel(0);
+        MiniBatchIntervalTrait miniBatchIntervalTrait =
+                
rel.getTraitSet().getTrait(MiniBatchIntervalTraitDef.INSTANCE());
+        List<RelNode> inputs = getInputs(rel);
+
+        MiniBatchIntervalTrait updatedTrait = getUpdatedTrait(rel, 
miniBatchIntervalTrait);
+
+        List<RelNode> updatedInputs = getUpdatedInput(inputs, updatedTrait);
+
+        // update parent if a child was updated
+        if (!inputs.equals(updatedInputs)) {
+            RelNode newRel = rel.copy(rel.getTraitSet(), updatedInputs);
+            call.transformTo(newRel);
+        }
+    }
+
+    private MiniBatchIntervalTrait getUpdatedTrait(
+            StreamPhysicalRel rel, MiniBatchIntervalTrait 
miniBatchIntervalTrait) {
+        if (rel instanceof StreamPhysicalGroupWindowAggregate) {
+            // TODO introduce mini-batch window aggregate later
+            return MiniBatchIntervalTrait.NO_MINIBATCH();
+        }
+
+        if (rel instanceof StreamPhysicalWatermarkAssigner
+                || rel instanceof StreamPhysicalMiniBatchAssigner) {
+            return MiniBatchIntervalTrait.NONE();
+        }
+
+        TableConfig tableConfig = ShortcutUtils.unwrapTableConfig(rel);
+        Boolean miniBatchEnabled =
+                
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+
+        if (rel.requireWatermark() && miniBatchEnabled) {
+            MiniBatchInterval mergedInterval =
+                    FlinkRelOptUtil.mergeMiniBatchInterval(
+                            miniBatchIntervalTrait.getMiniBatchInterval(),
+                            new MiniBatchInterval(0, MiniBatchMode.RowTime));
+            return new MiniBatchIntervalTrait(mergedInterval);
+        } else {
+            return miniBatchIntervalTrait;
+        }
+    }
+
+    // propagate parent's MiniBatchInterval to children.
+    private List<RelNode> getUpdatedInput(
+            List<RelNode> inputs, MiniBatchIntervalTrait updatedTrait) {
+        final List<RelNode> updatedInputs = new ArrayList<>();
+        for (RelNode input : inputs) {
+            // add mini-batch watermark assigner node.
+            if (shouldAppendMiniBatchAssignerNode(input)) {
+                updatedInputs.add(
+                        new StreamPhysicalMiniBatchAssigner(
+                                input.getCluster(),
+                                input.getTraitSet(),
+                                // attach NONE trait for all of the inputs of 
MiniBatchAssigner,
+                                // as they are leaf nodes and don't need to do 
propagate
+                                input.copy(
+                                        
input.getTraitSet().plus(MiniBatchIntervalTrait.NONE()),
+                                        input.getInputs())));
+            } else {
+                MiniBatchIntervalTrait originTrait =
+                        
input.getTraitSet().getTrait(MiniBatchIntervalTraitDef.INSTANCE());
+                if (originTrait != updatedTrait) {
+                    // calculate new MiniBatchIntervalTrait according parent's 
miniBatchInterval
+                    // and the child's original miniBatchInterval.
+                    MiniBatchInterval mergedMiniBatchInterval =
+                            FlinkRelOptUtil.mergeMiniBatchInterval(
+                                    originTrait.getMiniBatchInterval(),
+                                    updatedTrait.getMiniBatchInterval());
+                    MiniBatchIntervalTrait inferredTrait =
+                            new 
MiniBatchIntervalTrait(mergedMiniBatchInterval);
+                    updatedInputs.add(
+                            
input.copy(input.getTraitSet().plus(inferredTrait), input.getInputs()));
+                } else {
+                    updatedInputs.add(input);
+                }
+            }
+        }
+        return updatedInputs;
+    }
+
+    /**
+     * Get all children RelNodes of a RelNode.
+     *
+     * @param parent The parent RelNode
+     * @return All child nodes
+     */
+    private List<RelNode> getInputs(RelNode parent) {
+        return parent.getInputs().stream()
+                .map(i -> ((HepRelVertex) i).getCurrentRel())
+                .collect(Collectors.toList());
+    }
+
+    private boolean shouldAppendMiniBatchAssignerNode(RelNode node) {
+        final MiniBatchMode mode =
+                node.getTraitSet()
+                        .getTrait(MiniBatchIntervalTraitDef.INSTANCE())
+                        .getMiniBatchInterval()
+                        .getMode();
+        if (node instanceof StreamPhysicalDataStreamScan
+                || node instanceof StreamPhysicalLegacyTableSourceScan
+                || node instanceof StreamPhysicalTableSourceScan) {
+            // append minibatch node if the mode is not NONE and reach a 
source leaf node
+            return mode == MiniBatchMode.RowTime || mode == 
MiniBatchMode.ProcTime;
+        }
+        if (node instanceof StreamPhysicalWatermarkAssigner) {
+            // append minibatch node if it is rowtime mode and the child is 
watermark assigner
+            // TODO: if it is ProcTime mode, we also append a minibatch node 
for now.
+            //  Because the downstream can be a regular aggregate and the 
watermark assigner
+            //  might be redundant. In FLINK-14621, we will remove redundant 
watermark assigner,
+            //  then we can remove the ProcTime condition.
+            return mode == MiniBatchMode.RowTime || mode == 
MiniBatchMode.ProcTime;
+        }
+        // others do not append minibatch node
+        return false;
+    }
+
+    /** Configuration for {@link MiniBatchIntervalInferRule}. */
+    @Value.Immutable(singleton = false)
+    public interface MiniBatchIntervalInferRuleConfig extends RelRule.Config {
+        MiniBatchIntervalInferRule.MiniBatchIntervalInferRuleConfig DEFAULT =
+                
ImmutableMiniBatchIntervalInferRule.MiniBatchIntervalInferRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(b0 -> 
b0.operand(StreamPhysicalRel.class).anyInputs())
+                        .withDescription("MiniBatchIntervalInferRule")
+                        
.as(MiniBatchIntervalInferRule.MiniBatchIntervalInferRuleConfig.class);
+
+        @Override
+        default MiniBatchIntervalInferRule toRule() {
+            return new MiniBatchIntervalInferRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
deleted file mode 100644
index 8d0cb3cb0e0..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, 
MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode}
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDataStreamScan,
 StreamPhysicalGroupWindowAggregate, StreamPhysicalLegacyTableSourceScan, 
StreamPhysicalMiniBatchAssigner, StreamPhysicalRel, 
StreamPhysicalTableSourceScan, StreamPhysicalWatermarkAssigner}
-import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
-import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.plan.hep.HepRelVertex
-import org.apache.calcite.rel.RelNode
-
-import scala.collection.JavaConversions._
-
-/**
- * Planner rule that infers the mini-batch interval of minibatch asssigner.
- *
- * This rule could handle the following two kinds of operator:
- *   1. supports operators which supports mini-batch and does not require 
watermark, e.g. group
- *      aggregate. In this case, [[StreamPhysicalMiniBatchAssigner]] with 
Protime mode will be
- *      created if not exist, and the interval value will be set as
- *      [[ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY]]. 2. 
supports operators which
- *      requires watermark, e.g. window join, window aggregate. In this case,
- *      [[StreamPhysicalWatermarkAssigner]] already exists, and its 
MiniBatchIntervalTrait will be
- *      updated as the merged intervals from its outputs. Currently, 
mini-batched window aggregate
- *      is not supported, and will be introduced later.
- *
- * NOTES: This rule only supports HepPlanner with TOP_DOWN match order.
- */
-class MiniBatchIntervalInferRule
-  extends RelOptRule(operand(classOf[StreamPhysicalRel], any()), 
"MiniBatchIntervalInferRule") {
-
-  /**
-   * Get all children RelNodes of a RelNode.
-   *
-   * @param parent
-   *   The parent RelNode
-   * @return
-   *   All child nodes
-   */
-  def getInputs(parent: RelNode): Seq[RelNode] = {
-    parent.getInputs.map(_.asInstanceOf[HepRelVertex].getCurrentRel)
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val rel: StreamPhysicalRel = call.rel(0)
-    val miniBatchIntervalTrait = 
rel.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
-    val inputs = getInputs(rel)
-    val tableConfig = unwrapTableConfig(rel)
-    val miniBatchEnabled = 
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
-
-    val updatedTrait = rel match {
-      case _: StreamPhysicalGroupWindowAggregate =>
-        // TODO introduce mini-batch window aggregate later
-        MiniBatchIntervalTrait.NO_MINIBATCH
-
-      case _: StreamPhysicalWatermarkAssigner => MiniBatchIntervalTrait.NONE
-
-      case _: StreamPhysicalMiniBatchAssigner => MiniBatchIntervalTrait.NONE
-
-      case _ =>
-        if (rel.requireWatermark && miniBatchEnabled) {
-          val mergedInterval = FlinkRelOptUtil.mergeMiniBatchInterval(
-            miniBatchIntervalTrait.getMiniBatchInterval,
-            new MiniBatchInterval(0, MiniBatchMode.RowTime))
-          new MiniBatchIntervalTrait(mergedInterval)
-        } else {
-          miniBatchIntervalTrait
-        }
-    }
-
-    // propagate parent's MiniBatchInterval to children.
-    val updatedInputs = inputs.map {
-      input =>
-        // add mini-batch watermark assigner node.
-        if (shouldAppendMiniBatchAssignerNode(input)) {
-          new StreamPhysicalMiniBatchAssigner(
-            input.getCluster,
-            input.getTraitSet,
-            // attach NONE trait for all of the inputs of MiniBatchAssigner,
-            // as they are leaf nodes and don't need to do propagate
-            input.copy(input.getTraitSet.plus(MiniBatchIntervalTrait.NONE), 
input.getInputs)
-          )
-        } else {
-          val originTrait = 
input.getTraitSet.getTrait(MiniBatchIntervalTraitDef.INSTANCE)
-          if (originTrait != updatedTrait) {
-            // calculate new MiniBatchIntervalTrait according parent's 
miniBatchInterval
-            // and the child's original miniBatchInterval.
-            val mergedMiniBatchInterval = 
FlinkRelOptUtil.mergeMiniBatchInterval(
-              originTrait.getMiniBatchInterval,
-              updatedTrait.getMiniBatchInterval)
-            val inferredTrait = new 
MiniBatchIntervalTrait(mergedMiniBatchInterval)
-            input.copy(input.getTraitSet.plus(inferredTrait), input.getInputs)
-          } else {
-            input
-          }
-        }
-    }
-    // update parent if a child was updated
-    if (inputs != updatedInputs) {
-      val newRel = rel.copy(rel.getTraitSet, updatedInputs)
-      call.transformTo(newRel)
-    }
-  }
-
-  private def shouldAppendMiniBatchAssignerNode(node: RelNode): Boolean = {
-    val mode = node.getTraitSet
-      .getTrait(MiniBatchIntervalTraitDef.INSTANCE)
-      .getMiniBatchInterval
-      .getMode
-    node match {
-      case _: StreamPhysicalDataStreamScan | _: 
StreamPhysicalLegacyTableSourceScan |
-          _: StreamPhysicalTableSourceScan =>
-        // append minibatch node if the mode is not NONE and reach a source 
leaf node
-        mode == MiniBatchMode.RowTime || mode == MiniBatchMode.ProcTime
-      case _: StreamPhysicalWatermarkAssigner =>
-        // append minibatch node if it is rowtime mode and the child is 
watermark assigner
-        // TODO: if it is ProcTime mode, we also append a minibatch node for 
now.
-        //  Because the downstream can be a regular aggregate and the 
watermark assigner
-        //  might be redundant. In FLINK-14621, we will remove redundant 
watermark assigner,
-        //  then we can remove the ProcTime condition.
-        mode == MiniBatchMode.RowTime || mode == MiniBatchMode.ProcTime
-      case _ =>
-        // others do not append minibatch node
-        false
-    }
-  }
-}
-
-object MiniBatchIntervalInferRule {
-  val INSTANCE: RelOptRule = new MiniBatchIntervalInferRule
-}

Reply via email to