This is an automated email from the ASF dual-hosted git repository.

zhanglistar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 27571d01ef [GLUTEN-12298][FLINK] Support nexmark source multi-parallelism via ParallelSplit (#12304)
27571d01ef is described below

commit 27571d01efe8448df9c3ea48aea8db061e6ed444
Author: GGboom <[email protected]>
AuthorDate: Fri Jun 26 09:02:49 2026 +0800

    [GLUTEN-12298][FLINK] Support nexmark source multi-parallelism via ParallelSplit (#12304)
    
    * fix(nexmark): support multi-parallelism via ParallelSplit
    
    * rename GeneratorConfig to NexmarkGeneratorConfig
    
    * use NexmarkParallelSplit for nexmark subtask dispatch
    
    * build: point flink CI at nexmark velox4j branch
    
    * test(nexmark): add NexmarkSourceFactoryTest for ParallelSplit wiring
    
    * update velox4j reference for ci
---
 .../apache/gluten/velox/NexmarkSourceFactory.java  |  24 ++-
 .../runtime/operators/GlutenSourceFunction.java    |  11 +-
 .../gluten/velox/NexmarkSourceFactoryTest.java     | 170 +++++++++++++++++++++
 3 files changed, 197 insertions(+), 8 deletions(-)

diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
index 6533935bc7..0767cd3829 100644
--- a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
@@ -24,6 +24,7 @@ import org.apache.gluten.util.ReflectUtils;
 
 import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
 import io.github.zhztheplayer.velox4j.connector.NexmarkGeneratorConfig;
+import io.github.zhztheplayer.velox4j.connector.NexmarkParallelSplit;
 import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
 import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
@@ -43,6 +44,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -81,22 +83,29 @@ public class NexmarkSourceFactory implements VeloxSourceSinkFactory {
                 "getSplits",
                 new Class<?>[] {int.class},
                 new Object[] {transformation.getParallelism()});
-    Object nexmarkSourceSplit = nexmarkSourceSplits.get(0);
-    Object generatorConfig =
-        ReflectUtils.getObjectField(
-            nexmarkSourceSplit.getClass(), nexmarkSourceSplit, 
"generatorConfig");
+
+    // Convert each subtask's NexmarkGeneratorConfig to velox4j
+    List<NexmarkConnectorSplit> subtaskSplits = new ArrayList<>();
+    for (Object nexmarkSourceSplit : nexmarkSourceSplits) {
+      Object generatorConfig =
+          ReflectUtils.getObjectField(
+              nexmarkSourceSplit.getClass(), nexmarkSourceSplit, 
"generatorConfig");
+      subtaskSplits.add(
+          new NexmarkConnectorSplit(
+              "connector-nexmark", 
toVeloxNexmarkGeneratorConfig(generatorConfig)));
+    }
+
     PlanNode tableScan =
         new TableScanNode(id, outputType, new 
NexmarkTableHandle("connector-nexmark"), List.of());
+    NexmarkParallelSplit split = new NexmarkParallelSplit("connector-nexmark", 
subtaskSplits);
     GlutenStreamSource sourceOp =
         new GlutenStreamSource(
             new GlutenSourceFunction(
                 new StatefulPlanNode(tableScan.getId(), tableScan),
                 Map.of(id, outputType),
                 id,
-                new NexmarkConnectorSplit(
-                    "connector-nexmark", 
toVeloxNexmarkGeneratorConfig(generatorConfig)),
+                split,
                 RowData.class));
-
     return new LegacySourceTransformation<RowData>(
         transformation.getName(),
         sourceOp,
@@ -112,6 +121,7 @@ public class NexmarkSourceFactory implements VeloxSourceSinkFactory {
     throw new UnsupportedOperationException("Unimplemented method 
'buildSink'");
   }
 
+  /** Convert Flink nexmark NexmarkGeneratorConfig to velox4j 
NexmarkGeneratorConfig via Jackson. */
   private static NexmarkGeneratorConfig toVeloxNexmarkGeneratorConfig(Object 
javaConfig) {
     try {
       String json = MAPPER.writeValueAsString(javaConfig);
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
index 53f36fcf67..76cced93f1 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
@@ -22,6 +22,7 @@ import org.apache.gluten.table.runtime.metrics.SourceTaskMetrics;
 import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
 
 import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.ParallelSplit;
 import io.github.zhztheplayer.velox4j.iterator.UpIterator;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
 import io.github.zhztheplayer.velox4j.query.Query;
@@ -230,6 +231,14 @@ public class GlutenSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
     if (sessionResource != null) {
       return;
     }
+
+    ConnectorSplit activeSplit = split;
+    int totalParallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+    if (split instanceof ParallelSplit) {
+      activeSplit = ((ParallelSplit) split).getSubtaskSplit(subtaskIndex, 
totalParallelism);
+    }
+
     sessionResource = new GlutenSessionResource();
     GlutenSessionResources.getInstance().addSessionResource(id, 
sessionResource);
     Session session = sessionResource.getSession();
@@ -239,7 +248,7 @@ public class GlutenSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
             VeloxQueryConfig.getConfig(getRuntimeContext()),
             VeloxConnectorConfig.getConfig(getRuntimeContext()));
     task = session.queryOps().execute(query);
-    task.addSplit(id, split);
+    task.addSplit(id, activeSplit);
     task.noMoreSplits(id);
     taskMetrics = new SourceTaskMetrics(getRuntimeContext().getMetricGroup());
   }
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java
new file mode 100644
index 0000000000..8f149cfb73
--- /dev/null
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/NexmarkSourceFactoryTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+
+import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.NexmarkGeneratorConfig;
+import io.github.zhztheplayer.velox4j.connector.NexmarkParallelSplit;
+
+import org.apache.flink.api.dag.Transformation;
+import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Constructor;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class NexmarkSourceFactoryTest {
+
+  private static final String NEXMARK_SOURCE_CN = 
"com.github.nexmark.flink.source.NexmarkSource";
+  private static final String NEXMARK_CONFIG_CN = 
"com.github.nexmark.flink.NexmarkConfiguration";
+  private static final String GENERATOR_CONFIG_CN =
+      "com.github.nexmark.flink.generator.GeneratorConfig";
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  void testBuildVeloxSourceWrapsSplitsInNexmarkParallelSplit() throws 
Exception {
+    SourceTransformation tx = newSourceTransformation(/* parallelism= */ 2);
+
+    NexmarkSourceFactory factory = new NexmarkSourceFactory();
+    Transformation<RowData> result = factory.buildVeloxSource(tx, 
Collections.emptyMap());
+
+    LegacySourceTransformation<RowData> legacy =
+        assertInstanceOf(LegacySourceTransformation.class, result);
+    GlutenStreamSource streamSource =
+        assertInstanceOf(GlutenStreamSource.class, legacy.getOperator());
+
+    ConnectorSplit split = streamSource.getConnectorSplit();
+    NexmarkParallelSplit parallel = 
assertInstanceOf(NexmarkParallelSplit.class, split);
+
+    NexmarkConnectorSplit s0 =
+        assertInstanceOf(NexmarkConnectorSplit.class, 
parallel.getSubtaskSplit(0, 2));
+    NexmarkConnectorSplit s1 =
+        assertInstanceOf(NexmarkConnectorSplit.class, 
parallel.getSubtaskSplit(1, 2));
+
+    NexmarkGeneratorConfig c0 = s0.getConfig();
+    NexmarkGeneratorConfig c1 = s1.getConfig();
+    assertEquals(0L, c0.getFirstEventId());
+    assertEquals(500L, c0.getMaxEventsOrZero());
+    assertEquals(500L, c1.getFirstEventId());
+    assertEquals(500L, c1.getMaxEventsOrZero());
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  void testBuildVeloxSourceAtParallelismOneStillProducesParallelSplit() throws 
Exception {
+    SourceTransformation tx = newSourceTransformation(/* parallelism= */ 1);
+
+    NexmarkSourceFactory factory = new NexmarkSourceFactory();
+    Transformation<RowData> result = factory.buildVeloxSource(tx, 
Collections.emptyMap());
+
+    LegacySourceTransformation<RowData> legacy =
+        assertInstanceOf(LegacySourceTransformation.class, result);
+    GlutenStreamSource streamSource =
+        assertInstanceOf(GlutenStreamSource.class, legacy.getOperator());
+
+    NexmarkParallelSplit parallel =
+        assertInstanceOf(NexmarkParallelSplit.class, 
streamSource.getConnectorSplit());
+    NexmarkConnectorSplit s0 =
+        assertInstanceOf(NexmarkConnectorSplit.class, 
parallel.getSubtaskSplit(0, 1));
+
+    assertEquals(0L, s0.getConfig().getFirstEventId());
+    assertEquals(1000L, s0.getConfig().getMaxEventsOrZero());
+  }
+
+  @Test
+  void testBuildVeloxSourceRejectsNonSourceTransformation() {
+    NexmarkSourceFactory factory = new NexmarkSourceFactory();
+    assertThrows(
+        ClassCastException.class,
+        () -> factory.buildVeloxSource(new StubTransformation(), 
Collections.emptyMap()));
+  }
+
+  @SuppressWarnings("rawtypes")
+  private static SourceTransformation newSourceTransformation(int parallelism) 
throws Exception {
+    Object nexmarkSource = newNexmarkSource(1000L);
+    Constructor<?> ctor =
+        SourceTransformation.class.getDeclaredConstructor(
+            String.class,
+            org.apache.flink.api.connector.source.Source.class,
+            org.apache.flink.api.common.eventtime.WatermarkStrategy.class,
+            org.apache.flink.api.common.typeinfo.TypeInformation.class,
+            int.class);
+    return (SourceTransformation)
+        ctor.newInstance(
+            "nexmark-source",
+            nexmarkSource,
+            
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
+            InternalTypeInfo.of(RowType.of(new IntType())),
+            parallelism);
+  }
+
+  private static Object newNexmarkSource(long maxEvents) throws Exception {
+    Object nexmarkConfig = 
Class.forName(NEXMARK_CONFIG_CN).getDeclaredConstructor().newInstance();
+    java.lang.reflect.Field numEvents = 
nexmarkConfig.getClass().getDeclaredField("numEvents");
+    numEvents.setAccessible(true);
+    numEvents.setLong(nexmarkConfig, maxEvents);
+
+    Class<?> generatorConfigCls = Class.forName(GENERATOR_CONFIG_CN);
+    Constructor<?> generatorConfigCtor =
+        generatorConfigCls.getDeclaredConstructor(
+            Class.forName(NEXMARK_CONFIG_CN),
+            long.class,
+            long.class,
+            long.class,
+            long.class,
+            long.class);
+    Object generatorConfig =
+        generatorConfigCtor.newInstance(nexmarkConfig, 0L, 0L, maxEvents, 
maxEvents, 0L);
+
+    Class<?> nexmarkSourceCls = Class.forName(NEXMARK_SOURCE_CN);
+    Constructor<?> nexmarkSourceCtor =
+        nexmarkSourceCls.getDeclaredConstructor(
+            generatorConfigCls, 
org.apache.flink.api.common.typeinfo.TypeInformation.class);
+    nexmarkSourceCtor.setAccessible(true);
+    return nexmarkSourceCtor.newInstance(
+        generatorConfig, InternalTypeInfo.of(RowType.of(new IntType())));
+  }
+
+  private static final class StubTransformation extends 
Transformation<RowData> {
+    StubTransformation() {
+      super("stub", InternalTypeInfo.of(RowType.of(new IntType())), 1);
+    }
+
+    @Override
+    public java.util.List<Transformation<?>> getInputs() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    protected java.util.List<Transformation<?>> 
getTransitivePredecessorsInternal() {
+      return Collections.emptyList();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to