This is an automated email from the ASF dual-hosted git repository.
shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 31e434bed4 [GLUTEN-10632][FLINK] Add nexmark test for q11, q12 and q22
(#10735)
31e434bed4 is described below
commit 31e434bed44a3fc3e7e54f7231f5fbad4ec26232
Author: shuai.xu <[email protected]>
AuthorDate: Mon Sep 22 16:02:09 2025 +0800
[GLUTEN-10632][FLINK] Add nexmark test for q11, q12 and q22 (#10735)
* [GLUTEN-10632][FLINK] add nexmark test for q11 q12 q22
---
.../table/runtime/stream/custom/NexmarkTest.java | 19 ++++++++++++-------
gluten-flink/ut/src/test/resources/nexmark/q11.sql | 17 +++++++++++++++++
gluten-flink/ut/src/test/resources/nexmark/q12.sql | 20 ++++++++++++++++++++
gluten-flink/ut/src/test/resources/nexmark/q22.sql | 18 ++++++++++++++++++
4 files changed, 67 insertions(+), 7 deletions(-)
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
index 90fdf055da..5454b29396 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
@@ -85,10 +85,10 @@ public class NexmarkTest {
List<String> queryFiles = getQueries();
assertThat(queryFiles).isNotEmpty();
- LOG.info("Found {} Nexmark query files: {}", queryFiles.size(),
queryFiles);
+ LOG.warn("Found {} Nexmark query files: {}", queryFiles.size(),
queryFiles);
for (String queryFile : queryFiles) {
- LOG.info("Executing query from file: {}", queryFile);
+ LOG.warn("Executing query from file: {}", queryFile);
executeQuery(tEnv, queryFile);
}
}
@@ -123,17 +123,22 @@ public class NexmarkTest {
String[] sqlStatements = queryContent.split(";");
assertThat(sqlStatements.length).isGreaterThanOrEqualTo(2);
- String createResultTable = sqlStatements[0].trim();
- if (!createResultTable.isEmpty()) {
- TableResult createResult = tEnv.executeSql(createResultTable);
- assertFalse(createResult.getJobClient().isPresent());
+ for (int i = 0; i < sqlStatements.length - 2; i++) {
+ // For some query tests like q12 q13 q14, the first two of the three
statements create tables
+ // or views. For others, there are only two statements, with the first
one creating a table.
+ String createResultTable = sqlStatements[i].trim();
+ if (!createResultTable.isEmpty()) {
+ TableResult createResult = tEnv.executeSql(createResultTable);
+ assertFalse(createResult.getJobClient().isPresent());
+ }
}
- String insertQuery = sqlStatements[1].trim();
+ String insertQuery = sqlStatements[sqlStatements.length - 2].trim();
if (!insertQuery.isEmpty()) {
TableResult insertResult = tEnv.executeSql(insertQuery);
waitForJobCompletion(insertResult, 30000);
}
+ assertTrue(sqlStatements[sqlStatements.length - 1].trim().isEmpty());
}
private void waitForJobCompletion(TableResult result, long timeoutMs)
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q11.sql
b/gluten-flink/ut/src/test/resources/nexmark/q11.sql
new file mode 100755
index 0000000000..e513c987df
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q11.sql
@@ -0,0 +1,17 @@
+CREATE TABLE nexmark_q11 (
+ bidder BIGINT,
+ bid_count BIGINT,
+ starttime TIMESTAMP(3),
+ endtime TIMESTAMP(3)
+) WITH (
+ 'connector' = 'blackhole'
+);
+
+INSERT INTO nexmark_q11
+SELECT
+ B.bidder,
+ count(*) as bid_count,
+ SESSION_START(B.`dateTime`, INTERVAL '10' SECOND) as starttime,
+ SESSION_END(B.`dateTime`, INTERVAL '10' SECOND) as endtime
+FROM bid B
+GROUP BY B.bidder, SESSION(B.`dateTime`, INTERVAL '10' SECOND);
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q12.sql
b/gluten-flink/ut/src/test/resources/nexmark/q12.sql
new file mode 100755
index 0000000000..f2cda4f463
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q12.sql
@@ -0,0 +1,20 @@
+CREATE TABLE nexmark_q12 (
+ bidder BIGINT,
+ bid_count BIGINT,
+ starttime TIMESTAMP(3),
+ endtime TIMESTAMP(3)
+) WITH (
+ 'connector' = 'blackhole'
+);
+
+CREATE VIEW B AS SELECT *, PROCTIME() as p_time FROM bid;
+
+INSERT INTO nexmark_q12
+SELECT
+ bidder,
+ count(*) as bid_count,
+ window_start AS starttime,
+ window_end AS endtime
+FROM TABLE(
+ TUMBLE(TABLE B, DESCRIPTOR(p_time), INTERVAL '10' SECOND))
+GROUP BY bidder, window_start, window_end;
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q22.sql
b/gluten-flink/ut/src/test/resources/nexmark/q22.sql
new file mode 100755
index 0000000000..e7a66dfc38
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q22.sql
@@ -0,0 +1,18 @@
+CREATE TABLE nexmark_q22 (
+ auction BIGINT,
+ bidder BIGINT,
+ price BIGINT,
+ channel VARCHAR,
+ dir1 VARCHAR,
+ dir2 VARCHAR,
+ dir3 VARCHAR
+) WITH (
+ 'connector' = 'blackhole'
+);
+
+INSERT INTO nexmark_q22
+SELECT
+ auction, bidder, price, channel,
+ SPLIT_INDEX(url, '/', 3) as dir1,
+ SPLIT_INDEX(url, '/', 4) as dir2,
+ SPLIT_INDEX(url, '/', 5) as dir3 FROM bid;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]