This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1c71c3fabbe [FLINK-39420] Reject temporal joins in batch mode with a
clear error
1c71c3fabbe is described below
commit 1c71c3fabbe1c753634da890ec74f5ab17d7c13a
Author: Jim Hughes <[email protected]>
AuthorDate: Thu Apr 16 14:02:25 2026 -0400
[FLINK-39420] Reject temporal joins in batch mode with a clear error
Batch mode only supports lookup joins. General temporal joins
(event-time or processing-time on non-lookup sources) previously
left an unresolved LogicalCorrelate in the plan, causing a confusing
"unexpected correlate variable" error from FlinkDecorrelateProgram.
Add RejectTemporalJoinInBatchRule to the batch EXPAND_PLAN_RULES.
The lookup join rules fire first and rewrite valid lookup joins;
any remaining Correlate+Snapshot pattern is unconditionally rejected
with an actionable message pointing users to lookup joins or
streaming mode.
Generated-by: Claude Opus 4.6 (1M context) <[email protected]>
---
.../logical/RejectTemporalJoinInBatchRule.java | 113 +++++++++++++++++++++
.../planner/plan/rules/FlinkBatchRuleSets.scala | 8 +-
.../plan/batch/sql/join/TemporalJoinTest.scala | 25 ++++-
3 files changed, 142 insertions(+), 4 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RejectTemporalJoinInBatchRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RejectTemporalJoinInBatchRule.java
new file mode 100644
index 00000000000..b7411840c67
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RejectTemporalJoinInBatchRule.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalSnapshot;
+import org.immutables.value.Value;
+
+/**
+ * Rules that reject temporal table joins ({@code FOR SYSTEM_TIME AS OF}) in batch mode with a clear
+ * error message.
+ *
+ * <p>In the batch {@code EXPAND_PLAN_RULES}, lookup join rules run first and rewrite valid lookup
+ * joins (processing-time + {@code LookupTableSource}) into {@code TemporalJoin} nodes. Any
+ * remaining {@link LogicalCorrelate} + {@link LogicalSnapshot} pattern therefore represents an
+ * unsupported temporal join. These rules catch it and throw a {@link TableException} rather than
+ * letting the correlate survive into {@code FlinkDecorrelateProgram}, where it would cause a
+ * confusing "unexpected correlate variable" internal error.
+ */
+@Value.Enclosing
+public class RejectTemporalJoinInBatchRule
+ extends RelRule<RejectTemporalJoinInBatchRule.RejectTemporalJoinInBatchRuleConfig> {
+
+ private static final String MESSAGE =
+ "Temporal joins (FOR SYSTEM_TIME AS OF) on regular tables are not supported in "
+ + "batch mode. Use a lookup join or switch to streaming mode.";
+
+ /**
+ * Matches temporal joins where the right side of the Correlate is a Filter wrapping a Snapshot
+ * (non-trivial join condition).
+ */
+ public static final RejectTemporalJoinInBatchRule WITH_FILTER =
+ RejectTemporalJoinInBatchRuleConfig.WITH_FILTER.toRule();
+
+ /**
+ * Matches temporal joins where the right side of the Correlate is a Snapshot directly (trivial
+ * join condition).
+ */
+ public static final RejectTemporalJoinInBatchRule WITHOUT_FILTER =
+ RejectTemporalJoinInBatchRuleConfig.WITHOUT_FILTER.toRule();
+
+ private RejectTemporalJoinInBatchRule(RejectTemporalJoinInBatchRuleConfig config) {
+ super(config);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ throw new TableException(MESSAGE);
+ }
+
+ /** Rule configuration. */
+ @Value.Immutable(singleton = false)
+ public interface RejectTemporalJoinInBatchRuleConfig extends RelRule.Config {
+
+ RejectTemporalJoinInBatchRuleConfig WITH_FILTER =
+ ImmutableRejectTemporalJoinInBatchRule.RejectTemporalJoinInBatchRuleConfig.builder()
+ .operandSupplier(
+ b0 ->
+ b0.operand(LogicalCorrelate.class)
+ .inputs(
+ b1 -> b1.operand(RelNode.class).anyInputs(),
+ b2 ->
+ b2.operand(LogicalFilter.class)
+ .oneInput(
+ b3 ->
+ b3.operand(
+ LogicalSnapshot
+ .class)
+ .anyInputs())))
+ .description("RejectTemporalJoinInBatchRule_WithFilter")
+ .build();
+
+ RejectTemporalJoinInBatchRuleConfig WITHOUT_FILTER =
+ ImmutableRejectTemporalJoinInBatchRule.RejectTemporalJoinInBatchRuleConfig.builder()
+ .operandSupplier(
+ b0 ->
+ b0.operand(LogicalCorrelate.class)
+ .inputs(
+ b1 -> b1.operand(RelNode.class).anyInputs(),
+ b2 ->
+ b2.operand(LogicalSnapshot.class)
+ .anyInputs()))
+ .description("RejectTemporalJoinInBatchRule_WithoutFilter")
+ .build();
+
+ @Override
+ default RejectTemporalJoinInBatchRule toRule() {
+ return new RejectTemporalJoinInBatchRule(this);
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 6cd452810e6..7ff8a16a70e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -50,10 +50,16 @@ object FlinkBatchRuleSets {
/**
* Expand plan by replacing references to tables into a proper plan sub trees. Those rules can
* create new plan nodes.
+ *
+ * The lookup join rules run first and rewrite supported temporal joins. The rejection rules then
+ * catch any remaining temporal joins (unsupported in batch) with a clear error message.
*/
val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITH_FILTER,
- LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITHOUT_FILTER)
+ LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITHOUT_FILTER,
+ RejectTemporalJoinInBatchRule.WITH_FILTER,
+ RejectTemporalJoinInBatchRule.WITHOUT_FILTER
+ )
val POST_EXPAND_CLEAN_UP_RULES: RuleSet =
RuleSets.ofList(EnumerableToLogicalTableScan.INSTANCE)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/TemporalJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/TemporalJoinTest.scala
index 89292aea47a..dd65b2bfc81 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/TemporalJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/TemporalJoinTest.scala
@@ -24,9 +24,11 @@ import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.jupiter.api.{BeforeEach, Test}
/**
- * Test temporal join in batch mode.
+ * Tests that temporal joins on non-lookup tables are rejected in batch mode with a clear error.
*
- * <p> Flink only supports lookup join in batch mode, the others Temporal join is not supported yet.
+ * <p>Batch mode only supports lookup joins. General temporal joins (event-time or processing-time
+ * on non-lookup sources) are caught by [[RejectTemporalJoinInBatchRule]] during the
+ * TEMPORAL_JOIN_REWRITE phase.
*/
class TemporalJoinTest extends TableTestBase {
@@ -105,6 +107,10 @@ class TemporalJoinTest extends TableTestBase {
"GROUP BY currency ")
}
+ private val expectedMessage =
+ "Temporal joins (FOR SYSTEM_TIME AS OF) on regular tables are not supported in " +
+ "batch mode. Use a lookup join or switch to streaming mode."
+
@Test
def testSimpleJoin(): Unit = {
val sqlQuery = "SELECT " +
@@ -115,6 +121,7 @@ class TemporalJoinTest extends TableTestBase {
assertThatExceptionOfType(classOf[TableException])
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
+ .withMessage(expectedMessage)
}
@Test
@@ -128,6 +135,7 @@ class TemporalJoinTest extends TableTestBase {
assertThatExceptionOfType(classOf[TableException])
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
+ .withMessage(expectedMessage)
}
@Test
@@ -141,11 +149,21 @@ class TemporalJoinTest extends TableTestBase {
assertThatExceptionOfType(classOf[TableException])
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
+ .withMessage(expectedMessage)
}
@Test
- def testSimpleViewProcTimeJoin(): Unit = {
+ def testSimpleJoinOnTrue(): Unit = {
+ val sqlQuery = "SELECT o_amount FROM Orders AS o JOIN " +
+ "RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.o_rowtime AS r ON TRUE"
+
+ assertThatExceptionOfType(classOf[TableException])
+ .isThrownBy(() => util.verifyExecPlan(sqlQuery))
+ .withMessage(expectedMessage)
+ }
+ @Test
+ def testSimpleViewProcTimeJoin(): Unit = {
val sqlQuery = "SELECT " +
"o_amount * rate as rate " +
"FROM Orders AS o JOIN " +
@@ -155,5 +173,6 @@ class TemporalJoinTest extends TableTestBase {
assertThatExceptionOfType(classOf[TableException])
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
+ .withMessage(expectedMessage)
}
}