[ 
https://issues.apache.org/jira/browse/FLINK-38379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18021150#comment-18021150
 ] 

Gustavo de Morais commented on FLINK-38379:
-------------------------------------------

This can be reproduced with the following Nexmark benchmark query, which the fix will cover:


{code:java}
public static final TableTestProgram MULTI_JOIN_NEXMARK_AUCTION_BID_AGGREGATION =
        TableTestProgram.of(
                        "nexmark-auction-bid-aggregation",
                        "Nexmark auction and bid aggregation with time-based filtering")
                .setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true)
                .setupTableSource(
                        SourceTestStep.newBuilder("auctions")
                                .addSchema(
                                        "id BIGINT PRIMARY KEY NOT ENFORCED",
                                        "category BIGINT",
                                        "auctionTimestamp STRING",
                                        "expiresTimestamp STRING",
                                        "auctionDateTime AS TO_TIMESTAMP(auctionTimestamp)",
                                        "expires AS TO_TIMESTAMP(expiresTimestamp)",
                                        "WATERMARK FOR auctionDateTime AS auctionDateTime - INTERVAL '1' SECOND")
                                .addOption("changelog-mode", "I")
                                .producedValues(
                                        Row.ofKind(
                                                RowKind.INSERT,
                                                1L,
                                                1L,
                                                "2024-01-01 12:00:00",
                                                "2024-01-01 12:30:00"),
                                        Row.ofKind(
                                                RowKind.INSERT,
                                                2L,
                                                2L,
                                                "2024-01-01 12:05:00",
                                                "2024-01-01 12:35:00"))
                                .build())
                .setupTableSource(
                        SourceTestStep.newBuilder("bids")
                                .addSchema(
                                        "auction BIGINT",
                                        "price DOUBLE",
                                        "bidTimestamp STRING",
                                        "bidDateTime AS TO_TIMESTAMP(bidTimestamp)",
                                        "WATERMARK FOR bidDateTime AS bidDateTime - INTERVAL '1' SECOND")
                                .addOption("changelog-mode", "I")
                                .producedValues(
                                        Row.ofKind(
                                                RowKind.INSERT,
                                                1L,
                                                12.0,
                                                "2024-01-01 12:15:00"),
                                        Row.ofKind(
                                                RowKind.INSERT,
                                                1L,
                                                15.0,
                                                "2024-01-01 12:20:00"),
                                        Row.ofKind(
                                                RowKind.INSERT,
                                                2L,
                                                25.0,
                                                "2024-01-01 12:25:00"),
                                        // This bid arrives after auction 1 expires (12:30:00),
                                        // so the BETWEEN filter drops it.
                                        Row.ofKind(
                                                RowKind.INSERT,
                                                1L,
                                                20.0,
                                                "2024-01-01 12:45:00"))
                                .build())
                .setupTableSink(
                        SinkTestStep.newBuilder("sink")
                                .addSchema("category BIGINT", "avg_final DOUBLE")
                                .consumedValues(
                                        "+I[1, 15.0]", // Category 1: MAX(12.0, 15.0) = 15.0, one auction
                                        "+I[2, 25.0]") // Category 2: single bid of 25.0, one auction
                                .testMaterializedData()
                                .build())
                .runSql(
                        "INSERT INTO sink "
                                + "SELECT Q.category, AVG(Q.final) "
                                + "FROM ( "
                                + "  SELECT MAX(B.price) AS final, A.category "
                                + "  FROM auctions A, bids B "
                                + "  WHERE A.id = B.auction "
                                + "    AND B.bidDateTime BETWEEN A.auctionDateTime AND A.expires "
                                + "  GROUP BY A.id, A.category "
                                + ") Q "
                                + "GROUP BY Q.category")
                .build();
{code}
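
As a cross-check of the expected sink rows, the query's semantics can be replayed in plain Java without Flink. This is only a sketch: the class `ExpectedSinkValuesSketch`, its records, and its helper methods are hypothetical names invented for illustration (records need Java 16 or later), and it ignores watermarks and changelog handling entirely.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpectedSinkValuesSketch {

    // Hypothetical plain-Java mirrors of the two source tables.
    record Auction(long id, long category, LocalDateTime start, LocalDateTime expires) {}

    record Bid(long auction, double price, LocalDateTime time) {}

    static LocalDateTime ts(String s) {
        // "2024-01-01 12:00:00" -> ISO form "2024-01-01T12:00:00"
        return LocalDateTime.parse(s.replace(' ', 'T'));
    }

    public static Map<Long, Double> avgFinalPerCategory(List<Auction> auctions, List<Bid> bids) {
        Map<Long, List<Double>> finalsPerCategory = new TreeMap<>();
        for (Auction a : auctions) {
            // Inner query: MAX(price) over the auction's bids whose time lies in
            // [start, expires]; SQL BETWEEN is inclusive on both ends.
            bids.stream()
                    .filter(b -> b.auction() == a.id())
                    .filter(b -> !b.time().isBefore(a.start()) && !b.time().isAfter(a.expires()))
                    .mapToDouble(Bid::price)
                    .max()
                    .ifPresent(max ->
                            finalsPerCategory
                                    .computeIfAbsent(a.category(), k -> new ArrayList<>())
                                    .add(max));
        }
        // Outer query: AVG(final) per category.
        Map<Long, Double> result = new TreeMap<>();
        finalsPerCategory.forEach((category, finals) ->
                result.put(
                        category,
                        finals.stream().mapToDouble(Double::doubleValue).average().orElseThrow()));
        return result;
    }

    // Same rows as the test program above.
    public static Map<Long, Double> sampleResult() {
        List<Auction> auctions = List.of(
                new Auction(1L, 1L, ts("2024-01-01 12:00:00"), ts("2024-01-01 12:30:00")),
                new Auction(2L, 2L, ts("2024-01-01 12:05:00"), ts("2024-01-01 12:35:00")));
        List<Bid> bids = List.of(
                new Bid(1L, 12.0, ts("2024-01-01 12:15:00")),
                new Bid(1L, 15.0, ts("2024-01-01 12:20:00")),
                new Bid(2L, 25.0, ts("2024-01-01 12:25:00")),
                new Bid(1L, 20.0, ts("2024-01-01 12:45:00")));
        return avgFinalPerCategory(auctions, bids);
    }

    public static void main(String[] args) {
        System.out.println(sampleResult()); // prints {1=15.0, 2=25.0}
    }
}
```

The 20.0 bid on auction 1 is excluded because 12:45:00 is later than the auction's expiry at 12:30:00, which is why category 1 ends at 15.0 rather than 20.0.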

> Adjust visitMultiJoin in RelTimeIndicatorConverter to use fields and not rows
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-38379
>                 URL: https://issues.apache.org/jira/browse/FLINK-38379
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.1.0
>            Reporter: Gustavo de Morais
>            Assignee: Gustavo de Morais
>            Priority: Major
>             Fix For: 2.2.0
>
>
> There's an issue when materializing time attributes for MultiJoin: we were 
> using the row type of each input instead of the combined list of joined 
> field types. The solution is simple: we need to initialize the materializer 
> with a list of all fields from all inputs, not just the rows from each one.
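
The difference between the two lists can be pictured with a small stand-alone sketch. It is not the code in RelTimeIndicatorConverter: the class `CombinedFieldTypesSketch`, its methods, and the use of plain strings as simplified type labels are all assumptions made for illustration. The point is that a field reference in a join condition is an index into the concatenated fields of all inputs, so a list with one row entry per input cannot be indexed that way.

```java
import java.util.ArrayList;
import java.util.List;

public class CombinedFieldTypesSketch {

    // Per-input view: one entry per join input (the whole row), so the list
    // has only as many entries as there are inputs.
    public static List<String> rowTypesPerInput(List<List<String>> inputs) {
        List<String> rows = new ArrayList<>();
        for (List<String> input : inputs) {
            rows.add("ROW(" + String.join(", ", input) + ")");
        }
        return rows;
    }

    // Combined view: one entry per field, concatenated in input order, so a
    // field index from the join output resolves to exactly one field type.
    public static List<String> combinedFieldTypes(List<List<String>> inputs) {
        List<String> fields = new ArrayList<>();
        for (List<String> input : inputs) {
            fields.addAll(input);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Simplified type labels for the two tables of the test program (assumed).
        List<String> auctions = List.of(
                "BIGINT", "BIGINT", "STRING", "STRING",
                "TIMESTAMP(3) *ROWTIME*", "TIMESTAMP(3)");
        List<String> bids = List.of(
                "BIGINT", "DOUBLE", "STRING", "TIMESTAMP(3) *ROWTIME*");
        List<List<String>> inputs = List.of(auctions, bids);

        int bidDateTimeIndex = auctions.size() + 3; // 4th field of the 2nd input

        System.out.println(rowTypesPerInput(inputs).size());   // prints 2
        System.out.println(combinedFieldTypes(inputs).size()); // prints 10
        System.out.println(combinedFieldTypes(inputs).get(bidDateTimeIndex));
        // prints TIMESTAMP(3) *ROWTIME*
    }
}
```

With the per-input list, index 9 is out of range because there are only two entries. With the combined list, it lands on the time attribute of `bids`.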



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
