This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e0a9d904074055c1e5ca9089d5dfda3f371def0
Author: Gustavo de Morais <[email protected]>
AuthorDate: Fri Sep 19 15:33:41 2025 +0100

    [FLINK-38379][table] Adjust `visitMultiJoin` in `RelTimeIndicatorConverter` 
to use fields and not rows
---
 .claude/CLAUDE.local.md                            |  8 ++
 .../planner/calcite/RelTimeIndicatorConverter.java |  4 +-
 .../nodes/exec/stream/MultiJoinSemanticTests.java  |  4 +-
 .../nodes/exec/stream/MultiJoinTestPrograms.java   | 97 ++++++++++++++++++++++
 4 files changed, 111 insertions(+), 2 deletions(-)

diff --git a/.claude/CLAUDE.local.md b/.claude/CLAUDE.local.md
new file mode 100644
index 00000000000..8d9ff2b2c09
--- /dev/null
+++ b/.claude/CLAUDE.local.md
@@ -0,0 +1,8 @@
+- You're a respected open source apache flink maintainer.
+
+- Test your changes with one appropriate existing test using this format just 
to check if we have no compilation errors "cd /Users/gdemorais/qdev/flink2 && 
./mvnw test -Dtest="MultiJoinTest#testTwoWayJoinWithUnion" -pl 
flink-table/flink-table-planner -q -Dcheckstyle.skip -Drat.skip 
-Dscalastyle.skip -Denforcer.skip=true -Pgenerate-config-docs 
-Dspotless.check.skip=true"
+- If you cannot run your tests because they get stuck downloading 
dependencies. You can run "./mvnw clean install -T1C -DskipTests -Pfast 
-Dcheckstyle.skip -Drat.skip -Dscalastyle.skip -Denforcer.skip=true 
-Pgenerate-config-docs -Dspotless.check.skip=true -DskipITs=true 
-Dmaven.javadoc.skip=true -Djapicmp.skip=true -Pskip-webui-build -T4 -fn" to 
build first. Just do this once and if necessary because it takes over 5 minutes!
+- If you have to install only one module, also use all the available flags to 
speed up the process. E.g. "./mvnw install -pl flink-table/flink-table-common 
-T1C -DskipTests -Pfast -Dcheckstyle.skip -Drat.skip -Dscalastyle.skip 
-Denforcer.skip=true -Pgenerate-config-docs -Dspotless.check.skip=true 
-DskipITs=true -Dmaven.javadoc.skip=true -Djapicmp.skip=true -Pskip-webui-build 
-T4"
+
+- We always want to keep the things we do in "eval" for functions and in the 
processRecord methods in operators to a minimum since this is a hot path for 
flink which is executed millions of times. Always do what you can in the 
constructor, intialize things else where. 
+    
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
index cffe954de32..305296e5aff 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
@@ -580,7 +580,9 @@ public final class RelTimeIndicatorConverter extends 
RelHomogeneousShuttle {
                         .collect(Collectors.toList());
 
         final List<RelDataType> allFields =
-                
newInputs.stream().map(RelNode::getRowType).collect(Collectors.toList());
+                newInputs.stream()
+                        .flatMap(input -> 
RelOptUtil.getFieldTypeList(input.getRowType()).stream())
+                        .collect(Collectors.toList());
 
         RexTimeIndicatorMaterializer materializer = new 
RexTimeIndicatorMaterializer(allFields);
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
index 42db5806102..fb24e92e1d7 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
@@ -40,6 +40,8 @@ public class MultiJoinSemanticTests extends SemanticTestBase {
                 
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_CTE,
                 MultiJoinTestPrograms.MULTI_JOIN_LEFT_OUTER_WITH_NULL_KEYS,
                 MultiJoinTestPrograms.MULTI_JOIN_NULL_SAFE_JOIN_WITH_NULL_KEYS,
-                MultiJoinTestPrograms.MULTI_JOIN_MIXED_CHANGELOG_MODES);
+                MultiJoinTestPrograms.MULTI_JOIN_MIXED_CHANGELOG_MODES,
+                MultiJoinTestPrograms
+                        
.MULTI_JOIN_WITH_TIME_ATTRIBUTES_IN_CONDITIONS_MATERIALIZATION);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
index ffc47c62a20..3e8e27e9c30 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
@@ -1102,4 +1102,101 @@ public class MultiJoinTestPrograms {
                                     + "LEFT JOIN RetractTable r ON a.id = 
r.ref_id "
                                     + "LEFT JOIN UpsertTable u ON a.id = 
u.key_id")
                     .build();
+
+    public static final TableTestProgram
+            MULTI_JOIN_WITH_TIME_ATTRIBUTES_IN_CONDITIONS_MATERIALIZATION =
+                    TableTestProgram.of(
+                                    
"three-way-join-with-time-attributes-in-join-conditions",
+                                    "A query from the nexmark benchmark: "
+                                            + "auction and bid aggregation 
with time-based filtering")
+                            .setupTableSource(
+                                    SourceTestStep.newBuilder("auctions")
+                                            .addSchema(
+                                                    "id BIGINT PRIMARY KEY NOT 
ENFORCED",
+                                                    "category STRING",
+                                                    "auctionTimestamp STRING",
+                                                    "expiresTimestamp STRING",
+                                                    "auctionDateTime AS 
TO_TIMESTAMP(auctionTimestamp)",
+                                                    "expires AS 
TO_TIMESTAMP(expiresTimestamp)",
+                                                    "WATERMARK FOR 
auctionDateTime AS auctionDateTime - INTERVAL '1' SECOND")
+                                            .addOption("changelog-mode", "I")
+                                            .producedValues(
+                                                    Row.ofKind(
+                                                            RowKind.INSERT,
+                                                            1L,
+                                                            "Electronics",
+                                                            "2024-01-01 
12:00:00",
+                                                            "2024-01-01 
12:30:00"),
+                                                    Row.ofKind(
+                                                            RowKind.INSERT,
+                                                            3L,
+                                                            "Electronics",
+                                                            "2024-01-01 
12:10:00",
+                                                            "2024-01-01 
12:40:00"),
+                                                    Row.ofKind(
+                                                            RowKind.INSERT,
+                                                            2L,
+                                                            "Books",
+                                                            "2024-01-01 
12:05:00",
+                                                            "2024-01-01 
12:35:00"))
+                                            .build())
+                            .setupTableSource(
+                                    SourceTestStep.newBuilder("bids")
+                                            .addSchema(
+                                                    "auction BIGINT",
+                                                    "price DOUBLE",
+                                                    "bidTimestamp STRING",
+                                                    "bidDateTime AS 
TO_TIMESTAMP(bidTimestamp)",
+                                                    "WATERMARK FOR bidDateTime 
AS bidDateTime - INTERVAL '1' SECOND")
+                                            .addOption("changelog-mode", "I")
+                                            .producedValues(
+                                                    Row.ofKind(
+                                                            RowKind.INSERT,
+                                                            1L,
+                                                            12.0,
+                                                            "2024-01-01 
12:15:00"),
+                                                    Row.ofKind(
+                                                            RowKind.INSERT,
+                                                            1L,
+                                                            15.0,
+                                                            "2024-01-01 
12:20:00"),
+                                                    Row.ofKind(
+                                                            RowKind.INSERT,
+                                                            2L,
+                                                            25.0,
+                                                            "2024-01-01 
12:25:00"),
+                                                    Row.ofKind(
+                                                            RowKind.INSERT,
+                                                            3L,
+                                                            18.0,
+                                                            "2024-01-01 
12:30:00"),
+                                                    Row.ofKind(
+                                                            RowKind.INSERT,
+                                                            1L,
+                                                            20.0,
+                                                            "2024-01-01 
12:45:00"))
+                                            .build())
+                            .setupTableSink(
+                                    SinkTestStep.newBuilder("sink")
+                                            .addSchema("category STRING", 
"avg_final DOUBLE")
+                                            .consumedValues(
+                                                    // Electronics: 
AVG(MAX(12.0, 15.0), MAX(18.0))
+                                                    // =
+                                                    // AVG(15.0, 18.0) = 16.5
+                                                    "+I[Electronics, 16.5]",
+                                                    // Books: MAX(25.0) = 
25.0, AVG(25.0) = 25.0
+                                                    "+I[Books, 25.0]")
+                                            .testMaterializedData()
+                                            .build())
+                            .runSql(
+                                    "INSERT INTO sink "
+                                            + "SELECT Q.category, AVG(Q.final) 
"
+                                            + "FROM ( "
+                                            + "    SELECT MAX(B.price) AS 
final, A.category "
+                                            + "    FROM auctions A, bids B "
+                                            + "    WHERE A.id = B.auction AND 
B.bidDateTime BETWEEN A.auctionDateTime AND A.expires "
+                                            + "    GROUP BY A.id, A.category "
+                                            + ") Q "
+                                            + "GROUP BY Q.category")
+                            .build();
 }

Reply via email to