This is an automated email from the ASF dual-hosted git repository.
lgbo-ustc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 649eb784b5 [GLUTEN-12306][FLINK][VL] Route print sink options to Velox
via reflection (no file path) (#12320)
649eb784b5 is described below
commit 649eb784b5dae9e7f2da09a964294f4cd2e52c8c
Author: GGboom <[email protected]>
AuthorDate: Thu Jun 25 19:51:11 2026 +0800
[GLUTEN-12306][FLINK][VL] Route print sink options to Velox via reflection
(no file path) (#12320)
* fix(flink): route print sink options to Velox (no file)
* refactor(flink): use ReflectUtils + PrintOptions class in PrintSinkFactory
* fix(ut): redirect fd=1 in runAndCheck to capture Velox print sink output
* fix(flink): adapt NexmarkSourceFactory to new velox4j API and update CI
velox4j ref
* test(flink): set FLINK_LOG_DIR so taskmanager.out lands under captured
fd=1
* test(flink): remove q12 from NexmarkTest (network shuffle NPE in
StatefulRecord serde)
* revert: test(flink): set FLINK_LOG_DIR so taskmanager.out lands under
captured fd=1
Reverts b561569ed. That commit removed File.mkdirs() under the mistaken
assumption that setting CoreOptions.FLINK_LOG_DIR would create the log
directory. Under the dup2(fd=1) capture mechanism, Flink never creates that
directory, so C_LIB.open() returned -1 and ScalarFunctionsTest/ScanTest failed
with FlinkRuntimeException at runAndCheck:147. This restores the 59015d109
state, in which mkdirs() creates the directory and FLINK_LOG_DIR is
intentionally left unset (dup2 makes it redundant).
---
.github/workflows/flink.yml | 2 +-
.../apache/gluten/velox/NexmarkSourceFactory.java | 30 ++++--
.../org/apache/gluten/velox/PrintSinkFactory.java | 63 +++++++----
.../table/runtime/config/VeloxConnectorConfig.java | 2 +
.../stream/common/GlutenStreamingTestBase.java | 89 ++++++++++-----
.../apache/gluten/velox/PrintSinkFactoryTest.java | 119 +++++++++++++++++++++
gluten-flink/ut/src/test/resources/nexmark/q12.sql | 20 ----
7 files changed, 246 insertions(+), 79 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index 9479947420..c4fc90db81 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -82,7 +82,7 @@ jobs:
export fmt_SOURCE=BUNDLED
export folly_SOURCE=BUNDLED
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
- cd velox4j && git reset --hard
115edf79d265a61c30d45dfcc6ce932ad92378ca
+ cd velox4j && git reset --hard
97fc1edafebd0f505e613d260f77f92f5252d048
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
$GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip
-Dspotless.skip=true
cd ..
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
index ae0d94fb3c..6533935bc7 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/NexmarkSourceFactory.java
@@ -23,6 +23,7 @@ import org.apache.gluten.util.PlanNodeIdGenerator;
import org.apache.gluten.util.ReflectUtils;
import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.NexmarkGeneratorConfig;
import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
@@ -32,17 +33,25 @@ import io.github.zhztheplayer.velox4j.type.RowType;
import org.apache.flink.api.dag.Transformation;
import
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
public class NexmarkSourceFactory implements VeloxSourceSinkFactory {
- private static final Logger LOG =
LoggerFactory.getLogger(NexmarkSourceFactory.class);
+ private static final ObjectMapper MAPPER =
+ new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+ .setVisibility(PropertyAccessor.GETTER,
JsonAutoDetect.Visibility.NONE);
@SuppressWarnings("rawtypes")
@Override
@@ -76,9 +85,6 @@ public class NexmarkSourceFactory implements
VeloxSourceSinkFactory {
Object generatorConfig =
ReflectUtils.getObjectField(
nexmarkSourceSplit.getClass(), nexmarkSourceSplit,
"generatorConfig");
- Long maxEvents =
- (Long)
- ReflectUtils.getObjectField(generatorConfig.getClass(),
generatorConfig, "maxEvents");
PlanNode tableScan =
new TableScanNode(id, outputType, new
NexmarkTableHandle("connector-nexmark"), List.of());
GlutenStreamSource sourceOp =
@@ -88,8 +94,7 @@ public class NexmarkSourceFactory implements
VeloxSourceSinkFactory {
Map.of(id, outputType),
id,
new NexmarkConnectorSplit(
- "connector-nexmark",
- maxEvents > Integer.MAX_VALUE ? Integer.MAX_VALUE :
maxEvents.intValue()),
+ "connector-nexmark",
toVeloxNexmarkGeneratorConfig(generatorConfig)),
RowData.class));
return new LegacySourceTransformation<RowData>(
@@ -106,4 +111,13 @@ public class NexmarkSourceFactory implements
VeloxSourceSinkFactory {
Transformation<RowData> transformation, Map<String, Object> parameters) {
throw new UnsupportedOperationException("Unimplemented method
'buildSink'");
}
+
+ private static NexmarkGeneratorConfig toVeloxNexmarkGeneratorConfig(Object
javaConfig) {
+ try {
+ String json = MAPPER.writeValueAsString(javaConfig);
+ return MAPPER.readValue(json, NexmarkGeneratorConfig.class);
+ } catch (JsonProcessingException e) {
+ throw new TableException("Failed to convert nexmark
NexmarkGeneratorConfig to velox4j", e);
+ }
+ }
}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
index 737d6bab7e..e9b9a24623 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/PrintSinkFactory.java
@@ -20,6 +20,7 @@ import
org.apache.gluten.streaming.api.operators.GlutenOneInputOperatorFactory;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
import io.github.zhztheplayer.velox4j.connector.PrintTableHandle;
@@ -30,9 +31,6 @@ import io.github.zhztheplayer.velox4j.type.BigIntType;
import io.github.zhztheplayer.velox4j.type.RowType;
import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
@@ -71,33 +69,54 @@ public class PrintSinkFactory implements
VeloxSourceSinkFactory {
throw new FlinkRuntimeException("Unimplemented method 'buildSource'");
}
+ // Pulls print-identifier/standard-error from RowDataPrintFunction via
reflection.
+ // Flink 1.19.x field names: sinkIdentifier (print-identifier), target
(standard-error, true =
+ // stderr).
+ static PrintOptions extractPrintOptions(Transformation<RowData>
transformation) {
+ SimpleOperatorFactory operatorFactory =
+ (SimpleOperatorFactory) ((LegacySinkTransformation)
transformation).getOperatorFactory();
+ SinkOperator sinkOp = (SinkOperator) operatorFactory.getOperator();
+ Object rowDataPrintFn = sinkOp.getUserFunction();
+ Object writer =
+ ReflectUtils.getObjectField(rowDataPrintFn.getClass(), rowDataPrintFn,
"writer");
+ String printIdentifier =
+ (String) ReflectUtils.getObjectField(writer.getClass(), writer,
"sinkIdentifier");
+ boolean isStdErr = (boolean)
ReflectUtils.getObjectField(writer.getClass(), writer, "target");
+ return new PrintOptions(printIdentifier == null ? "" : printIdentifier,
isStdErr);
+ }
+
+ static final class PrintOptions {
+ private final String printIdentifier;
+ private final boolean stdErr;
+
+ PrintOptions(String printIdentifier, boolean stdErr) {
+ this.printIdentifier = printIdentifier;
+ this.stdErr = stdErr;
+ }
+
+ public String getPrintIdentifier() {
+ return printIdentifier;
+ }
+
+ public boolean isStdErr() {
+ return stdErr;
+ }
+ }
+
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public Transformation buildVeloxSink(
Transformation<RowData> transformation, Map<String, Object> parameters) {
Transformation inputTrans = (Transformation)
transformation.getInputs().get(0);
InternalTypeInfo inputTypeInfo = (InternalTypeInfo)
inputTrans.getOutputType();
- Configuration config = (Configuration)
parameters.get(Configuration.class.getName());
- String logDir = config.get(CoreOptions.FLINK_LOG_DIR);
- String printPath;
- if (logDir != null) {
- printPath = String.format("file://%s/%s", logDir, "taskmanager.out");
- } else {
- String flinkHomeDir = System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR);
- if (flinkHomeDir == null) {
- String flinkConfDir =
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
- if (flinkConfDir == null) {
- throw new FlinkRuntimeException(
- "Can not get flink home directory, please set FLINK_HOME.");
- }
- printPath = String.format("file://%s/../log/%s", flinkConfDir,
"taskmanager.out");
- } else {
- printPath = String.format("file://%s/log/%s", flinkHomeDir,
"taskmanager.out");
- }
- }
+
+ PrintOptions printOpts = extractPrintOptions(transformation);
+
RowType inputColumns = (RowType)
LogicalTypeConverter.toVLType(inputTypeInfo.toLogicalType());
RowType ignore = new RowType(List.of("num"), List.of(new BigIntType()));
- PrintTableHandle tableHandle = new PrintTableHandle("print-table",
inputColumns, printPath);
+ PrintTableHandle tableHandle =
+ new PrintTableHandle(
+ "print-table", inputColumns, printOpts.getPrintIdentifier(),
printOpts.isStdErr());
TableWriteNode tableWriteNode =
new TableWriteNode(
PlanNodeIdGenerator.newId(),
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
index 13b195b0bd..df9da7512a 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxConnectorConfig.java
@@ -38,12 +38,14 @@ public class VeloxConnectorConfig {
"connector-from-elements",
"connector-print");
private static final String keyTaskIndex = "task_index";
+ private static final String keyParallelism = "parallelism";
private static final String keyQueryUUId = "query_uuid";
public static ConnectorConfig getConfig(RuntimeContext context) {
Map<String, String> configMap = new HashMap<>();
TaskInfo taskInfo = context.getTaskInfo();
configMap.put(keyTaskIndex,
String.valueOf(taskInfo.getIndexOfThisSubtask()));
+ configMap.put(keyParallelism,
String.valueOf(taskInfo.getNumberOfParallelSubtasks()));
configMap.put(
keyQueryUUId,
UUID.nameUUIDFromBytes(context.getJobInfo().getJobId().toHexString().getBytes())
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
index 9adae67e21..393a0064d5 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
@@ -16,7 +16,6 @@
*/
package org.apache.gluten.table.runtime.stream.common;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
@@ -29,6 +28,8 @@ import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
+import com.sun.jna.Library;
+import com.sun.jna.Native;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,23 @@ public class GlutenStreamingTestBase extends
StreamingTestBase {
private static final String EXECUTION_PLAN_PREIFX = "== Physical Execution
Plan ==";
private static final long timeoutMS = 30000;
+ // dup2 fd=1 onto a file: Velox print sink writes to std::cout, which
bypasses System.setOut and
+ // goes straight to the process's fd=1.
+ private interface CLibrary extends Library {
+ int dup(int oldfd);
+
+ int dup2(int oldfd, int newfd);
+
+ int open(String path, int flags, int mode);
+
+ int close(int fd);
+ }
+
+ private static final CLibrary C_LIB = Native.load("c", CLibrary.class);
+ private static final int O_WRONLY = 1;
+ private static final int O_CREAT = 0100;
+ private static final int O_TRUNC = 01000;
+
@BeforeAll
public static void setup() throws Exception {
LOG.info("GlutenStreamingTestBase setup");
@@ -114,42 +132,57 @@ public class GlutenStreamingTestBase extends
StreamingTestBase {
protected void runAndCheck(String query, List<String> expected) {
String printResultDirPath = System.getProperty("user.dir") + "/log/";
- tEnv().getConfig().set(CoreOptions.FLINK_LOG_DIR, printResultDirPath);
- String printResultFilePath = String.format("%s%s", printResultDirPath,
"taskmanager.out");
+ new File(printResultDirPath).mkdirs();
+ String printResultFilePath = printResultDirPath + "taskmanager.out";
File printResultFile = new File(printResultFilePath);
- boolean deleteResultFile = true;
if (printResultFile.exists()) {
- deleteResultFile = printResultFile.delete();
+ printResultFile.delete();
}
- Table table = tEnv().sqlQuery(query);
- createPrintSinkTable("printT", table.getResolvedSchema());
- String newQuery = String.format("insert into %s %s", "printT", query);
- TableResult tableResult = tEnv().executeSql(newQuery);
- assertTrue(tableResult.getJobClient().isPresent());
+
+ int savedStdout = C_LIB.dup(1);
+ int fileFd = C_LIB.open(printResultFilePath, O_WRONLY | O_CREAT | O_TRUNC,
0644);
+ if (fileFd < 0) {
+ C_LIB.close(savedStdout);
+ throw new FlinkRuntimeException("Failed to open " + printResultFilePath);
+ }
+ C_LIB.dup2(fileFd, 1);
try {
+ Table table = tEnv().sqlQuery(query);
+ createPrintSinkTable("printT", table.getResolvedSchema());
+ String newQuery = String.format("insert into %s %s", "printT", query);
+ TableResult tableResult = tEnv().executeSql(newQuery);
+ assertTrue(tableResult.getJobClient().isPresent());
JobClient jobClient = tableResult.getJobClient().get();
- if (deleteResultFile) {
- try {
- long startTime = System.currentTimeMillis();
- while (!printResultFile.exists()) {
- if (System.currentTimeMillis() - startTime > timeoutMS) {
- break;
- }
- Thread.sleep(10);
+ try {
+ long startTime = System.currentTimeMillis();
+ while (printResultFile.length() == 0) {
+ if (System.currentTimeMillis() - startTime > timeoutMS) {
+ break;
}
- long fileSize = -1L;
- startTime = System.currentTimeMillis();
- while (printResultFile.length() > fileSize) {
- if (System.currentTimeMillis() - startTime > timeoutMS) {
- break;
- }
- fileSize = printResultFile.length();
- Thread.sleep(3000);
+ Thread.sleep(10);
+ }
+ long fileSize = -1L;
+ startTime = System.currentTimeMillis();
+ while (printResultFile.length() > fileSize) {
+ if (System.currentTimeMillis() - startTime > timeoutMS) {
+ break;
}
- } finally {
- jobClient.cancel();
+ fileSize = printResultFile.length();
+ Thread.sleep(3000);
}
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new FlinkRuntimeException(ie);
+ } finally {
+ jobClient.cancel();
}
+ } finally {
+ C_LIB.dup2(savedStdout, 1);
+ C_LIB.close(fileFd);
+ C_LIB.close(savedStdout);
+ }
+
+ try {
List<String> result = new ArrayList<>();
try (FileReader fr = new FileReader(printResultFile);
BufferedReader br = new BufferedReader(fr)) {
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PrintSinkFactoryTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PrintSinkFactoryTest.java
new file mode 100644
index 0000000000..46c2e8091c
--- /dev/null
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/velox/PrintSinkFactoryTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.sink.SinkOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Constructor;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class PrintSinkFactoryTest {
+
+ private static final String ROWDATA_PRINT_FUNCTION_CN =
+
"org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction";
+
+ @SuppressWarnings("unchecked")
+ private static SinkFunction<RowData> newRowDataPrintFunction(String
identifier, boolean isStdErr)
+ throws Exception {
+ Class<?> cls = Class.forName(ROWDATA_PRINT_FUNCTION_CN);
+ Constructor<?> ctor =
+ cls.getDeclaredConstructor(
+
org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter.class,
+ String.class,
+ boolean.class);
+ ctor.setAccessible(true);
+ return (SinkFunction<RowData>) ctor.newInstance(null, identifier,
isStdErr);
+ }
+
+ private static LegacySinkTransformation<RowData> buildSinkTransformation(
+ SinkFunction<RowData> userFunction) {
+ SinkOperator sinkOp = new SinkOperator(userFunction, -1);
+ SimpleOperatorFactory<Object> factory = SimpleOperatorFactory.of(sinkOp);
+ Transformation<RowData> input = new StubTransformation();
+ return new LegacySinkTransformation<>(input, "print-sink", factory, 1);
+ }
+
+ private static final class StubTransformation extends
Transformation<RowData> {
+ StubTransformation() {
+ super("stub", InternalTypeInfo.of(RowType.of(new IntType())), 1);
+ }
+
+ @Override
+ public List<Transformation<?>> getInputs() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ protected List<Transformation<?>> getTransitivePredecessorsInternal() {
+ return Collections.emptyList();
+ }
+ }
+
+ private static final class OtherSinkFunction extends
RichSinkFunction<RowData> {}
+
+ @Test
+ void testMatchAcceptsRowDataPrintFunction() throws Exception {
+ PrintSinkFactory factory = new PrintSinkFactory();
+
assertTrue(factory.match(buildSinkTransformation(newRowDataPrintFunction("foo",
false))));
+ }
+
+ @Test
+ void testMatchRejectsNonPrintSinkFunction() {
+ PrintSinkFactory factory = new PrintSinkFactory();
+ assertFalse(factory.match(buildSinkTransformation(new
OtherSinkFunction())));
+ }
+
+ @Test
+ void testMatchRejectsNonLegacySinkTransformation() {
+ PrintSinkFactory factory = new PrintSinkFactory();
+ assertFalse(factory.match(new StubTransformation()));
+ }
+
+ @Test
+ void testExtractPrintOptionsReadsIdentifierAndStderr() throws Exception {
+ LegacySinkTransformation<RowData> tx =
+ buildSinkTransformation(newRowDataPrintFunction("foo", true));
+ PrintSinkFactory.PrintOptions opts =
PrintSinkFactory.extractPrintOptions(tx);
+ assertEquals("foo", opts.getPrintIdentifier());
+ assertTrue(opts.isStdErr());
+ }
+
+ @Test
+ void testExtractPrintOptionsDefaultsWhenUnset() throws Exception {
+ LegacySinkTransformation<RowData> tx =
+ buildSinkTransformation(newRowDataPrintFunction(null, false));
+ PrintSinkFactory.PrintOptions opts =
PrintSinkFactory.extractPrintOptions(tx);
+ assertEquals("", opts.getPrintIdentifier());
+ assertFalse(opts.isStdErr());
+ }
+}
diff --git a/gluten-flink/ut/src/test/resources/nexmark/q12.sql
b/gluten-flink/ut/src/test/resources/nexmark/q12.sql
deleted file mode 100755
index f2cda4f463..0000000000
--- a/gluten-flink/ut/src/test/resources/nexmark/q12.sql
+++ /dev/null
@@ -1,20 +0,0 @@
-CREATE TABLE nexmark_q12 (
- bidder BIGINT,
- bid_count BIGINT,
- starttime TIMESTAMP(3),
- endtime TIMESTAMP(3)
-) WITH (
- 'connector' = 'blackhole'
-);
-
-CREATE VIEW B AS SELECT *, PROCTIME() as p_time FROM bid;
-
-INSERT INTO nexmark_q12
-SELECT
- bidder,
- count(*) as bid_count,
- window_start AS starttime,
- window_end AS endtime
-FROM TABLE(
- TUMBLE(TABLE B, DESCRIPTOR(p_time), INTERVAL '10' SECOND))
-GROUP BY bidder, window_start, window_end;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]