This is an automated email from the ASF dual-hosted git repository.

lgbo-ustc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 649eb784b5 [GLUTEN-12306][FLINK][VL] Route print sink options to Velox via reflection (no file path) (#12320)
649eb784b5 is described below

commit 649eb784b5dae9e7f2da09a964294f4cd2e52c8c
Author: GGboom <[email protected]>
AuthorDate: Thu Jun 25 19:51:11 2026 +0800

    [GLUTEN-12306][FLINK][VL] Route print sink options to Velox via reflection (no file path) (#12320)
    
    * fix(flink): route print sink options to Velox (no file)
    
    * refactor(flink): use ReflectUtils + PrintOptions class in PrintSinkFactory
    
    * fix(ut): redirect fd=1 in runAndCheck to capture Velox print sink output
    
    * fix(flink): adapt NexmarkSourceFactory to new velox4j API and update CI velox4j ref
    
    * test(flink): set FLINK_LOG_DIR so taskmanager.out lands under captured fd=1
    
    * test(flink): remove q12 from NexmarkTest (network shuffle NPE in StatefulRecord serde)
    
    * revert: test(flink): set FLINK_LOG_DIR so taskmanager.out lands under captured fd=1
    
    Reverts b561569ed. That commit removed File.mkdirs() under the mistaken
    assumption that CoreOptions.FLINK_LOG_DIR would create the directory. Under
    the dup2(fd=1) capture mechanism, Flink never creates that directory, so
    C_LIB.open() returned -1 and ScalarFunctionsTest/ScanTest failed with a
    FlinkRuntimeException at runAndCheck:147. This restores the 59015d109 state,
    in which mkdirs() creates the directory and FLINK_LOG_DIR is intentionally
    absent (dup2 makes it redundant).
---
 .github/workflows/flink.yml                        |   2 +-
 .../apache/gluten/velox/NexmarkSourceFactory.java  |  30 ++++--
 .../org/apache/gluten/velox/PrintSinkFactory.java  |  63 +++++++----
 .../table/runtime/config/VeloxConnectorConfig.java |   2 +
 .../stream/common/GlutenStreamingTestBase.java     |  89 ++++++++++-----
 .../apache/gluten/velox/PrintSinkFactoryTest.java  | 119 +++++++++++++++++++++
 gluten-flink/ut/src/test/resources/nexmark/q12.sql |  20 ----
 7 files changed, 246 insertions(+), 79 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index 9479947420..c4fc90db81 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -82,7 +82,7 @@ jobs:
           export fmt_SOURCE=BUNDLED
           export folly_SOURCE=BUNDLED
           git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard 
115edf79d265a61c30d45dfcc6ce932ad92378ca
+          cd velox4j && git reset --hard 
97fc1edafebd0f505e613d260f77f92f5252d048
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip 
-Dspotless.skip=true
           cd ..
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
index ae0d94fb3c..6533935bc7 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
@@ -23,6 +23,7 @@ import org.apache.gluten.util.PlanNodeIdGenerator;
 import org.apache.gluten.util.ReflectUtils;
 
 import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.NexmarkGeneratorConfig;
 import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
 import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
@@ -32,17 +33,25 @@ import io.github.zhztheplayer.velox4j.type.RowType;
 import org.apache.flink.api.dag.Transformation;
 import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.util.List;
 import java.util.Map;
 
 public class NexmarkSourceFactory implements VeloxSourceSinkFactory {
-  private static final Logger LOG = 
LoggerFactory.getLogger(NexmarkSourceFactory.class);
+  private static final ObjectMapper MAPPER =
+      new ObjectMapper()
+          .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+          .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+          .setVisibility(PropertyAccessor.GETTER, 
JsonAutoDetect.Visibility.NONE);
 
   @SuppressWarnings("rawtypes")
   @Override
@@ -76,9 +85,6 @@ public class NexmarkSourceFactory implements 
VeloxSourceSinkFactory {
     Object generatorConfig =
         ReflectUtils.getObjectField(
             nexmarkSourceSplit.getClass(), nexmarkSourceSplit, 
"generatorConfig");
-    Long maxEvents =
-        (Long)
-            ReflectUtils.getObjectField(generatorConfig.getClass(), 
generatorConfig, "maxEvents");
     PlanNode tableScan =
         new TableScanNode(id, outputType, new 
NexmarkTableHandle("connector-nexmark"), List.of());
     GlutenStreamSource sourceOp =
@@ -88,8 +94,7 @@ public class NexmarkSourceFactory implements 
VeloxSourceSinkFactory {
                 Map.of(id, outputType),
                 id,
                 new NexmarkConnectorSplit(
-                    "connector-nexmark",
-                    maxEvents > Integer.MAX_VALUE ? Integer.MAX_VALUE : 
maxEvents.intValue()),
+                    "connector-nexmark", 
toVeloxNexmarkGeneratorConfig(generatorConfig)),
                 RowData.class));
 
     return new LegacySourceTransformation<RowData>(
@@ -106,4 +111,13 @@ public class NexmarkSourceFactory implements 
VeloxSourceSinkFactory {
       Transformation<RowData> transformation, Map<String, Object> parameters) {
     throw new UnsupportedOperationException("Unimplemented method 
'buildSink'");
   }
+
+  private static NexmarkGeneratorConfig toVeloxNexmarkGeneratorConfig(Object 
javaConfig) {
+    try {
+      String json = MAPPER.writeValueAsString(javaConfig);
+      return MAPPER.readValue(json, NexmarkGeneratorConfig.class);
+    } catch (JsonProcessingException e) {
+      throw new TableException("Failed to convert nexmark 
NexmarkGeneratorConfig to velox4j", e);
+    }
+  }
 }
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
index 737d6bab7e..e9b9a24623 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
@@ -20,6 +20,7 @@ import 
org.apache.gluten.streaming.api.operators.GlutenOneInputOperatorFactory;
 import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
 import org.apache.gluten.util.LogicalTypeConverter;
 import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
 
 import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
 import io.github.zhztheplayer.velox4j.connector.PrintTableHandle;
@@ -30,9 +31,6 @@ import io.github.zhztheplayer.velox4j.type.BigIntType;
 import io.github.zhztheplayer.velox4j.type.RowType;
 
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
@@ -71,33 +69,54 @@ public class PrintSinkFactory implements 
VeloxSourceSinkFactory {
     throw new FlinkRuntimeException("Unimplemented method 'buildSource'");
   }
 
+  // Pulls print-identifier/standard-error from RowDataPrintFunction via 
reflection.
+  // Flink 1.19.x field names: sinkIdentifier (print-identifier), target 
(standard-error, true =
+  // stderr).
+  static PrintOptions extractPrintOptions(Transformation<RowData> 
transformation) {
+    SimpleOperatorFactory operatorFactory =
+        (SimpleOperatorFactory) ((LegacySinkTransformation) 
transformation).getOperatorFactory();
+    SinkOperator sinkOp = (SinkOperator) operatorFactory.getOperator();
+    Object rowDataPrintFn = sinkOp.getUserFunction();
+    Object writer =
+        ReflectUtils.getObjectField(rowDataPrintFn.getClass(), rowDataPrintFn, 
"writer");
+    String printIdentifier =
+        (String) ReflectUtils.getObjectField(writer.getClass(), writer, 
"sinkIdentifier");
+    boolean isStdErr = (boolean) 
ReflectUtils.getObjectField(writer.getClass(), writer, "target");
+    return new PrintOptions(printIdentifier == null ? "" : printIdentifier, 
isStdErr);
+  }
+
+  static final class PrintOptions {
+    private final String printIdentifier;
+    private final boolean stdErr;
+
+    PrintOptions(String printIdentifier, boolean stdErr) {
+      this.printIdentifier = printIdentifier;
+      this.stdErr = stdErr;
+    }
+
+    public String getPrintIdentifier() {
+      return printIdentifier;
+    }
+
+    public boolean isStdErr() {
+      return stdErr;
+    }
+  }
+
   @SuppressWarnings({"rawtypes", "unchecked"})
   @Override
   public Transformation buildVeloxSink(
       Transformation<RowData> transformation, Map<String, Object> parameters) {
     Transformation inputTrans = (Transformation) 
transformation.getInputs().get(0);
     InternalTypeInfo inputTypeInfo = (InternalTypeInfo) 
inputTrans.getOutputType();
-    Configuration config = (Configuration) 
parameters.get(Configuration.class.getName());
-    String logDir = config.get(CoreOptions.FLINK_LOG_DIR);
-    String printPath;
-    if (logDir != null) {
-      printPath = String.format("file://%s/%s", logDir, "taskmanager.out");
-    } else {
-      String flinkHomeDir = System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR);
-      if (flinkHomeDir == null) {
-        String flinkConfDir = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
-        if (flinkConfDir == null) {
-          throw new FlinkRuntimeException(
-              "Can not get flink home directory, please set FLINK_HOME.");
-        }
-        printPath = String.format("file://%s/../log/%s", flinkConfDir, 
"taskmanager.out");
-      } else {
-        printPath = String.format("file://%s/log/%s", flinkHomeDir, 
"taskmanager.out");
-      }
-    }
+
+    PrintOptions printOpts = extractPrintOptions(transformation);
+
     RowType inputColumns = (RowType) 
LogicalTypeConverter.toVLType(inputTypeInfo.toLogicalType());
     RowType ignore = new RowType(List.of("num"), List.of(new BigIntType()));
-    PrintTableHandle tableHandle = new PrintTableHandle("print-table", 
inputColumns, printPath);
+    PrintTableHandle tableHandle =
+        new PrintTableHandle(
+            "print-table", inputColumns, printOpts.getPrintIdentifier(), 
printOpts.isStdErr());
     TableWriteNode tableWriteNode =
         new TableWriteNode(
             PlanNodeIdGenerator.newId(),
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
index 13b195b0bd..df9da7512a 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
@@ -38,12 +38,14 @@ public class VeloxConnectorConfig {
           "connector-from-elements",
           "connector-print");
   private static final String keyTaskIndex = "task_index";
+  private static final String keyParallelism = "parallelism";
   private static final String keyQueryUUId = "query_uuid";
 
   public static ConnectorConfig getConfig(RuntimeContext context) {
     Map<String, String> configMap = new HashMap<>();
     TaskInfo taskInfo = context.getTaskInfo();
     configMap.put(keyTaskIndex, 
String.valueOf(taskInfo.getIndexOfThisSubtask()));
+    configMap.put(keyParallelism, 
String.valueOf(taskInfo.getNumberOfParallelSubtasks()));
     configMap.put(
         keyQueryUUId,
         
UUID.nameUUIDFromBytes(context.getJobInfo().getJobId().toHexString().getBytes())
diff --git 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
index 9adae67e21..393a0064d5 100644
--- 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
+++ 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
@@ -16,7 +16,6 @@
  */
 package org.apache.gluten.table.runtime.stream.common;
 
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.Table;
@@ -29,6 +28,8 @@ import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
 
+import com.sun.jna.Library;
+import com.sun.jna.Native;
 import org.junit.jupiter.api.BeforeAll;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +48,23 @@ public class GlutenStreamingTestBase extends 
StreamingTestBase {
   private static final String EXECUTION_PLAN_PREIFX = "== Physical Execution 
Plan ==";
   private static final long timeoutMS = 30000;
 
+  // dup2 fd=1 onto a file: Velox print sink writes to std::cout, which 
bypasses System.setOut and
+  // goes straight to the process's fd=1.
+  private interface CLibrary extends Library {
+    int dup(int oldfd);
+
+    int dup2(int oldfd, int newfd);
+
+    int open(String path, int flags, int mode);
+
+    int close(int fd);
+  }
+
+  private static final CLibrary C_LIB = Native.load("c", CLibrary.class);
+  private static final int O_WRONLY = 1;
+  private static final int O_CREAT = 0100;
+  private static final int O_TRUNC = 01000;
+
   @BeforeAll
   public static void setup() throws Exception {
     LOG.info("GlutenStreamingTestBase setup");
@@ -114,42 +132,57 @@ public class GlutenStreamingTestBase extends 
StreamingTestBase {
 
   protected void runAndCheck(String query, List<String> expected) {
     String printResultDirPath = System.getProperty("user.dir") + "/log/";
-    tEnv().getConfig().set(CoreOptions.FLINK_LOG_DIR, printResultDirPath);
-    String printResultFilePath = String.format("%s%s", printResultDirPath, 
"taskmanager.out");
+    new File(printResultDirPath).mkdirs();
+    String printResultFilePath = printResultDirPath + "taskmanager.out";
     File printResultFile = new File(printResultFilePath);
-    boolean deleteResultFile = true;
     if (printResultFile.exists()) {
-      deleteResultFile = printResultFile.delete();
+      printResultFile.delete();
     }
-    Table table = tEnv().sqlQuery(query);
-    createPrintSinkTable("printT", table.getResolvedSchema());
-    String newQuery = String.format("insert into %s %s", "printT", query);
-    TableResult tableResult = tEnv().executeSql(newQuery);
-    assertTrue(tableResult.getJobClient().isPresent());
+
+    int savedStdout = C_LIB.dup(1);
+    int fileFd = C_LIB.open(printResultFilePath, O_WRONLY | O_CREAT | O_TRUNC, 
0644);
+    if (fileFd < 0) {
+      C_LIB.close(savedStdout);
+      throw new FlinkRuntimeException("Failed to open " + printResultFilePath);
+    }
+    C_LIB.dup2(fileFd, 1);
     try {
+      Table table = tEnv().sqlQuery(query);
+      createPrintSinkTable("printT", table.getResolvedSchema());
+      String newQuery = String.format("insert into %s %s", "printT", query);
+      TableResult tableResult = tEnv().executeSql(newQuery);
+      assertTrue(tableResult.getJobClient().isPresent());
       JobClient jobClient = tableResult.getJobClient().get();
-      if (deleteResultFile) {
-        try {
-          long startTime = System.currentTimeMillis();
-          while (!printResultFile.exists()) {
-            if (System.currentTimeMillis() - startTime > timeoutMS) {
-              break;
-            }
-            Thread.sleep(10);
+      try {
+        long startTime = System.currentTimeMillis();
+        while (printResultFile.length() == 0) {
+          if (System.currentTimeMillis() - startTime > timeoutMS) {
+            break;
           }
-          long fileSize = -1L;
-          startTime = System.currentTimeMillis();
-          while (printResultFile.length() > fileSize) {
-            if (System.currentTimeMillis() - startTime > timeoutMS) {
-              break;
-            }
-            fileSize = printResultFile.length();
-            Thread.sleep(3000);
+          Thread.sleep(10);
+        }
+        long fileSize = -1L;
+        startTime = System.currentTimeMillis();
+        while (printResultFile.length() > fileSize) {
+          if (System.currentTimeMillis() - startTime > timeoutMS) {
+            break;
           }
-        } finally {
-          jobClient.cancel();
+          fileSize = printResultFile.length();
+          Thread.sleep(3000);
         }
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new FlinkRuntimeException(ie);
+      } finally {
+        jobClient.cancel();
       }
+    } finally {
+      C_LIB.dup2(savedStdout, 1);
+      C_LIB.close(fileFd);
+      C_LIB.close(savedStdout);
+    }
+
+    try {
       List<String> result = new ArrayList<>();
       try (FileReader fr = new FileReader(printResultFile);
           BufferedReader br = new BufferedReader(fr)) {
diff --git 
a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PrintSinkFactoryTest.java
 
b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PrintSinkFactoryTest.java
new file mode 100644
index 0000000000..46c2e8091c
--- /dev/null
+++ 
b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PrintSinkFactoryTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.sink.SinkOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Constructor;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class PrintSinkFactoryTest {
+
+  private static final String ROWDATA_PRINT_FUNCTION_CN =
+      
"org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction";
+
+  @SuppressWarnings("unchecked")
+  private static SinkFunction<RowData> newRowDataPrintFunction(String 
identifier, boolean isStdErr)
+      throws Exception {
+    Class<?> cls = Class.forName(ROWDATA_PRINT_FUNCTION_CN);
+    Constructor<?> ctor =
+        cls.getDeclaredConstructor(
+            
org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter.class,
+            String.class,
+            boolean.class);
+    ctor.setAccessible(true);
+    return (SinkFunction<RowData>) ctor.newInstance(null, identifier, 
isStdErr);
+  }
+
+  private static LegacySinkTransformation<RowData> buildSinkTransformation(
+      SinkFunction<RowData> userFunction) {
+    SinkOperator sinkOp = new SinkOperator(userFunction, -1);
+    SimpleOperatorFactory<Object> factory = SimpleOperatorFactory.of(sinkOp);
+    Transformation<RowData> input = new StubTransformation();
+    return new LegacySinkTransformation<>(input, "print-sink", factory, 1);
+  }
+
+  private static final class StubTransformation extends 
Transformation<RowData> {
+    StubTransformation() {
+      super("stub", InternalTypeInfo.of(RowType.of(new IntType())), 1);
+    }
+
+    @Override
+    public List<Transformation<?>> getInputs() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    protected List<Transformation<?>> getTransitivePredecessorsInternal() {
+      return Collections.emptyList();
+    }
+  }
+
+  private static final class OtherSinkFunction extends 
RichSinkFunction<RowData> {}
+
+  @Test
+  void testMatchAcceptsRowDataPrintFunction() throws Exception {
+    PrintSinkFactory factory = new PrintSinkFactory();
+    
assertTrue(factory.match(buildSinkTransformation(newRowDataPrintFunction("foo", 
false))));
+  }
+
+  @Test
+  void testMatchRejectsNonPrintSinkFunction() {
+    PrintSinkFactory factory = new PrintSinkFactory();
+    assertFalse(factory.match(buildSinkTransformation(new 
OtherSinkFunction())));
+  }
+
+  @Test
+  void testMatchRejectsNonLegacySinkTransformation() {
+    PrintSinkFactory factory = new PrintSinkFactory();
+    assertFalse(factory.match(new StubTransformation()));
+  }
+
+  @Test
+  void testExtractPrintOptionsReadsIdentifierAndStderr() throws Exception {
+    LegacySinkTransformation<RowData> tx =
+        buildSinkTransformation(newRowDataPrintFunction("foo", true));
+    PrintSinkFactory.PrintOptions opts = 
PrintSinkFactory.extractPrintOptions(tx);
+    assertEquals("foo", opts.getPrintIdentifier());
+    assertTrue(opts.isStdErr());
+  }
+
+  @Test
+  void testExtractPrintOptionsDefaultsWhenUnset() throws Exception {
+    LegacySinkTransformation<RowData> tx =
+        buildSinkTransformation(newRowDataPrintFunction(null, false));
+    PrintSinkFactory.PrintOptions opts = 
PrintSinkFactory.extractPrintOptions(tx);
+    assertEquals("", opts.getPrintIdentifier());
+    assertFalse(opts.isStdErr());
+  }
+}
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q12.sql 
b/gluten-flink/ut/src/test/resources/nexmark/q12.sql
deleted file mode 100755
index f2cda4f463..0000000000
--- a/gluten-flink/ut/src/test/resources/nexmark/q12.sql
+++ /dev/null
@@ -1,20 +0,0 @@
-CREATE TABLE nexmark_q12 (
-  bidder BIGINT,
-  bid_count BIGINT,
-  starttime TIMESTAMP(3),
-  endtime TIMESTAMP(3)
-) WITH (
-  'connector' = 'blackhole'
-);
-
-CREATE VIEW B AS SELECT *, PROCTIME() as p_time FROM bid;
-
-INSERT INTO nexmark_q12
-SELECT
-    bidder,
-    count(*) as bid_count,
-    window_start AS starttime,
-    window_end AS endtime
-FROM TABLE(
-        TUMBLE(TABLE B, DESCRIPTOR(p_time), INTERVAL '10' SECOND))
-GROUP BY bidder, window_start, window_end;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to