gustavodemorais commented on code in PR #27005: URL: https://github.com/apache/flink/pull/27005#discussion_r2361307005
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java:
##########
@@ -1144,4 +1144,98 @@ public class MultiJoinTestPrograms {
+ "FROM UsersNullSafe u "
+ "INNER JOIN OrdersNullSafe o ON
u.user_id IS NOT DISTINCT FROM o.user_id")
.build();
+
+ public static final TableTestProgram
MULTI_JOIN_CONDITIONS_MATERIALIZATION_NEXMARK_AGGREGATION =
+ TableTestProgram.of(
+ "nexmark-auction-bid-aggregation",
+ "Nexmark auction and bid aggregation with
time-based filtering")
+ .setupTableSource(
+ SourceTestStep.newBuilder("auctions")
+ .addSchema(
+ "id BIGINT PRIMARY KEY NOT
ENFORCED",
+ "category STRING",
+ "auctionTimestamp STRING",
+ "expiresTimestamp STRING",
+ "auctionDateTime AS
TO_TIMESTAMP(auctionTimestamp)",
+ "expires AS
TO_TIMESTAMP(expiresTimestamp)",
+ "WATERMARK FOR auctionDateTime AS
auctionDateTime - INTERVAL '1' SECOND")
+ .addOption("changelog-mode", "I")
+ .producedValues(
+ Row.ofKind(
+ RowKind.INSERT,
+ 1L,
+ "Electronics",
+ "2024-01-01 12:00:00",
+ "2024-01-01 12:30:00"),
+ Row.ofKind(
+ RowKind.INSERT,
+ 3L,
+ "Electronics",
+ "2024-01-01 12:10:00",
+ "2024-01-01 12:40:00"),
+ Row.ofKind(
+ RowKind.INSERT,
+ 2L,
+ "Books",
+ "2024-01-01 12:05:00",
+ "2024-01-01 12:35:00"))
+ .build())
+ .setupTableSource(
+ SourceTestStep.newBuilder("bids")
+ .addSchema(
+ "auction BIGINT",
+ "price DOUBLE",
+ "bidTimestamp STRING",
+ "bidDateTime AS
TO_TIMESTAMP(bidTimestamp)",
+ "WATERMARK FOR bidDateTime AS
bidDateTime - INTERVAL '1' SECOND")
+ .addOption("changelog-mode", "I")
+ .producedValues(
+ Row.ofKind(
+ RowKind.INSERT,
+ 1L,
+ 12.0,
+ "2024-01-01 12:15:00"),
+ Row.ofKind(
+ RowKind.INSERT,
+ 1L,
+ 15.0,
+ "2024-01-01 12:20:00"),
+ Row.ofKind(
+ RowKind.INSERT,
+ 2L,
+ 25.0,
+ "2024-01-01 12:25:00"),
+ Row.ofKind(
+ RowKind.INSERT,
+ 3L,
+ 18.0,
+ "2024-01-01 12:30:00"),
+ Row.ofKind(
+ RowKind.INSERT,
+ 1L,
+ 20.0,
+ "2024-01-01 12:45:00"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("category STRING", "avg_final
DOUBLE")
+ .consumedValues(
+ // Electronics: AVG(MAX(12.0,
15.0), MAX(18.0)) =
Review Comment:
They explain the aggregation results for each category
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
