This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f8f1f509ff [GLUTEN-9540][FLINK] Add UT for join operator (#10180)
f8f1f509ff is described below

commit f8f1f509ff180d04a5dfa18270e7ae0e045f7dfd
Author: yuanhang ma <[email protected]>
AuthorDate: Wed Jul 23 15:39:24 2025 +0800

    [GLUTEN-9540][FLINK] Add UT for join operator (#10180)
---
 gluten-flink/ut/pom.xml                            |   7 +
 .../operators/GlutenStreamJoinOperatorTest.java    | 204 +++++++++++++
 .../GlutenStreamJoinOperatorTestBase.java          | 334 +++++++++++++++++++++
 3 files changed, 545 insertions(+)

diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml
index 4e52aa13f3..fa053f85d9 100644
--- a/gluten-flink/ut/pom.xml
+++ b/gluten-flink/ut/pom.xml
@@ -135,6 +135,13 @@
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTest.java
new file mode 100644
index 0000000000..06b5c7a844
--- /dev/null
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.streaming.api.operators;
+
+import org.apache.gluten.table.runtime.operators.GlutenVectorTwoInputOperator;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkRexBuilder;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.data.StringData.fromString;
+
+public class GlutenStreamJoinOperatorTest extends GlutenStreamJoinOperatorTestBase {
+
+  private static FlinkTypeFactory typeFactory;
+  private static RexBuilder rexBuilder;
+  private static List<RowData> leftTestData;
+  private static List<RowData> rightTestData;
+
+  @BeforeAll
+  public static void setupTestData() {
+    typeFactory =
+        new FlinkTypeFactory(
+            Thread.currentThread().getContextClassLoader(), FlinkTypeSystem.INSTANCE);
+    rexBuilder = new FlinkRexBuilder(typeFactory);
+
+    leftTestData =
+        Arrays.asList(
+            GenericRowData.of(
+                fromString("Ord#1"),
+                fromString("LineOrd#1"),
+                fromString("3 Bellevue Drive, Pottstown, PA 19464")),
+            GenericRowData.of(
+                fromString("Ord#1"),
+                fromString("LineOrd#2"),
+                fromString("3 Bellevue Drive, Pottstown, PA 19464")),
+            GenericRowData.of(
+                fromString("Ord#2"),
+                fromString("LineOrd#3"),
+                fromString("68 Manor Station Street, Honolulu, HI 96815")));
+
+    rightTestData =
+        Arrays.asList(
+            GenericRowData.of(fromString("LineOrd#2"), fromString("AIR")),
+            GenericRowData.of(fromString("LineOrd#3"), fromString("TRUCK")),
+            GenericRowData.of(fromString("LineOrd#4"), fromString("SHIP")));
+  }
+
+  @Test
+  public void testInnerJoin() throws Exception {
+    GlutenVectorTwoInputOperator operator = createGlutenJoinOperator(FlinkJoinType.INNER);
+
+    List<RowData> expectedOutput =
+        Arrays.asList(
+            GenericRowData.of(
+                fromString("Ord#1"),
+                fromString("LineOrd#2"),
+                fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+                fromString("LineOrd#2"),
+                fromString("AIR")),
+            GenericRowData.of(
+                fromString("Ord#2"),
+                fromString("LineOrd#3"),
+                fromString("68 Manor Station Street, Honolulu, HI 96815"),
+                fromString("LineOrd#3"),
+                fromString("TRUCK")));
+    executeJoinTest(operator, leftTestData, rightTestData, expectedOutput);
+  }
+
+  @Test
+  @Disabled
+  public void testLeftJoin() throws Exception {
+    GlutenVectorTwoInputOperator operator = createGlutenJoinOperator(FlinkJoinType.LEFT);
+
+    List<RowData> expectedOutput =
+        Arrays.asList(
+            GenericRowData.of(
+                fromString("Ord#1"),
+                fromString("LineOrd#1"),
+                fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+                null,
+                null),
+            GenericRowData.of(
+                fromString("Ord#1"),
+                fromString("LineOrd#2"),
+                fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+                fromString("LineOrd#2"),
+                fromString("AIR")),
+            GenericRowData.of(
+                fromString("Ord#2"),
+                fromString("LineOrd#3"),
+                fromString("68 Manor Station Street, Honolulu, HI 96815"),
+                fromString("LineOrd#3"),
+                fromString("TRUCK")));
+    executeJoinTest(operator, leftTestData, rightTestData, expectedOutput);
+  }
+
+  @Test
+  @Disabled
+  public void testRightJoin() throws Exception {
+    GlutenVectorTwoInputOperator operator = createGlutenJoinOperator(FlinkJoinType.RIGHT);
+
+    List<RowData> expectedOutput =
+        Arrays.asList(
+            GenericRowData.of(
+                fromString("Ord#1"),
+                fromString("LineOrd#2"),
+                fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+                fromString("LineOrd#2"),
+                fromString("AIR")),
+            GenericRowData.of(
+                fromString("Ord#2"),
+                fromString("LineOrd#3"),
+                fromString("68 Manor Station Street, Honolulu, HI 96815"),
+                fromString("LineOrd#3"),
+                fromString("TRUCK")),
+            GenericRowData.of(null, null, null, fromString("LineOrd#4"), fromString("SHIP")));
+
+    executeJoinTest(operator, leftTestData, rightTestData, expectedOutput);
+  }
+
+  @Test
+  @Disabled
+  public void testFullOuterJoin() throws Exception {
+    GlutenVectorTwoInputOperator operator = createGlutenJoinOperator(FlinkJoinType.FULL);
+
+    List<RowData> expectedOutput =
+        Arrays.asList(
+            GenericRowData.of(
+                fromString("Ord#1"),
+                fromString("LineOrd#1"),
+                fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+                null,
+                null),
+            GenericRowData.of(
+                fromString("Ord#1"),
+                fromString("LineOrd#2"),
+                fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+                fromString("LineOrd#2"),
+                fromString("AIR")),
+            GenericRowData.of(
+                fromString("Ord#2"),
+                fromString("LineOrd#3"),
+                fromString("68 Manor Station Street, Honolulu, HI 96815"),
+                fromString("LineOrd#3"),
+                fromString("TRUCK")),
+            GenericRowData.of(null, null, null, fromString("LineOrd#4"), fromString("SHIP")));
+
+    executeJoinTest(operator, leftTestData, rightTestData, expectedOutput);
+  }
+
+  @Test
+  public void testInnerJoinWithNonEquiCondition() throws Exception {
+    RexNode nonEquiCondition = createNonEquiCondition();
+    GlutenVectorTwoInputOperator operator =
+        createGlutenJoinOperator(FlinkJoinType.INNER, nonEquiCondition);
+
+    List<RowData> expectedOutput =
+        Arrays.asList(
+            GenericRowData.of(
+                fromString("Ord#1"),
+                fromString("LineOrd#2"),
+                fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+                fromString("LineOrd#2"),
+                fromString("AIR")));
+
+    executeJoinTest(operator, leftTestData, rightTestData, expectedOutput);
+  }
+
+  private RexNode createNonEquiCondition() {
+    RexNode leftField = rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.CHAR, 20), 0);
+    RexNode rightField =
+        rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.CHAR, 10), 4);
+    return rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, leftField, rightField);
+  }
+}
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java
new file mode 100644
index 0000000000..3b0caba154
--- /dev/null
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.streaming.api.operators;
+
+import org.apache.gluten.rexnode.RexConversionContext;
+import org.apache.gluten.rexnode.RexNodeConverter;
+import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.table.runtime.operators.GlutenVectorTwoInputOperator;
+import org.apache.gluten.table.runtime.stream.common.Velox4jEnvironment;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
+
+import io.github.zhztheplayer.velox4j.Velox4j;
+import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
+import io.github.zhztheplayer.velox4j.data.RowVector;
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.join.JoinType;
+import io.github.zhztheplayer.velox4j.memory.AllocationListener;
+import io.github.zhztheplayer.velox4j.memory.MemoryManager;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.NestedLoopJoinNode;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.StreamJoinNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.session.Session;
+import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
+import io.github.zhztheplayer.velox4j.type.BooleanType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator;
+import org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperatorTestBase;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.calcite.rex.RexNode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.gluten.streaming.api.operators.utils.RowDataTestUtils.checkEquals;
+
+public abstract class GlutenStreamJoinOperatorTestBase extends StreamingJoinOperatorTestBase {
+
+  protected static RowType leftVeloxType;
+  protected static RowType rightVeloxType;
+  protected static RowType outputVeloxType;
+  protected static org.apache.flink.table.types.logical.RowType outputRowType;
+
+  protected MemoryManager sharedMemoryManager;
+  protected Session sharedSession;
+  protected BufferAllocator sharedAllocator;
+
+  @BeforeAll
+  public static void setupGlutenEnvironment() {
+    Velox4jEnvironment.initializeOnce();
+    setGlutenTypes();
+  }
+
+  private static void setGlutenTypes() {
+
+    InternalTypeInfo<RowData> leftTypeInfo =
+        InternalTypeInfo.of(
+            org.apache.flink.table.types.logical.RowType.of(
+                new LogicalType[] {
+                  new CharType(false, 20), new CharType(false, 20), VarCharType.STRING_TYPE
+                },
+                new String[] {"order_id", "line_order_id", "shipping_address"}));
+    InternalTypeInfo<RowData> rightTypeInfo =
+        InternalTypeInfo.of(
+            org.apache.flink.table.types.logical.RowType.of(
+                new LogicalType[] {new CharType(false, 20), new CharType(true, 10)},
+                new String[] {"line_order_id0", "line_order_ship_mode"}));
+
+    leftVeloxType = (RowType) LogicalTypeConverter.toVLType(leftTypeInfo.toRowType());
+    rightVeloxType = (RowType) LogicalTypeConverter.toVLType(rightTypeInfo.toRowType());
+
+    outputRowType =
+        org.apache.flink.table.types.logical.RowType.of(
+            Stream.concat(
+                    leftTypeInfo.toRowType().getChildren().stream(),
+                    rightTypeInfo.toRowType().getChildren().stream())
+                .toArray(LogicalType[]::new),
+            Stream.concat(
+                    leftTypeInfo.toRowType().getFieldNames().stream(),
+                    rightTypeInfo.toRowType().getFieldNames().stream())
+                .toArray(String[]::new));
+
+    outputVeloxType = (RowType) LogicalTypeConverter.toVLType(outputRowType);
+  }
+
+  @Override
+  @BeforeEach
+  public void beforeEach(TestInfo testInfo) {
+    sharedMemoryManager = MemoryManager.create(AllocationListener.NOOP);
+    sharedSession = Velox4j.newSession(sharedMemoryManager);
+    sharedAllocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @Override
+  @AfterEach
+  public void afterEach() throws Exception {
+    if (sharedAllocator != null) {
+      sharedAllocator.close();
+      sharedAllocator = null;
+    }
+    if (sharedSession != null) {
+      sharedSession.close();
+      sharedSession = null;
+    }
+    if (sharedMemoryManager != null) {
+      sharedMemoryManager.close();
+      sharedMemoryManager = null;
+    }
+  }
+
+  @Override
+  protected final AbstractStreamingJoinOperator createJoinOperator(TestInfo testInfo) {
+    throw new UnsupportedOperationException("Use createGlutenJoinOperator instead");
+  }
+
+  @Override
+  protected org.apache.flink.table.types.logical.RowType getOutputType() {
+    return outputRowType;
+  }
+
+  protected GlutenVectorTwoInputOperator createGlutenJoinOperator(FlinkJoinType joinType) {
+    return createGlutenJoinOperator(joinType, null);
+  }
+
+  protected GlutenVectorTwoInputOperator createGlutenJoinOperator(
+      FlinkJoinType joinType, RexNode nonEquiCondition) {
+    JoinType veloxJoinType = Utils.toVLJoinType(joinType);
+
+    int[] leftJoinKeys = {1};
+    int[] rightJoinKeys = {0};
+
+    List<FieldAccessTypedExpr> leftKeys =
+        Utils.analyzeJoinKeys(leftVeloxType, leftJoinKeys, List.of());
+    List<FieldAccessTypedExpr> rightKeys =
+        Utils.analyzeJoinKeys(rightVeloxType, rightJoinKeys, List.of());
+
+    TypedExpr joinCondition = Utils.generateJoinEqualCondition(leftKeys, rightKeys);
+
+    if (nonEquiCondition != null) {
+      RexConversionContext conversionContext = new RexConversionContext(outputVeloxType.getNames());
+      TypedExpr nonEqual = RexNodeConverter.toTypedExpr(nonEquiCondition, conversionContext);
+      joinCondition = new CallTypedExpr(new BooleanType(), List.of(joinCondition, nonEqual), "and");
+    }
+
+    PlanNode leftInput =
+        new TableScanNode(
+            PlanNodeIdGenerator.newId(),
+            leftVeloxType,
+            new ExternalStreamTableHandle("connector-external-stream"),
+            List.of());
+
+    PlanNode rightInput =
+        new TableScanNode(
+            PlanNodeIdGenerator.newId(),
+            rightVeloxType,
+            new ExternalStreamTableHandle("connector-external-stream"),
+            List.of());
+
+    NestedLoopJoinNode leftNode =
+        new NestedLoopJoinNode(
+            PlanNodeIdGenerator.newId(),
+            veloxJoinType,
+            joinCondition,
+            new EmptyNode(leftVeloxType),
+            new EmptyNode(rightVeloxType),
+            outputVeloxType);
+
+    NestedLoopJoinNode rightNode =
+        new NestedLoopJoinNode(
+            PlanNodeIdGenerator.newId(),
+            veloxJoinType,
+            joinCondition,
+            new EmptyNode(rightVeloxType),
+            new EmptyNode(leftVeloxType),
+            outputVeloxType);
+
+    PlanNode join =
+        new StreamJoinNode(
+            PlanNodeIdGenerator.newId(),
+            leftInput,
+            rightInput,
+            leftNode,
+            rightNode,
+            outputVeloxType);
+
+    return new GlutenVectorTwoInputOperator(
+        new StatefulPlanNode(join.getId(), join),
+        leftInput.getId(),
+        rightInput.getId(),
+        leftVeloxType,
+        rightVeloxType,
+        Map.of(join.getId(), outputVeloxType));
+  }
+
+  protected void processTestData(
+      KeyedTwoInputStreamOperatorTestHarness<RowData, StatefulRecord, StatefulRecord, RowData>
+          harness,
+      List<RowData> leftData,
+      List<RowData> rightData)
+      throws Exception {
+    long timestamp = 0L;
+    for (RowData row : leftData) {
+      StatefulRecord record = convertToStatefulRecord(row, leftVeloxType);
+      harness.processElement1(new StreamRecord<>(record, timestamp++));
+    }
+
+    timestamp = 0L;
+    for (RowData row : rightData) {
+      StatefulRecord record = convertToStatefulRecord(row, rightVeloxType);
+      harness.processElement2(new StreamRecord<>(record, timestamp++));
+    }
+  }
+
+  protected List<RowData> extractOutputFromHarness(
+      KeyedTwoInputStreamOperatorTestHarness<RowData, StatefulRecord, StatefulRecord, RowData>
+          harness) {
+    Queue<Object> outputQueue = harness.getOutput();
+    return outputQueue.stream()
+        .filter(record -> record instanceof StreamRecord)
+        .map(record -> ((StreamRecord<RowData>) record).getValue())
+        .collect(Collectors.toList());
+  }
+
+  private StatefulRecord convertToStatefulRecord(RowData rowData, RowType rowType) {
+    try {
+      RowVector rowVector =
+          FlinkRowToVLVectorConvertor.fromRowData(rowData, sharedAllocator, sharedSession, rowType);
+
+      StatefulRecord record = new StatefulRecord(null, 0, 0, false, -1);
+      record.setRowVector(rowVector);
+
+      return record;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to convert RowData to StatefulRecord", e);
+    }
+  }
+
+  protected void executeJoinTest(
+      GlutenVectorTwoInputOperator operator,
+      List<RowData> leftData,
+      List<RowData> rightData,
+      List<RowData> expectedOutput)
+      throws Exception {
+    KeyedTwoInputStreamOperatorTestHarness<RowData, StatefulRecord, StatefulRecord, RowData>
+        harness =
+            new KeyedTwoInputStreamOperatorTestHarness<>(
+                operator,
+                new GlutenRowDataKeySelector(leftKeySelector, leftVeloxType),
+                new GlutenRowDataKeySelector(rightKeySelector, rightVeloxType),
+                joinKeyTypeInfo);
+
+    try {
+      harness.setup();
+      harness.open();
+
+      processTestData(harness, leftData, rightData);
+      List<RowData> actualOutput = extractOutputFromHarness(harness);
+      checkEquals(actualOutput, expectedOutput, outputRowType.getChildren());
+    } finally {
+      harness.close();
+    }
+  }
+
+  private static class GlutenRowDataKeySelector
+      implements org.apache.flink.api.java.functions.KeySelector<StatefulRecord, RowData> {
+    private final RowDataKeySelector delegate;
+    private final RowType rowType;
+
+    private transient BufferAllocator allocator;
+
+    public GlutenRowDataKeySelector(RowDataKeySelector delegate, RowType rowType) {
+      this.delegate = delegate;
+      this.rowType = rowType;
+    }
+
+    private BufferAllocator getAllocator() {
+      if (allocator == null) {
+        allocator = new RootAllocator(Long.MAX_VALUE);
+      }
+      return allocator;
+    }
+
+    @Override
+    public RowData getKey(StatefulRecord record) {
+      try {
+        List<RowData> rowDataList =
+            FlinkRowToVLVectorConvertor.toRowData(record.getRowVector(), getAllocator(), rowType);
+
+        return delegate.getKey(rowDataList.get(0));
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to extract key from StatefulRecord", e);
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to