This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c71c3fabbe [FLINK-39420] Reject temporal joins in batch mode with a 
clear error
1c71c3fabbe is described below

commit 1c71c3fabbe1c753634da890ec74f5ab17d7c13a
Author: Jim Hughes <[email protected]>
AuthorDate: Thu Apr 16 14:02:25 2026 -0400

    [FLINK-39420] Reject temporal joins in batch mode with a clear error
    
    Batch mode only supports lookup joins. General temporal joins
    (event-time or processing-time on non-lookup sources) previously
    left an unresolved LogicalCorrelate in the plan, causing a confusing
    "unexpected correlate variable" error from FlinkDecorrelateProgram.
    
    Add RejectTemporalJoinInBatchRule to the batch EXPAND_PLAN_RULES.
    The lookup join rules fire first and rewrite valid lookup joins;
    any remaining Correlate+Snapshot pattern is unconditionally rejected
    with an actionable message pointing users to lookup joins or
    streaming mode.
    
    Generated-by: Claude Opus 4.6 (1M context) <[email protected]>
---
 .../logical/RejectTemporalJoinInBatchRule.java     | 113 +++++++++++++++++++++
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |   8 +-
 .../plan/batch/sql/join/TemporalJoinTest.scala     |  25 ++++-
 3 files changed, 142 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RejectTemporalJoinInBatchRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RejectTemporalJoinInBatchRule.java
new file mode 100644
index 00000000000..b7411840c67
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RejectTemporalJoinInBatchRule.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalSnapshot;
+import org.immutables.value.Value;
+
+/**
+ * Rules that reject temporal table joins ({@code FOR SYSTEM_TIME AS OF}) in 
batch mode with a clear
+ * error message.
+ *
+ * <p>In the batch {@code EXPAND_PLAN_RULES}, lookup join rules run first and 
rewrite valid lookup
+ * joins (processing-time + {@code LookupTableSource}) into {@code 
TemporalJoin} nodes. Any
+ * remaining {@link LogicalCorrelate} + {@link LogicalSnapshot} pattern 
therefore represents an
+ * unsupported temporal join. These rules catch it and throw a {@link 
TableException} rather than
+ * letting the correlate survive into {@code FlinkDecorrelateProgram}, where 
it would cause a
+ * confusing "unexpected correlate variable" internal error.
+ */
[email protected]
+public class RejectTemporalJoinInBatchRule
+        extends 
RelRule<RejectTemporalJoinInBatchRule.RejectTemporalJoinInBatchRuleConfig> {
+
+    private static final String MESSAGE =
+            "Temporal joins (FOR SYSTEM_TIME AS OF) on regular tables are not 
supported in "
+                    + "batch mode. Use a lookup join or switch to streaming 
mode.";
+
+    /**
+     * Matches temporal joins where the right side of the Correlate is a 
Filter wrapping a Snapshot
+     * (non-trivial join condition).
+     */
+    public static final RejectTemporalJoinInBatchRule WITH_FILTER =
+            RejectTemporalJoinInBatchRuleConfig.WITH_FILTER.toRule();
+
+    /**
+     * Matches temporal joins where the right side of the Correlate is a 
Snapshot directly (trivial
+     * join condition).
+     */
+    public static final RejectTemporalJoinInBatchRule WITHOUT_FILTER =
+            RejectTemporalJoinInBatchRuleConfig.WITHOUT_FILTER.toRule();
+
+    private RejectTemporalJoinInBatchRule(RejectTemporalJoinInBatchRuleConfig 
config) {
+        super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        throw new TableException(MESSAGE);
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface RejectTemporalJoinInBatchRuleConfig extends 
RelRule.Config {
+
+        RejectTemporalJoinInBatchRuleConfig WITH_FILTER =
+                
ImmutableRejectTemporalJoinInBatchRule.RejectTemporalJoinInBatchRuleConfig.builder()
+                        .operandSupplier(
+                                b0 ->
+                                        b0.operand(LogicalCorrelate.class)
+                                                .inputs(
+                                                        b1 -> 
b1.operand(RelNode.class).anyInputs(),
+                                                        b2 ->
+                                                                
b2.operand(LogicalFilter.class)
+                                                                        
.oneInput(
+                                                                               
 b3 ->
+                                                                               
         b3.operand(
+                                                                               
                         LogicalSnapshot
+                                                                               
                                 .class)
+                                                                               
                 .anyInputs())))
+                        
.description("RejectTemporalJoinInBatchRule_WithFilter")
+                        .build();
+
+        RejectTemporalJoinInBatchRuleConfig WITHOUT_FILTER =
+                
ImmutableRejectTemporalJoinInBatchRule.RejectTemporalJoinInBatchRuleConfig.builder()
+                        .operandSupplier(
+                                b0 ->
+                                        b0.operand(LogicalCorrelate.class)
+                                                .inputs(
+                                                        b1 -> 
b1.operand(RelNode.class).anyInputs(),
+                                                        b2 ->
+                                                                
b2.operand(LogicalSnapshot.class)
+                                                                        
.anyInputs()))
+                        
.description("RejectTemporalJoinInBatchRule_WithoutFilter")
+                        .build();
+
+        @Override
+        default RejectTemporalJoinInBatchRule toRule() {
+            return new RejectTemporalJoinInBatchRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 6cd452810e6..7ff8a16a70e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -50,10 +50,16 @@ object FlinkBatchRuleSets {
   /**
    * Expand plan by replacing references to tables into a proper plan sub 
trees. Those rules can
    * create new plan nodes.
+   *
+   * The lookup join rules run first and rewrite supported temporal joins. The 
rejection rules then
+   * catch any remaining temporal joins (unsupported in batch) with a clear 
error message.
    */
   val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
     LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITH_FILTER,
-    LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITHOUT_FILTER)
+    LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITHOUT_FILTER,
+    RejectTemporalJoinInBatchRule.WITH_FILTER,
+    RejectTemporalJoinInBatchRule.WITHOUT_FILTER
+  )
 
   val POST_EXPAND_CLEAN_UP_RULES: RuleSet = 
RuleSets.ofList(EnumerableToLogicalTableScan.INSTANCE)
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/TemporalJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/TemporalJoinTest.scala
index 89292aea47a..dd65b2bfc81 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/TemporalJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/TemporalJoinTest.scala
@@ -24,9 +24,11 @@ import 
org.assertj.core.api.Assertions.assertThatExceptionOfType
 import org.junit.jupiter.api.{BeforeEach, Test}
 
 /**
- * Test temporal join in batch mode.
+ * Tests that temporal joins on non-lookup tables are rejected in batch mode 
with a clear error.
  *
- * <p> Flink only supports lookup join in batch mode, the others Temporal join 
is not supported yet.
+ * <p>Batch mode only supports lookup joins. General temporal joins 
(event-time or processing-time
+ * on non-lookup sources) are caught by [[RejectTemporalJoinInBatchRule]] 
during the
+ * TEMPORAL_JOIN_REWRITE phase.
  */
 class TemporalJoinTest extends TableTestBase {
 
@@ -105,6 +107,10 @@ class TemporalJoinTest extends TableTestBase {
         "GROUP BY currency ")
   }
 
+  private val expectedMessage =
+    "Temporal joins (FOR SYSTEM_TIME AS OF) on regular tables are not 
supported in " +
+      "batch mode. Use a lookup join or switch to streaming mode."
+
   @Test
   def testSimpleJoin(): Unit = {
     val sqlQuery = "SELECT " +
@@ -115,6 +121,7 @@ class TemporalJoinTest extends TableTestBase {
 
     assertThatExceptionOfType(classOf[TableException])
       .isThrownBy(() => util.verifyExecPlan(sqlQuery))
+      .withMessage(expectedMessage)
   }
 
   @Test
@@ -128,6 +135,7 @@ class TemporalJoinTest extends TableTestBase {
 
     assertThatExceptionOfType(classOf[TableException])
       .isThrownBy(() => util.verifyExecPlan(sqlQuery))
+      .withMessage(expectedMessage)
   }
 
   @Test
@@ -141,11 +149,21 @@ class TemporalJoinTest extends TableTestBase {
 
     assertThatExceptionOfType(classOf[TableException])
       .isThrownBy(() => util.verifyExecPlan(sqlQuery))
+      .withMessage(expectedMessage)
   }
 
   @Test
-  def testSimpleViewProcTimeJoin(): Unit = {
+  def testSimpleJoinOnTrue(): Unit = {
+    val sqlQuery = "SELECT o_amount FROM Orders AS o JOIN " +
+      "RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.o_rowtime AS r ON TRUE"
+
+    assertThatExceptionOfType(classOf[TableException])
+      .isThrownBy(() => util.verifyExecPlan(sqlQuery))
+      .withMessage(expectedMessage)
+  }
 
+  @Test
+  def testSimpleViewProcTimeJoin(): Unit = {
     val sqlQuery = "SELECT " +
       "o_amount * rate as rate " +
       "FROM Orders AS o JOIN " +
@@ -155,5 +173,6 @@ class TemporalJoinTest extends TableTestBase {
 
     assertThatExceptionOfType(classOf[TableException])
       .isThrownBy(() => util.verifyExecPlan(sqlQuery))
+      .withMessage(expectedMessage)
   }
 }

Reply via email to