This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 60e4e1ebbf [GLUTEN-9892][FLINK] Add validation test for Nexmark q0
(#9893)
60e4e1ebbf is described below
commit 60e4e1ebbf61930fa61cfcbad9c6bde9ba3b99d3
Author: yuanhang ma <[email protected]>
AuthorDate: Mon Jun 23 10:13:19 2025 +0800
[GLUTEN-9892][FLINK] Add validation test for Nexmark q0 (#9893)
---
.github/workflows/flink.yml | 4 +
gluten-flink/docs/Flink.md | 14 +-
.../rexnode/functions/BaseRexCallConverters.java | 28 ++++
.../rexnode/functions/RexCallConverterFactory.java | 3 +-
.../gluten/vectorized/ArrowVectorAccessor.java | 16 ++
gluten-flink/ut/pom.xml | 6 +
.../table/runtime/stream/custom/NexmarkTest.java | 164 +++++++++++++++++++++
.../ut/src/test/resources/nexmark/ddl_gen.sql | 46 ++++++
.../ut/src/test/resources/nexmark/ddl_views.sql | 36 +++++
gluten-flink/ut/src/test/resources/nexmark/q0.sql | 13 ++
10 files changed, 327 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index f4124b2355..37a3d43f27 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -46,6 +46,10 @@ jobs:
cd velox4j && git reset --hard
c069ca63aa3a200b49199f7dfa285f011ce6fda5
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
mvn clean install -DskipTests -Dspotless.skip=true
+ cd ..
+ git clone https://github.com/nexmark/nexmark.git
+ cd nexmark
+ mvn clean install -DskipTests
- name: Build Gluten Flink
run: |
cd $GITHUB_WORKSPACE/gluten-flink
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 6b40351cb8..c330deb67b 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -64,9 +64,21 @@ git clone https://github.com/apache/incubator-gluten.git
```
cd /path/to/gluten/gluten-flink
-mvn clean package
+mvn clean package -Dmaven.test.skip=true
```
+# Run Unit Tests
+**Get Nexmark**
+```shell
+git clone https://github.com/nexmark/nexmark.git
+cd nexmark
+mvn clean install -DskipTests
+```
+**Run Tests**
+```shell
+cd /path/to/gluten/gluten-flink
+mvn test
+```
## Dependency library deployment
You need to get the Velox4j packages and use them with Gluten.
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
index c8f2d66348..8a7c41ab1d 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
@@ -22,6 +22,8 @@ import org.apache.gluten.rexnode.TypeUtils;
import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.TimestampType;
import io.github.zhztheplayer.velox4j.type.Type;
import org.apache.calcite.rex.RexCall;
@@ -84,3 +86,29 @@ class BasicArithmeticOperatorRexCallConverter extends
BaseRexCallConverter {
return new CallTypedExpr(resultType, alignedParams, functionName);
}
}
+
+/**
+ * Converts a {@code -} call into a {@code subtract} expression.
+ *
+ * <p>When the first operand is typed as a timestamp and the second as BIGINT, the first operand
+ * is wrapped in a {@code cast} to BIGINT and the whole subtraction is typed as BIGINT. Every
+ * other operand combination goes through the regular arithmetic type promotion and uses the
+ * result type derived from the call node.
+ */
+class SubtractRexCallConverter extends BaseRexCallConverter {
+
+  public SubtractRexCallConverter() {
+    super("subtract");
+  }
+
+  /**
+   * Builds the typed expression for the given {@code -} call.
+   *
+   * @param callNode the call being converted
+   * @param context the conversion context handed to operand conversion
+   * @return a {@code subtract} call over the (possibly cast or promoted) operands
+   */
+  @Override
+  public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context) {
+    final List<TypedExpr> operands = getParams(callNode, context);
+
+    if (!isTimestampMinusBigInt(operands)) {
+      // Generic path: promote operand types first, then resolve the call's result type.
+      final List<TypedExpr> promoted = TypeUtils.promoteTypeForArithmeticExpressions(operands);
+      final Type resultType = getResultType(callNode);
+      return new CallTypedExpr(resultType, promoted, functionName);
+    }
+
+    // Timestamp minus BIGINT: cast the timestamp side to BIGINT and subtract as BIGINT.
+    final Type bigInt = new BigIntType();
+    final TypedExpr timestampAsBigInt =
+        new CallTypedExpr(bigInt, List.of(operands.get(0)), "cast");
+    return new CallTypedExpr(
+        bigInt, List.of(timestampAsBigInt, operands.get(1)), functionName);
+  }
+
+  /**
+   * Tells whether the first operand is a timestamp and the second a BIGINT. The second operand
+   * is only inspected when the first one is a timestamp.
+   */
+  private static boolean isTimestampMinusBigInt(List<TypedExpr> operands) {
+    return operands.get(0).getReturnType() instanceof TimestampType
+        && operands.get(1).getReturnType() instanceof BigIntType;
+  }
+}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
index 010a78f044..3dc08e27db 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
@@ -56,8 +56,7 @@ public class RexCallConverterFactory {
() -> new StringNumberCompareRexCallConverter("equalto"))),
Map.entry(
"*", Arrays.asList(() -> new
BasicArithmeticOperatorRexCallConverter("multiply"))),
- Map.entry(
- "-", Arrays.asList(() -> new
BasicArithmeticOperatorRexCallConverter("subtract"))),
+ Map.entry("-", Arrays.asList(() -> new SubtractRexCallConverter())),
Map.entry("+", Arrays.asList(() -> new
BasicArithmeticOperatorRexCallConverter("add"))),
Map.entry("MOD", Arrays.asList(() -> new ModRexCallConverter())),
Map.entry("CAST", Arrays.asList(() -> new
DefaultRexCallConverter("cast"))),
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
index eecdf957da..61db66577b 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
@@ -21,6 +21,7 @@ import io.github.zhztheplayer.velox4j.type.*;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.arrow.vector.BigIntVector;
@@ -29,6 +30,7 @@ import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
@@ -59,6 +61,7 @@ public abstract class ArrowVectorAccessor {
Map.entry(StructVector.class, vector -> new
StructVectorAccessor(vector)),
Map.entry(ListVector.class, vector -> new
ListVectorAccessor(vector)),
Map.entry(DateDayVector.class, vector -> new
DateDayVectorAccessor(vector)),
+ Map.entry(TimeStampMicroVector.class, vector -> new
TimeStampMicroVectorAccessor(vector)),
Map.entry(MapVector.class, vector -> new MapVectorAccessor(vector)));
public static ArrowVectorAccessor create(FieldVector vector) {
@@ -244,3 +247,16 @@ class MapVectorAccessor extends
BaseArrowVectorAccessor<MapVector> {
return new GenericMapData(mapEntries);
}
}
+
+/**
+ * Accessor that exposes the values of an Arrow {@link TimeStampMicroVector} as Flink {@link
+ * TimestampData}.
+ */
+class TimeStampMicroVectorAccessor extends BaseArrowVectorAccessor<TimeStampMicroVector> {
+
+  public TimeStampMicroVectorAccessor(FieldVector vector) {
+    super(vector);
+  }
+
+  /**
+   * Reads the microsecond value stored at {@code rowIndex} and converts it to {@link
+   * TimestampData}.
+   *
+   * @param rowIndex index of the row to read from the vector
+   * @return the timestamp as epoch milliseconds plus the remaining nanoseconds of that millisecond
+   */
+  @Override
+  public Object getImpl(int rowIndex) {
+    long micros = typedVector.get(rowIndex);
+    // Floor-based split: plain "/ 1000" truncates toward zero, which moves negative (pre-epoch)
+    // values one millisecond forward and discards the sub-millisecond part.
+    long milliseconds = Math.floorDiv(micros, 1000L);
+    // Remainder is always in [0, 999] microseconds, i.e. [0, 999_000] nanoseconds.
+    int nanosOfMillisecond = (int) (Math.floorMod(micros, 1000L) * 1000L);
+    return TimestampData.fromEpochMillis(milliseconds, nanosOfMillisecond);
+  }
+}
diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml
index bcdd7c9c89..d74bb14c33 100644
--- a/gluten-flink/ut/pom.xml
+++ b/gluten-flink/ut/pom.xml
@@ -90,6 +90,12 @@
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.github.nexmark</groupId>
+ <artifactId>nexmark-flink</artifactId>
+ <version>0.3-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>io.github.zhztheplayer</groupId>
<artifactId>velox4j</artifactId>
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
new file mode 100644
index 0000000000..ca91b764f8
--- /dev/null
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.table.runtime.stream.custom;
+
+import org.apache.gluten.table.runtime.stream.common.Velox4jEnvironment;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Runs the Nexmark queries found under the {@code nexmark} test resource directory against a
+ * streaming table environment.
+ *
+ * <p>Each {@code q*.sql} resource is expected to hold at least two {@code ;}-separated
+ * statements: the first creates the result table, the second is the query that writes into it.
+ */
+public class NexmarkTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(NexmarkTest.class);
+  private static final String NEXMARK_RESOURCE_DIR = "nexmark";
+
+  /** Values substituted for the {@code ${NAME}} placeholders of {@code ddl_gen.sql}. */
+  private static final Map<String, String> NEXMARK_VARIABLES = createNexmarkVariables();
+
+  private static StreamTableEnvironment tEnv;
+
+  /** Initializes Velox4j once, creates the table environment and registers the Nexmark DDL. */
+  @BeforeAll
+  static void setup() {
+    LOG.info("NexmarkTest setup");
+    Velox4jEnvironment.initializeOnce();
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(1);
+
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+    tEnv = StreamTableEnvironment.create(env, settings);
+
+    setupNexmarkEnvironment(tEnv);
+  }
+
+  /** Executes every discovered {@code q*.sql} file in file-name order. */
+  @Test
+  void testAllNexmarkQueries() {
+    List<String> queryFiles = getQueries();
+    assertThat(queryFiles).isNotEmpty();
+
+    LOG.info("Found {} Nexmark query files: {}", queryFiles.size(), queryFiles);
+
+    for (String queryFile : queryFiles) {
+      LOG.info("Executing query from file: {}", queryFile);
+      executeQuery(tEnv, queryFile);
+    }
+  }
+
+  /** Builds the placeholder values used when rendering {@code ddl_gen.sql}. */
+  private static Map<String, String> createNexmarkVariables() {
+    // A plain HashMap instead of double-brace initialization, which would create an anonymous
+    // HashMap subclass just to populate five entries.
+    Map<String, String> variables = new HashMap<>();
+    variables.put("TPS", "10");
+    variables.put("EVENTS_NUM", "100");
+    variables.put("PERSON_PROPORTION", "1");
+    variables.put("AUCTION_PROPORTION", "3");
+    variables.put("BID_PROPORTION", "46");
+    return variables;
+  }
+
+  /**
+   * Registers the Nexmark source table (after placeholder substitution) and then each non-empty
+   * {@code ;}-separated statement of {@code ddl_views.sql}.
+   */
+  private static void setupNexmarkEnvironment(StreamTableEnvironment tEnv) {
+    String createNexmarkSource = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/ddl_gen.sql");
+    createNexmarkSource = replaceVariables(createNexmarkSource, NEXMARK_VARIABLES);
+    tEnv.executeSql(createNexmarkSource);
+
+    String createTableView = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/ddl_views.sql");
+    String[] sqlTableView = createTableView.split(";");
+    for (String sql : sqlTableView) {
+      String trimmedSql = sql.trim();
+      if (!trimmedSql.isEmpty()) {
+        tEnv.executeSql(trimmedSql);
+      }
+    }
+  }
+
+  /**
+   * Replaces every {@code ${KEY}} occurrence in {@code sql} with the value mapped to {@code KEY}.
+   *
+   * @param sql the SQL text containing placeholders
+   * @param variables placeholder names mapped to their replacement text
+   * @return the SQL text with all known placeholders substituted
+   */
+  private static String replaceVariables(String sql, Map<String, String> variables) {
+    String result = sql;
+    for (Map.Entry<String, String> entry : variables.entrySet()) {
+      result = result.replace("${" + entry.getKey() + "}", entry.getValue());
+    }
+    return result;
+  }
+
+  /**
+   * Executes the first two {@code ;}-separated statements of the given query file: the result
+   * table DDL (must not start a job) and the insert query (must start a job).
+   */
+  private void executeQuery(StreamTableEnvironment tEnv, String queryFileName) {
+    String queryContent = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/" + queryFileName);
+
+    String[] sqlStatements = queryContent.split(";");
+    assertThat(sqlStatements.length).isGreaterThanOrEqualTo(2);
+
+    String createResultTable = sqlStatements[0].trim();
+    if (!createResultTable.isEmpty()) {
+      TableResult createResult = tEnv.executeSql(createResultTable);
+      assertThat(createResult.getJobClient().isPresent()).isFalse();
+    }
+
+    String insertQuery = sqlStatements[1].trim();
+    if (!insertQuery.isEmpty()) {
+      TableResult insertResult = tEnv.executeSql(insertQuery);
+      // NOTE(review): only the presence of a job client is asserted here; nothing waits for the
+      // job to finish, so a failure after submission would presumably not fail this test.
+      // Confirm whether the result should be awaited.
+      assertThat(insertResult.getJobClient().isPresent()).isTrue();
+    }
+  }
+
+  /**
+   * Lists the {@code q*.sql} file names in the Nexmark resource directory, sorted by name.
+   *
+   * @throws RuntimeException if the resource directory is missing or cannot be listed
+   */
+  private List<String> getQueries() {
+    URL resourceUrl = getClass().getClassLoader().getResource(NEXMARK_RESOURCE_DIR);
+    if (resourceUrl == null) {
+      // Fail with a descriptive message instead of a NullPointerException on toURI().
+      throw new RuntimeException(
+          "Nexmark resource directory not found: " + NEXMARK_RESOURCE_DIR);
+    }
+
+    try {
+      Path resourcePath = Paths.get(resourceUrl.toURI());
+      List<String> queryFiles = new ArrayList<>();
+
+      try (DirectoryStream<Path> stream = Files.newDirectoryStream(resourcePath, "q*.sql")) {
+        for (Path entry : stream) {
+          queryFiles.add(entry.getFileName().toString());
+        }
+      }
+
+      return queryFiles.stream().sorted().collect(Collectors.toList());
+
+    } catch (URISyntaxException | IOException e) {
+      throw new RuntimeException("Failed to discover query files", e);
+    }
+  }
+
+  /**
+   * Reads a classpath resource as UTF-8 text.
+   *
+   * @param fileName resource path relative to the classpath root
+   * @return the full content of the resource
+   * @throws RuntimeException if the resource does not exist or cannot be read
+   */
+  private static String readSqlFromFile(String fileName) {
+    URL resource = NexmarkTest.class.getClassLoader().getResource(fileName);
+    if (resource == null) {
+      // Checked outside the try block so this message is not re-wrapped as a read failure.
+      throw new RuntimeException("SQL file not found: " + fileName);
+    }
+    try {
+      // Explicit charset: new String(byte[]) would decode with the platform default.
+      return new String(Files.readAllBytes(Paths.get(resource.toURI())), StandardCharsets.UTF_8);
+    } catch (IOException | URISyntaxException e) {
+      throw new RuntimeException("Failed to read SQL file: " + fileName, e);
+    }
+  }
+}
diff --git a/gluten-flink/ut/src/test/resources/nexmark/ddl_gen.sql
b/gluten-flink/ut/src/test/resources/nexmark/ddl_gen.sql
new file mode 100644
index 0000000000..24f47aa577
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/ddl_gen.sql
@@ -0,0 +1,46 @@
+CREATE TABLE datagen ( -- source table: each row carries an event_type plus person, auction and bid ROW columns
+ event_type int, -- 0 and 1 pick the person and auction timestamps below, any other value picks bid
+ person ROW<
+ id BIGINT,
+ name VARCHAR,
+ emailAddress VARCHAR,
+ creditCard VARCHAR,
+ city VARCHAR,
+ state VARCHAR,
+ `dateTime` TIMESTAMP(3),
+ extra VARCHAR>,
+ auction ROW<
+ id BIGINT,
+ itemName VARCHAR,
+ description VARCHAR,
+ initialBid BIGINT,
+ reserve BIGINT,
+ `dateTime` TIMESTAMP(3),
+ expires TIMESTAMP(3),
+ seller BIGINT,
+ category BIGINT,
+ extra VARCHAR>,
+ bid ROW<
+ auction BIGINT,
+ bidder BIGINT,
+ price BIGINT,
+ channel VARCHAR,
+ url VARCHAR,
+ `dateTime` TIMESTAMP(3),
+ extra VARCHAR>,
+ `dateTime` AS -- computed column: the timestamp of whichever nested row event_type selects
+ CASE
+ WHEN event_type = 0 THEN person.`dateTime`
+ WHEN event_type = 1 THEN auction.`dateTime`
+ ELSE bid.`dateTime`
+ END,
+ WATERMARK FOR `dateTime` AS `dateTime` - INTERVAL '4' SECOND -- watermark trails the event time by 4 seconds
+) WITH ( -- the dollar-brace placeholders below are substituted by NexmarkTest before this DDL is executed
+ 'connector' = 'nexmark',
+ 'first-event.rate' = '${TPS}',
+ 'next-event.rate' = '${TPS}',
+ 'events.num' = '${EVENTS_NUM}',
+ 'person.proportion' = '${PERSON_PROPORTION}',
+ 'auction.proportion' = '${AUCTION_PROPORTION}',
+ 'bid.proportion' = '${BID_PROPORTION}'
+);
diff --git a/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql
b/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql
new file mode 100644
index 0000000000..54902b4b44
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql
@@ -0,0 +1,36 @@
+CREATE VIEW person AS -- rows of datagen with event_type = 0, exposing the nested person fields as columns
+SELECT
+ person.id,
+ person.name,
+ person.emailAddress,
+ person.creditCard,
+ person.city,
+ person.state,
+ `dateTime`, -- the table-level computed dateTime column, not person.dateTime
+ person.extra
+FROM datagen WHERE event_type = 0;
+
+CREATE VIEW auction AS -- rows of datagen with event_type = 1, exposing the nested auction fields as columns
+SELECT
+ auction.id,
+ auction.itemName,
+ auction.description,
+ auction.initialBid,
+ auction.reserve,
+ `dateTime`, -- the table-level computed dateTime column, not auction.dateTime
+ auction.expires,
+ auction.seller,
+ auction.category,
+ auction.extra
+FROM datagen WHERE event_type = 1;
+
+CREATE VIEW bid AS -- rows of datagen with event_type = 2, exposing the nested bid fields as columns
+SELECT
+ bid.auction,
+ bid.bidder,
+ bid.price,
+ bid.channel,
+ bid.url,
+ `dateTime`, -- the table-level computed dateTime column, not bid.dateTime
+ bid.extra
+FROM datagen WHERE event_type = 2;
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q0.sql
b/gluten-flink/ut/src/test/resources/nexmark/q0.sql
new file mode 100644
index 0000000000..44e024863c
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q0.sql
@@ -0,0 +1,13 @@
+CREATE TABLE nexmark_q0 ( -- result table for q0, with the same five columns the query below selects
+ auction BIGINT,
+ bidder BIGINT,
+ price BIGINT,
+ `dateTime` TIMESTAMP(3),
+ extra VARCHAR
+) WITH (
+ 'connector' = 'blackhole'
+);
+
+
+INSERT INTO nexmark_q0 -- q0: pass-through of the bid view columns with no filtering or transformation
+SELECT auction, bidder, price, `dateTime`, extra FROM bid;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]