This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.0 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 0d3aa1b0a3f46bd25a7daa9652337dc32c5e01f4 Author: JingsongLi <[email protected]> AuthorDate: Fri Jan 24 10:27:52 2025 +0800 [test][flink] Add tests back in PreAggregationITCase which deleted by #4982 --- .../apache/paimon/flink/PreAggregationITCase.java | 135 +++++++++++++++++++++ 1 file changed, 135 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java index 589b26d67a..7b8ce3904e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java @@ -1269,6 +1269,141 @@ public class PreAggregationITCase { return Arrays.asList(ordersTable, subordersTable, wideTable, wideAppendTable); } + @Test + public void testUseCase() { + sql( + "INSERT INTO orders VALUES " + + "(1, 'Wang', 'HangZhou')," + + "(2, 'Zhao', 'ChengDu')," + + "(3, 'Liu', 'NanJing')"); + + sql( + "INSERT INTO sub_orders VALUES " + + "(1, 1, '12-20', 'Apple', 8000)," + + "(1, 2, '12-20', 'Tesla', 400000)," + + "(1, 1, '12-21', 'Sangsung', 5000)," + + "(2, 1, '12-20', 'Tea', 40)," + + "(2, 2, '12-20', 'Pot', 60)," + + "(3, 1, '12-25', 'Bat', 15)," + + "(3, 1, '12-26', 'Cup', 30)"); + + sql(widenSql()); + + List<Row> result = + sql("SELECT * FROM order_wide").stream() + .sorted(Comparator.comparingInt(r -> r.getFieldAs(0))) + .collect(Collectors.toList()); + + assertThat( + checkOneRecord( + result.get(0), + 1, + "Wang", + "HangZhou", + Row.of(1, "12-20", "Apple", 8000L), + Row.of(1, "12-21", "Sangsung", 5000L), + Row.of(2, "12-20", "Tesla", 400000L))) + .isTrue(); + assertThat( + checkOneRecord( + result.get(1), + 2, + "Zhao", + "ChengDu", + Row.of(1, "12-20", "Tea", 40L), + Row.of(2, "12-20", "Pot", 60L))) + .isTrue(); + assertThat( + checkOneRecord( + result.get(2), + 3, + "Liu", + "NanJing", + Row.of(1, "12-25", "Bat", 15L), + Row.of(1, "12-26", "Cup", 30L))) + .isTrue(); + + // query using UNNEST + List<Row> unnested = + sql( + "SELECT order_id, user_name, address, daily_id, today, product_name, price " + + "FROM order_wide, UNNEST(sub_orders) AS so(daily_id, today, product_name, price)"); + + assertThat(unnested) + .containsExactlyInAnyOrder( + Row.of(1, "Wang", "HangZhou", 1, "12-20", "Apple", 8000L), + Row.of(1, "Wang", "HangZhou", 2, "12-20", "Tesla", 400000L), + Row.of(1, "Wang", "HangZhou", 1, "12-21", "Sangsung", 5000L), + Row.of(2, "Zhao", "ChengDu", 1, "12-20", "Tea", 40L), + Row.of(2, "Zhao", "ChengDu", 2, "12-20", "Pot", 60L), + Row.of(3, "Liu", "NanJing", 1, "12-25", "Bat", 15L), + Row.of(3, "Liu", "NanJing", 1, "12-26", "Cup", 30L)); + } + + @Test + public void testUseCaseWithNullValue() { + sql( + "INSERT INTO order_wide\n" + + "SELECT 6, CAST (NULL AS STRING), CAST (NULL AS STRING), " + + "ARRAY[cast(null as ROW<daily_id INT, today STRING, product_name STRING, price BIGINT>)]"); + + List<Row> result = + sql("SELECT * FROM order_wide").stream() + .sorted(Comparator.comparingInt(r -> r.getFieldAs(0))) + .collect(Collectors.toList()); + + assertThat(checkOneRecord(result.get(0), 6, null, null, (Row) null)).isTrue(); + + sql( + "INSERT INTO order_wide\n" + + "SELECT 6, 'Sun', CAST (NULL AS STRING), " + + "ARRAY[ROW(1, '01-01','Apple', 6999)]"); + + result = + sql("SELECT * FROM order_wide").stream() + .sorted(Comparator.comparingInt(r -> r.getFieldAs(0))) + .collect(Collectors.toList()); + assertThat( + checkOneRecord( + result.get(0), + 6, + "Sun", + null, + Row.of(1, "01-01", "Apple", 6999L))) + .isTrue(); + } + + @Test + public void testUseCaseAppend() { + sql( + "INSERT INTO orders VALUES " + + "(1, 'Wang', 'HangZhou')," + + "(2, 'Zhao', 'ChengDu')," + + "(3, 'Liu', 'NanJing')"); + + sql( + "INSERT INTO sub_orders VALUES " + + "(1, 1, '12-20', 'Apple', 8000)," + + "(2, 1, '12-20', 'Tesla', 400000)," + + "(3, 1, '12-25', 'Bat', 15)," + + "(3, 1, '12-26', 'Cup', 30)"); + + sql(widenAppendSql()); + + // query using UNNEST + List<Row> unnested = + sql( + "SELECT order_id, user_name, address, daily_id, today, product_name, price " + + "FROM order_append_wide, UNNEST(sub_orders) AS so(daily_id, today, product_name, price)"); + + assertThat(unnested) + .containsExactlyInAnyOrder( + Row.of(1, "Wang", "HangZhou", 1, "12-20", "Apple", 8000L), + Row.of(2, "Zhao", "ChengDu", 1, "12-20", "Tesla", 400000L), + Row.of(3, "Liu", "NanJing", 1, "12-25", "Bat", 15L), + Row.of(3, "Liu", "NanJing", 1, "12-26", "Cup", 30L)); + } + @Test @Timeout(60) public void testUpdateWithIgnoreRetract() throws Exception {
