This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch release-2.1 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0e0a9d904074055c1e5ca9089d5dfda3f371def0 Author: Gustavo de Morais <[email protected]> AuthorDate: Fri Sep 19 15:33:41 2025 +0100 [FLINK-38379][table] Adjust `visitMultiJoin` in `RelTimeIndicatorConverter` to use fields and not rows --- .claude/CLAUDE.local.md | 8 ++ .../planner/calcite/RelTimeIndicatorConverter.java | 4 +- .../nodes/exec/stream/MultiJoinSemanticTests.java | 4 +- .../nodes/exec/stream/MultiJoinTestPrograms.java | 97 ++++++++++++++++++++++ 4 files changed, 111 insertions(+), 2 deletions(-) diff --git a/.claude/CLAUDE.local.md b/.claude/CLAUDE.local.md new file mode 100644 index 00000000000..8d9ff2b2c09 --- /dev/null +++ b/.claude/CLAUDE.local.md @@ -0,0 +1,8 @@ +- You're a respected open source Apache Flink maintainer. + +- Test your changes with one appropriate existing test using this format just to check if we have no compilation errors "cd /Users/gdemorais/qdev/flink2 && ./mvnw test -Dtest="MultiJoinTest#testTwoWayJoinWithUnion" -pl flink-table/flink-table-planner -q -Dcheckstyle.skip -Drat.skip -Dscalastyle.skip -Denforcer.skip=true -Pgenerate-config-docs -Dspotless.check.skip=true" +- If you cannot run your tests because they get stuck downloading dependencies, you can run "./mvnw clean install -T1C -DskipTests -Pfast -Dcheckstyle.skip -Drat.skip -Dscalastyle.skip -Denforcer.skip=true -Pgenerate-config-docs -Dspotless.check.skip=true -DskipITs=true -Dmaven.javadoc.skip=true -Djapicmp.skip=true -Pskip-webui-build -T4 -fn" to build first. Do this only once, and only if necessary, because it takes over 5 minutes! +- If you have to install only one module, also use all the available flags to speed up the process. E.g. 
"./mvnw install -pl flink-table/flink-table-common -T1C -DskipTests -Pfast -Dcheckstyle.skip -Drat.skip -Dscalastyle.skip -Denforcer.skip=true -Pgenerate-config-docs -Dspotless.check.skip=true -DskipITs=true -Dmaven.javadoc.skip=true -Djapicmp.skip=true -Pskip-webui-build -T4" + +- We always want to keep the things we do in "eval" for functions and in the processRecord methods in operators to a minimum since this is a hot path for flink which is executed millions of times. Always do what you can in the constructor, initialize things elsewhere. + diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java index cffe954de32..305296e5aff 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java @@ -580,7 +580,9 @@ public final class RelTimeIndicatorConverter extends RelHomogeneousShuttle { .collect(Collectors.toList()); final List<RelDataType> allFields = - newInputs.stream().map(RelNode::getRowType).collect(Collectors.toList()); + newInputs.stream() + .flatMap(input -> RelOptUtil.getFieldTypeList(input.getRowType()).stream()) + .collect(Collectors.toList()); RexTimeIndicatorMaterializer materializer = new RexTimeIndicatorMaterializer(allFields); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java index 42db5806102..fb24e92e1d7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java +++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java @@ -40,6 +40,8 @@ public class MultiJoinSemanticTests extends SemanticTestBase { MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_CTE, MultiJoinTestPrograms.MULTI_JOIN_LEFT_OUTER_WITH_NULL_KEYS, MultiJoinTestPrograms.MULTI_JOIN_NULL_SAFE_JOIN_WITH_NULL_KEYS, - MultiJoinTestPrograms.MULTI_JOIN_MIXED_CHANGELOG_MODES); + MultiJoinTestPrograms.MULTI_JOIN_MIXED_CHANGELOG_MODES, + MultiJoinTestPrograms + .MULTI_JOIN_WITH_TIME_ATTRIBUTES_IN_CONDITIONS_MATERIALIZATION); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java index ffc47c62a20..3e8e27e9c30 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java @@ -1102,4 +1102,101 @@ public class MultiJoinTestPrograms { + "LEFT JOIN RetractTable r ON a.id = r.ref_id " + "LEFT JOIN UpsertTable u ON a.id = u.key_id") .build(); + + public static final TableTestProgram + MULTI_JOIN_WITH_TIME_ATTRIBUTES_IN_CONDITIONS_MATERIALIZATION = + TableTestProgram.of( + "three-way-join-with-time-attributes-in-join-conditions", + "A query from the nexmark benchmark: " + + "auction and bid aggregation with time-based filtering") + .setupTableSource( + SourceTestStep.newBuilder("auctions") + .addSchema( + "id BIGINT PRIMARY KEY NOT ENFORCED", + "category STRING", + "auctionTimestamp STRING", + "expiresTimestamp STRING", + "auctionDateTime AS TO_TIMESTAMP(auctionTimestamp)", + "expires AS TO_TIMESTAMP(expiresTimestamp)", + "WATERMARK FOR auctionDateTime AS auctionDateTime - 
INTERVAL '1' SECOND") + .addOption("changelog-mode", "I") + .producedValues( + Row.ofKind( + RowKind.INSERT, + 1L, + "Electronics", + "2024-01-01 12:00:00", + "2024-01-01 12:30:00"), + Row.ofKind( + RowKind.INSERT, + 3L, + "Electronics", + "2024-01-01 12:10:00", + "2024-01-01 12:40:00"), + Row.ofKind( + RowKind.INSERT, + 2L, + "Books", + "2024-01-01 12:05:00", + "2024-01-01 12:35:00")) + .build()) + .setupTableSource( + SourceTestStep.newBuilder("bids") + .addSchema( + "auction BIGINT", + "price DOUBLE", + "bidTimestamp STRING", + "bidDateTime AS TO_TIMESTAMP(bidTimestamp)", + "WATERMARK FOR bidDateTime AS bidDateTime - INTERVAL '1' SECOND") + .addOption("changelog-mode", "I") + .producedValues( + Row.ofKind( + RowKind.INSERT, + 1L, + 12.0, + "2024-01-01 12:15:00"), + Row.ofKind( + RowKind.INSERT, + 1L, + 15.0, + "2024-01-01 12:20:00"), + Row.ofKind( + RowKind.INSERT, + 2L, + 25.0, + "2024-01-01 12:25:00"), + Row.ofKind( + RowKind.INSERT, + 3L, + 18.0, + "2024-01-01 12:30:00"), + Row.ofKind( + RowKind.INSERT, + 1L, + 20.0, + "2024-01-01 12:45:00")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("category STRING", "avg_final DOUBLE") + .consumedValues( + // Electronics: AVG(MAX(12.0, 15.0), MAX(18.0)) + // = + // AVG(15.0, 18.0) = 16.5 + "+I[Electronics, 16.5]", + // Books: MAX(25.0) = 25.0, AVG(25.0) = 25.0 + "+I[Books, 25.0]") + .testMaterializedData() + .build()) + .runSql( + "INSERT INTO sink " + + "SELECT Q.category, AVG(Q.final) " + + "FROM ( " + + " SELECT MAX(B.price) AS final, A.category " + + " FROM auctions A, bids B " + + " WHERE A.id = B.auction AND B.bidDateTime BETWEEN A.auctionDateTime AND A.expires " + + " GROUP BY A.id, A.category " + + ") Q " + + "GROUP BY Q.category") + .build(); }
