gustavodemorais commented on code in PR #27005:
URL: https://github.com/apache/flink/pull/27005#discussion_r2361307005


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java:
##########
@@ -1144,4 +1144,98 @@ public class MultiJoinTestPrograms {
                                     + "FROM UsersNullSafe u "
                                     + "INNER JOIN OrdersNullSafe o ON 
u.user_id IS NOT DISTINCT FROM o.user_id")
                     .build();
+
+    public static final TableTestProgram 
MULTI_JOIN_CONDITIONS_MATERIALIZATION_NEXMARK_AGGREGATION =
+            TableTestProgram.of(
+                            "nexmark-auction-bid-aggregation",
+                            "Nexmark auction and bid aggregation with 
time-based filtering")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("auctions")
+                                    .addSchema(
+                                            "id BIGINT PRIMARY KEY NOT 
ENFORCED",
+                                            "category STRING",
+                                            "auctionTimestamp STRING",
+                                            "expiresTimestamp STRING",
+                                            "auctionDateTime AS 
TO_TIMESTAMP(auctionTimestamp)",
+                                            "expires AS 
TO_TIMESTAMP(expiresTimestamp)",
+                                            "WATERMARK FOR auctionDateTime AS 
auctionDateTime - INTERVAL '1' SECOND")
+                                    .addOption("changelog-mode", "I")
+                                    .producedValues(
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    1L,
+                                                    "Electronics",
+                                                    "2024-01-01 12:00:00",
+                                                    "2024-01-01 12:30:00"),
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    3L,
+                                                    "Electronics",
+                                                    "2024-01-01 12:10:00",
+                                                    "2024-01-01 12:40:00"),
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    2L,
+                                                    "Books",
+                                                    "2024-01-01 12:05:00",
+                                                    "2024-01-01 12:35:00"))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("bids")
+                                    .addSchema(
+                                            "auction BIGINT",
+                                            "price DOUBLE",
+                                            "bidTimestamp STRING",
+                                            "bidDateTime AS 
TO_TIMESTAMP(bidTimestamp)",
+                                            "WATERMARK FOR bidDateTime AS 
bidDateTime - INTERVAL '1' SECOND")
+                                    .addOption("changelog-mode", "I")
+                                    .producedValues(
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    1L,
+                                                    12.0,
+                                                    "2024-01-01 12:15:00"),
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    1L,
+                                                    15.0,
+                                                    "2024-01-01 12:20:00"),
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    2L,
+                                                    25.0,
+                                                    "2024-01-01 12:25:00"),
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    3L,
+                                                    18.0,
+                                                    "2024-01-01 12:30:00"),
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    1L,
+                                                    20.0,
+                                                    "2024-01-01 12:45:00"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("category STRING", "avg_final 
DOUBLE")
+                                    .consumedValues(
+                                            // Electronics: AVG(MAX(12.0, 
15.0), MAX(18.0)) =

Review Comment:
   They explain the aggregation results for each category



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to