[https://issues.apache.org/jira/browse/FLINK-38379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18021150#comment-18021150]
Gustavo de Morais commented on FLINK-38379:
-------------------------------------------
This can be reproduced with the following Nexmark benchmark query, and the fix will cover it:
{code:java}
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Declared as a field of a test-program holder class (enclosing class omitted).
public static final TableTestProgram MULTI_JOIN_NEXMARK_AUCTION_BID_AGGREGATION =
    TableTestProgram.of(
            "nexmark-auction-bid-aggregation",
            "Nexmark auction and bid aggregation with time-based filtering")
        .setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true)
        .setupTableSource(
            SourceTestStep.newBuilder("auctions")
                .addSchema(
                    "id BIGINT PRIMARY KEY NOT ENFORCED",
                    "category BIGINT",
                    "auctionTimestamp STRING",
                    "expiresTimestamp STRING",
                    "auctionDateTime AS TO_TIMESTAMP(auctionTimestamp)",
                    "expires AS TO_TIMESTAMP(expiresTimestamp)",
                    "WATERMARK FOR auctionDateTime AS auctionDateTime - INTERVAL '1' SECOND")
                .addOption("changelog-mode", "I")
                .producedValues(
                    // id, category, auctionTimestamp, expiresTimestamp
                    Row.ofKind(RowKind.INSERT, 1L, 1L, "2024-01-01 12:00:00", "2024-01-01 12:30:00"),
                    Row.ofKind(RowKind.INSERT, 2L, 2L, "2024-01-01 12:05:00", "2024-01-01 12:35:00"))
                .build())
        .setupTableSource(
            SourceTestStep.newBuilder("bids")
                .addSchema(
                    "auction BIGINT",
                    "price DOUBLE",
                    "bidTimestamp STRING",
                    "bidDateTime AS TO_TIMESTAMP(bidTimestamp)",
                    "WATERMARK FOR bidDateTime AS bidDateTime - INTERVAL '1' SECOND")
                .addOption("changelog-mode", "I")
                .producedValues(
                    // auction, price, bidTimestamp
                    Row.ofKind(RowKind.INSERT, 1L, 12.0, "2024-01-01 12:15:00"),
                    Row.ofKind(RowKind.INSERT, 1L, 15.0, "2024-01-01 12:20:00"),
                    Row.ofKind(RowKind.INSERT, 2L, 25.0, "2024-01-01 12:25:00"),
                    Row.ofKind(RowKind.INSERT, 1L, 20.0, "2024-01-01 12:45:00"))
                .build())
        .setupTableSink(
            SinkTestStep.newBuilder("sink")
                .addSchema("category BIGINT", "avg_final DOUBLE")
                // Each category has a single auction, so AVG(final) equals that auction's MAX.
                .consumedValues(
                    // Category 1: MAX(12.0, 15.0) = 15.0; the 12:45 bid is after the 12:30 expiry
                    "+I[1, 15.0]",
                    // Category 2: MAX(25.0) = 25.0
                    "+I[2, 25.0]")
                .testMaterializedData()
                .build())
        .runSql(
            "INSERT INTO sink "
                + "SELECT Q.category, AVG(Q.final) "
                + "FROM ( "
                + " SELECT MAX(B.price) AS final, A.category "
                + " FROM auctions A, bids B "
                + " WHERE A.id = B.auction "
                + " AND B.bidDateTime BETWEEN A.auctionDateTime AND A.expires "
                + " GROUP BY A.id, A.category "
                + ") Q "
                + "GROUP BY Q.category")
        .build();
{code}
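The expected sink rows can be cross-checked by evaluating the query by hand over the test data. Below is a minimal, hypothetical Java sketch (the class ExpectedNexmarkResult is not part of Flink or of the test) that computes only the final materialized result in batch style. It ignores watermarks and intermediate changelog updates, and it assumes TO_TIMESTAMP parses the default yyyy-MM-dd HH:mm:ss format.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpectedNexmarkResult {

    // Assumed to match TO_TIMESTAMP's default input format.
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static final class Auction {
        final long id;
        final long category;
        final LocalDateTime start;
        final LocalDateTime expires;

        Auction(long id, long category, String start, String expires) {
            this.id = id;
            this.category = category;
            this.start = LocalDateTime.parse(start, FMT);
            this.expires = LocalDateTime.parse(expires, FMT);
        }
    }

    static final class Bid {
        final long auction;
        final double price;
        final LocalDateTime time;

        Bid(long auction, double price, String time) {
            this.auction = auction;
            this.price = price;
            this.time = LocalDateTime.parse(time, FMT);
        }
    }

    /** Evaluates the test query over the test's source rows and formats rows like the sink. */
    static List<String> expectedSinkRows() {
        List<Auction> auctions = List.of(
                new Auction(1L, 1L, "2024-01-01 12:00:00", "2024-01-01 12:30:00"),
                new Auction(2L, 2L, "2024-01-01 12:05:00", "2024-01-01 12:35:00"));
        List<Bid> bids = List.of(
                new Bid(1L, 12.0, "2024-01-01 12:15:00"),
                new Bid(1L, 15.0, "2024-01-01 12:20:00"),
                new Bid(2L, 25.0, "2024-01-01 12:25:00"),
                new Bid(1L, 20.0, "2024-01-01 12:45:00"));

        // Inner query: join on A.id = B.auction, keep bids with
        // auctionDateTime <= bidDateTime <= expires (BETWEEN is inclusive),
        // then MAX(price) per auction; the auction id determines its category.
        Map<Long, Double> maxPerAuction = new HashMap<>();
        Map<Long, Long> categoryOf = new HashMap<>();
        for (Auction a : auctions) {
            for (Bid b : bids) {
                if (b.auction == a.id && !b.time.isBefore(a.start) && !b.time.isAfter(a.expires)) {
                    maxPerAuction.merge(a.id, b.price, Math::max);
                    categoryOf.put(a.id, a.category);
                }
            }
        }

        // Outer query: AVG(final) per category, accumulated as {sum, count}.
        Map<Long, double[]> acc = new TreeMap<>();
        for (Map.Entry<Long, Double> e : maxPerAuction.entrySet()) {
            double[] sumCount = acc.computeIfAbsent(categoryOf.get(e.getKey()), k -> new double[2]);
            sumCount[0] += e.getValue();
            sumCount[1] += 1;
        }

        List<String> rows = new ArrayList<>();
        for (Map.Entry<Long, double[]> e : acc.entrySet()) {
            rows.add("+I[" + e.getKey() + ", " + (e.getValue()[0] / e.getValue()[1]) + "]");
        }
        return rows;
    }

    public static void main(String[] args) {
        expectedSinkRows().forEach(System.out::println);
    }
}
```

Running main prints the two rows listed in consumedValues. The only bid that gets filtered out is the 12:45 bid on auction 1, because it falls after that auction's 12:30 expiry.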
> Adjust visitMultiJoin in RelTimeIndicatorConverter to use fields and not rows
> -----------------------------------------------------------------------------
>
> Key: FLINK-38379
> URL: https://issues.apache.org/jira/browse/FLINK-38379
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.1.0
> Reporter: Gustavo de Morais
> Assignee: Gustavo de Morais
> Priority: Major
> Fix For: 2.2.0
>
>
> There's an issue when materializing time attributes for MultiJoin: we were
> using each input's row type instead of the combined list of field types
> across all joined inputs. The fix is straightforward: initialize the
> materializer with the list of all fields from all inputs, rather than with
> one row type per input.
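A simplified illustration of the difference follows. It assumes that the MultiJoin's output row is the concatenation of its inputs' fields in input order and that no fields are pruned. This is not the RelTimeIndicatorConverter code: MultiJoinFieldIndexSketch and its helpers are hypothetical, and field types are plain strings instead of Calcite RelDataType objects. For the two inputs in the reproducer, a list with one entry per input has two elements. The combined field list has ten, with the rowtime attributes auctionDateTime and bidDateTime at positions 4 and 9.

```java
import java.util.ArrayList;
import java.util.List;

public class MultiJoinFieldIndexSketch {

    // Hypothetical, simplified stand-in for row types: each input is an ordered
    // list of field type names (the real planner uses Calcite RelDataType).
    static List<List<String>> sampleInputs() {
        List<String> auctions = List.of(
                "BIGINT", "BIGINT", "STRING", "STRING",
                "TIMESTAMP(3) *ROWTIME*", // auctionDateTime
                "TIMESTAMP(3)");          // expires
        List<String> bids = List.of(
                "BIGINT", "DOUBLE", "STRING",
                "TIMESTAMP(3) *ROWTIME*"); // bidDateTime
        return List.of(auctions, bids);
    }

    /** Concatenates the inputs' fields in input order, assumed to match the MultiJoin output layout. */
    static List<String> combinedFieldTypes(List<List<String>> inputRowTypes) {
        List<String> all = new ArrayList<>();
        for (List<String> rowType : inputRowTypes) {
            all.addAll(rowType);
        }
        return all;
    }

    /** Positions of rowtime fields, i.e. the indices a materializer would need to resolve. */
    static List<Integer> rowtimeIndices(List<String> fields) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < fields.size(); i++) {
            if (fields.get(i).endsWith("*ROWTIME*")) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> perInput = sampleInputs();
        // One entry per input: only indices 0 and 1 exist, so a field index such as 9 has no match.
        System.out.println("entries in per-input list: " + perInput.size());
        // One entry per field across all inputs: every output field index resolves.
        List<String> fields = combinedFieldTypes(perInput);
        System.out.println("entries in combined list: " + fields.size());
        System.out.println("rowtime indices: " + rowtimeIndices(fields));
    }
}
```

Running main prints 2, 10, and [4, 9]. Only the combined list has an entry for every field index that the join's output can reference.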
--
This message was sent by Atlassian Jira
(v8.20.10#820010)