This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 60e4e1ebbf [GLUTEN-9892][FLINK] Add validation test for Nexmark q0 
(#9893)
60e4e1ebbf is described below

commit 60e4e1ebbf61930fa61cfcbad9c6bde9ba3b99d3
Author: yuanhang ma <[email protected]>
AuthorDate: Mon Jun 23 10:13:19 2025 +0800

    [GLUTEN-9892][FLINK] Add validation test for Nexmark q0 (#9893)
---
 .github/workflows/flink.yml                        |   4 +
 gluten-flink/docs/Flink.md                         |  14 +-
 .../rexnode/functions/BaseRexCallConverters.java   |  28 ++++
 .../rexnode/functions/RexCallConverterFactory.java |   3 +-
 .../gluten/vectorized/ArrowVectorAccessor.java     |  16 ++
 gluten-flink/ut/pom.xml                            |   6 +
 .../table/runtime/stream/custom/NexmarkTest.java   | 164 +++++++++++++++++++++
 .../ut/src/test/resources/nexmark/ddl_gen.sql      |  46 ++++++
 .../ut/src/test/resources/nexmark/ddl_views.sql    |  36 +++++
 gluten-flink/ut/src/test/resources/nexmark/q0.sql  |  13 ++
 10 files changed, 327 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index f4124b2355..37a3d43f27 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -46,6 +46,10 @@ jobs:
           cd velox4j && git reset --hard 
c069ca63aa3a200b49199f7dfa285f011ce6fda5
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           mvn clean install -DskipTests -Dspotless.skip=true
+          cd ..
+          git clone https://github.com/nexmark/nexmark.git
+          cd nexmark
+          mvn clean install -DskipTests
       - name: Build Gluten Flink
         run: |
           cd $GITHUB_WORKSPACE/gluten-flink
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 6b40351cb8..c330deb67b 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -64,9 +64,21 @@ git clone https://github.com/apache/incubator-gluten.git
 
 ```
 cd /path/to/gluten/gluten-flink
-mvn clean package 
+mvn clean package -Dmaven.test.skip=true
 ```
 
+# Run Unit Tests
+**Get Nexmark**
+```shell
+git clone https://github.com/nexmark/nexmark.git
+cd nexmark
+mvn clean install -DskipTests
+```
+**Run Tests**
+```shell
+cd /path/to/gluten/gluten-flink
+mvn test
+```
 ## Dependency library deployment
 
 You need to get the Velox4j packages and used them with gluten.
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
index c8f2d66348..8a7c41ab1d 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
@@ -22,6 +22,8 @@ import org.apache.gluten.rexnode.TypeUtils;
 
 import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
 import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.TimestampType;
 import io.github.zhztheplayer.velox4j.type.Type;
 
 import org.apache.calcite.rex.RexCall;
@@ -84,3 +86,29 @@ class BasicArithmeticOperatorRexCallConverter extends 
BaseRexCallConverter {
     return new CallTypedExpr(resultType, alignedParams, functionName);
   }
 }
+
+class SubtractRexCallConverter extends BaseRexCallConverter {
+
+  public SubtractRexCallConverter() {
+    super("subtract");
+  }
+
+  @Override
+  public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context) 
{
+    List<TypedExpr> params = getParams(callNode, context);
+
+    if (params.get(0).getReturnType() instanceof TimestampType
+        && params.get(1).getReturnType() instanceof BigIntType) {
+
+      Type bigIntType = new BigIntType();
+      TypedExpr castExpr = new CallTypedExpr(bigIntType, 
List.of(params.get(0)), "cast");
+
+      List<TypedExpr> newParams = List.of(castExpr, params.get(1));
+      return new CallTypedExpr(bigIntType, newParams, functionName);
+    }
+
+    List<TypedExpr> alignedParams = 
TypeUtils.promoteTypeForArithmeticExpressions(params);
+    Type resultType = getResultType(callNode);
+    return new CallTypedExpr(resultType, alignedParams, functionName);
+  }
+}
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
index 010a78f044..3dc08e27db 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
@@ -56,8 +56,7 @@ public class RexCallConverterFactory {
                   () -> new StringNumberCompareRexCallConverter("equalto"))),
           Map.entry(
               "*", Arrays.asList(() -> new 
BasicArithmeticOperatorRexCallConverter("multiply"))),
-          Map.entry(
-              "-", Arrays.asList(() -> new 
BasicArithmeticOperatorRexCallConverter("subtract"))),
+          Map.entry("-", Arrays.asList(() -> new SubtractRexCallConverter())),
           Map.entry("+", Arrays.asList(() -> new 
BasicArithmeticOperatorRexCallConverter("add"))),
           Map.entry("MOD", Arrays.asList(() -> new ModRexCallConverter())),
           Map.entry("CAST", Arrays.asList(() -> new 
DefaultRexCallConverter("cast"))),
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
index eecdf957da..61db66577b 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
@@ -21,6 +21,7 @@ import io.github.zhztheplayer.velox4j.type.*;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.binary.BinaryStringData;
 
 import org.apache.arrow.vector.BigIntVector;
@@ -29,6 +30,7 @@ import org.apache.arrow.vector.DateDayVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.MapVector;
@@ -59,6 +61,7 @@ public abstract class ArrowVectorAccessor {
           Map.entry(StructVector.class, vector -> new 
StructVectorAccessor(vector)),
           Map.entry(ListVector.class, vector -> new 
ListVectorAccessor(vector)),
           Map.entry(DateDayVector.class, vector -> new 
DateDayVectorAccessor(vector)),
+          Map.entry(TimeStampMicroVector.class, vector -> new 
TimeStampMicroVectorAccessor(vector)),
           Map.entry(MapVector.class, vector -> new MapVectorAccessor(vector)));
 
   public static ArrowVectorAccessor create(FieldVector vector) {
@@ -244,3 +247,16 @@ class MapVectorAccessor extends 
BaseArrowVectorAccessor<MapVector> {
     return new GenericMapData(mapEntries);
   }
 }
+
+class TimeStampMicroVectorAccessor extends 
BaseArrowVectorAccessor<TimeStampMicroVector> {
+
+  public TimeStampMicroVectorAccessor(FieldVector vector) {
+    super(vector);
+  }
+
+  @Override
+  public Object getImpl(int rowIndex) {
+    long milliseconds = typedVector.get(rowIndex) / 1000;
+    return TimestampData.fromEpochMillis(milliseconds);
+  }
+}
diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml
index bcdd7c9c89..d74bb14c33 100644
--- a/gluten-flink/ut/pom.xml
+++ b/gluten-flink/ut/pom.xml
@@ -90,6 +90,12 @@
             <artifactId>flink-clients</artifactId>
             <version>${flink.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.github.nexmark</groupId>
+            <artifactId>nexmark-flink</artifactId>
+            <version>0.3-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>io.github.zhztheplayer</groupId>
             <artifactId>velox4j</artifactId>
diff --git 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
new file mode 100644
index 0000000000..ca91b764f8
--- /dev/null
+++ 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.table.runtime.stream.custom;
+
+import org.apache.gluten.table.runtime.stream.common.Velox4jEnvironment;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class NexmarkTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(NexmarkTest.class);
+  private static final String NEXMARK_RESOURCE_DIR = "nexmark";
+
+  private static final Map<String, String> NEXMARK_VARIABLES =
+      new HashMap<String, String>() {
+        {
+          put("TPS", "10");
+          put("EVENTS_NUM", "100");
+          put("PERSON_PROPORTION", "1");
+          put("AUCTION_PROPORTION", "3");
+          put("BID_PROPORTION", "46");
+        }
+      };
+
+  private static StreamTableEnvironment tEnv;
+
+  @BeforeAll
+  static void setup() {
+    LOG.info("NexmarkTest setup");
+    Velox4jEnvironment.initializeOnce();
+
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(1);
+
+    EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
+    tEnv = StreamTableEnvironment.create(env, settings);
+
+    setupNexmarkEnvironment(tEnv);
+  }
+
+  @Test
+  void testAllNexmarkQueries() {
+    List<String> queryFiles = getQueries();
+    assertThat(queryFiles).isNotEmpty();
+
+    LOG.info("Found {} Nexmark query files: {}", queryFiles.size(), 
queryFiles);
+
+    for (String queryFile : queryFiles) {
+      LOG.info("Executing query from file: {}", queryFile);
+      executeQuery(tEnv, queryFile);
+    }
+  }
+
+  private static void setupNexmarkEnvironment(StreamTableEnvironment tEnv) {
+    String createNexmarkSource = readSqlFromFile(NEXMARK_RESOURCE_DIR + 
"/ddl_gen.sql");
+    createNexmarkSource = replaceVariables(createNexmarkSource, 
NEXMARK_VARIABLES);
+    tEnv.executeSql(createNexmarkSource);
+
+    String createTableView = readSqlFromFile(NEXMARK_RESOURCE_DIR + 
"/ddl_views.sql");
+    String[] sqlTableView = createTableView.split(";");
+    for (String sql : sqlTableView) {
+      String trimmedSql = sql.trim();
+      if (!trimmedSql.isEmpty()) {
+        tEnv.executeSql(trimmedSql);
+      }
+    }
+  }
+
+  private static String replaceVariables(String sql, Map<String, String> 
variables) {
+    String result = sql;
+    for (Map.Entry<String, String> entry : variables.entrySet()) {
+      result = result.replace("${" + entry.getKey() + "}", entry.getValue());
+    }
+    return result;
+  }
+
+  private void executeQuery(StreamTableEnvironment tEnv, String queryFileName) 
{
+    String queryContent = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/" + 
queryFileName);
+
+    String[] sqlStatements = queryContent.split(";");
+    assertThat(sqlStatements.length).isGreaterThanOrEqualTo(2);
+
+    String createResultTable = sqlStatements[0].trim();
+    if (!createResultTable.isEmpty()) {
+      TableResult createResult = tEnv.executeSql(createResultTable);
+      assertThat(createResult.getJobClient().isPresent()).isFalse();
+    }
+
+    String insertQuery = sqlStatements[1].trim();
+    if (!insertQuery.isEmpty()) {
+      TableResult insertResult = tEnv.executeSql(insertQuery);
+      assertThat(insertResult.getJobClient().isPresent()).isTrue();
+    }
+  }
+
+  private List<String> getQueries() {
+    URL resourceUrl = 
getClass().getClassLoader().getResource(NEXMARK_RESOURCE_DIR);
+
+    try {
+      Path resourcePath = Paths.get(resourceUrl.toURI());
+      List<String> queryFiles = new ArrayList<>();
+
+      try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(resourcePath, "q*.sql")) {
+        for (Path entry : stream) {
+          queryFiles.add(entry.getFileName().toString());
+        }
+      }
+
+      return queryFiles.stream().sorted().collect(Collectors.toList());
+
+    } catch (URISyntaxException | IOException e) {
+      throw new RuntimeException("Failed to discover query files", e);
+    }
+  }
+
+  private static String readSqlFromFile(String fileName) {
+    try {
+      URL resource = NexmarkTest.class.getClassLoader().getResource(fileName);
+      if (resource == null) {
+        throw new RuntimeException("SQL file not found: " + fileName);
+      }
+      return new String(Files.readAllBytes(Paths.get(resource.toURI())));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to read SQL file: " + fileName, e);
+    }
+  }
+}
diff --git a/gluten-flink/ut/src/test/resources/nexmark/ddl_gen.sql 
b/gluten-flink/ut/src/test/resources/nexmark/ddl_gen.sql
new file mode 100644
index 0000000000..24f47aa577
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/ddl_gen.sql
@@ -0,0 +1,46 @@
+CREATE TABLE datagen (
+    event_type int,
+    person ROW<
+        id  BIGINT,
+        name  VARCHAR,
+        emailAddress  VARCHAR,
+        creditCard  VARCHAR,
+        city  VARCHAR,
+        state  VARCHAR,
+        `dateTime` TIMESTAMP(3),
+        extra  VARCHAR>,
+    auction ROW<
+        id  BIGINT,
+        itemName  VARCHAR,
+        description  VARCHAR,
+        initialBid  BIGINT,
+        reserve  BIGINT,
+        `dateTime`  TIMESTAMP(3),
+        expires  TIMESTAMP(3),
+        seller  BIGINT,
+        category  BIGINT,
+        extra  VARCHAR>,
+    bid ROW<
+        auction  BIGINT,
+        bidder  BIGINT,
+        price  BIGINT,
+        channel  VARCHAR,
+        url  VARCHAR,
+        `dateTime`  TIMESTAMP(3),
+        extra  VARCHAR>,
+    `dateTime` AS
+        CASE
+            WHEN event_type = 0 THEN person.`dateTime`
+            WHEN event_type = 1 THEN auction.`dateTime`
+            ELSE bid.`dateTime`
+        END,
+    WATERMARK FOR `dateTime` AS `dateTime` - INTERVAL '4' SECOND
+) WITH (
+    'connector' = 'nexmark',
+    'first-event.rate' = '${TPS}',
+    'next-event.rate' = '${TPS}',
+    'events.num' = '${EVENTS_NUM}',
+    'person.proportion' = '${PERSON_PROPORTION}',
+    'auction.proportion' = '${AUCTION_PROPORTION}',
+    'bid.proportion' = '${BID_PROPORTION}'
+);
diff --git a/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql 
b/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql
new file mode 100644
index 0000000000..54902b4b44
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/ddl_views.sql
@@ -0,0 +1,36 @@
+CREATE VIEW person AS
+SELECT
+    person.id,
+    person.name,
+    person.emailAddress,
+    person.creditCard,
+    person.city,
+    person.state,
+    `dateTime`,
+    person.extra
+FROM datagen WHERE event_type = 0;
+
+CREATE VIEW auction AS
+SELECT
+    auction.id,
+    auction.itemName,
+    auction.description,
+    auction.initialBid,
+    auction.reserve,
+    `dateTime`,
+    auction.expires,
+    auction.seller,
+    auction.category,
+    auction.extra
+FROM datagen WHERE event_type = 1;
+
+CREATE VIEW bid AS
+SELECT
+    bid.auction,
+    bid.bidder,
+    bid.price,
+    bid.channel,
+    bid.url,
+    `dateTime`,
+    bid.extra
+FROM datagen WHERE event_type = 2;
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q0.sql 
b/gluten-flink/ut/src/test/resources/nexmark/q0.sql
new file mode 100644
index 0000000000..44e024863c
--- /dev/null
+++ b/gluten-flink/ut/src/test/resources/nexmark/q0.sql
@@ -0,0 +1,13 @@
+CREATE TABLE nexmark_q0 (
+  auction  BIGINT,
+  bidder  BIGINT,
+  price  BIGINT,
+  `dateTime`  TIMESTAMP(3),
+  extra  VARCHAR
+) WITH (
+  'connector' = 'blackhole'
+);
+
+
+INSERT INTO nexmark_q0
+SELECT auction, bidder, price, `dateTime`, extra FROM bid;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to