This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f8f1f509ff [GLUTEN-9540][FLINK] Add UT for join operator (#10180)
f8f1f509ff is described below
commit f8f1f509ff180d04a5dfa18270e7ae0e045f7dfd
Author: yuanhang ma <[email protected]>
AuthorDate: Wed Jul 23 15:39:24 2025 +0800
[GLUTEN-9540][FLINK] Add UT for join operator (#10180)
---
gluten-flink/ut/pom.xml | 7 +
.../operators/GlutenStreamJoinOperatorTest.java | 204 +++++++++++++
.../GlutenStreamJoinOperatorTestBase.java | 334 +++++++++++++++++++++
3 files changed, 545 insertions(+)
diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml
index 4e52aa13f3..fa053f85d9 100644
--- a/gluten-flink/ut/pom.xml
+++ b/gluten-flink/ut/pom.xml
@@ -135,6 +135,13 @@
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <version>${flink.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
</dependencies>
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTest.java
new file mode 100644
index 0000000000..06b5c7a844
--- /dev/null
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.streaming.api.operators;
+
+import org.apache.gluten.table.runtime.operators.GlutenVectorTwoInputOperator;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkRexBuilder;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.data.StringData.fromString;
+
+public class GlutenStreamJoinOperatorTest extends GlutenStreamJoinOperatorTestBase {
+
+ private static FlinkTypeFactory typeFactory;
+ private static RexBuilder rexBuilder;
+ private static List<RowData> leftTestData;
+ private static List<RowData> rightTestData;
+
+ @BeforeAll
+ public static void setupTestData() {
+ typeFactory =
+ new FlinkTypeFactory(
+ Thread.currentThread().getContextClassLoader(), FlinkTypeSystem.INSTANCE);
+ rexBuilder = new FlinkRexBuilder(typeFactory);
+
+ leftTestData =
+ Arrays.asList(
+ GenericRowData.of(
+ fromString("Ord#1"),
+ fromString("LineOrd#1"),
+ fromString("3 Bellevue Drive, Pottstown, PA 19464")),
+ GenericRowData.of(
+ fromString("Ord#1"),
+ fromString("LineOrd#2"),
+ fromString("3 Bellevue Drive, Pottstown, PA 19464")),
+ GenericRowData.of(
+ fromString("Ord#2"),
+ fromString("LineOrd#3"),
+ fromString("68 Manor Station Street, Honolulu, HI 96815")));
+
+ rightTestData =
+ Arrays.asList(
+ GenericRowData.of(fromString("LineOrd#2"), fromString("AIR")),
+ GenericRowData.of(fromString("LineOrd#3"), fromString("TRUCK")),
+ GenericRowData.of(fromString("LineOrd#4"), fromString("SHIP")));
+ }
+
+ @Test
+ public void testInnerJoin() throws Exception {
+ GlutenVectorTwoInputOperator operator = createGlutenJoinOperator(FlinkJoinType.INNER);
+
+ List<RowData> expectedOutput =
+ Arrays.asList(
+ GenericRowData.of(
+ fromString("Ord#1"),
+ fromString("LineOrd#2"),
+ fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+ fromString("LineOrd#2"),
+ fromString("AIR")),
+ GenericRowData.of(
+ fromString("Ord#2"),
+ fromString("LineOrd#3"),
+ fromString("68 Manor Station Street, Honolulu, HI 96815"),
+ fromString("LineOrd#3"),
+ fromString("TRUCK")));
+ executeJoinTest(operator, leftTestData, rightTestData, expectedOutput);
+ }
+
+ @Test
+ @Disabled
+ public void testLeftJoin() throws Exception {
+ GlutenVectorTwoInputOperator operator = createGlutenJoinOperator(FlinkJoinType.LEFT);
+
+ List<RowData> expectedOutput =
+ Arrays.asList(
+ GenericRowData.of(
+ fromString("Ord#1"),
+ fromString("LineOrd#1"),
+ fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+ null,
+ null),
+ GenericRowData.of(
+ fromString("Ord#1"),
+ fromString("LineOrd#2"),
+ fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+ fromString("LineOrd#2"),
+ fromString("AIR")),
+ GenericRowData.of(
+ fromString("Ord#2"),
+ fromString("LineOrd#3"),
+ fromString("68 Manor Station Street, Honolulu, HI 96815"),
+ fromString("LineOrd#3"),
+ fromString("TRUCK")));
+ executeJoinTest(operator, leftTestData, rightTestData, expectedOutput);
+ }
+
+ @Test
+ @Disabled
+ public void testRightJoin() throws Exception {
+ GlutenVectorTwoInputOperator operator = createGlutenJoinOperator(FlinkJoinType.RIGHT);
+
+ List<RowData> expectedOutput =
+ Arrays.asList(
+ GenericRowData.of(
+ fromString("Ord#1"),
+ fromString("LineOrd#2"),
+ fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+ fromString("LineOrd#2"),
+ fromString("AIR")),
+ GenericRowData.of(
+ fromString("Ord#2"),
+ fromString("LineOrd#3"),
+ fromString("68 Manor Station Street, Honolulu, HI 96815"),
+ fromString("LineOrd#3"),
+ fromString("TRUCK")),
+ GenericRowData.of(null, null, null, fromString("LineOrd#4"), fromString("SHIP")));
+
+ executeJoinTest(operator, leftTestData, rightTestData, expectedOutput);
+ }
+
+ @Test
+ @Disabled
+ public void testFullOuterJoin() throws Exception {
+ GlutenVectorTwoInputOperator operator = createGlutenJoinOperator(FlinkJoinType.FULL);
+
+ List<RowData> expectedOutput =
+ Arrays.asList(
+ GenericRowData.of(
+ fromString("Ord#1"),
+ fromString("LineOrd#1"),
+ fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+ null,
+ null),
+ GenericRowData.of(
+ fromString("Ord#1"),
+ fromString("LineOrd#2"),
+ fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+ fromString("LineOrd#2"),
+ fromString("AIR")),
+ GenericRowData.of(
+ fromString("Ord#2"),
+ fromString("LineOrd#3"),
+ fromString("68 Manor Station Street, Honolulu, HI 96815"),
+ fromString("LineOrd#3"),
+ fromString("TRUCK")),
+ GenericRowData.of(null, null, null, fromString("LineOrd#4"), fromString("SHIP")));
+
+ executeJoinTest(operator, leftTestData, rightTestData, expectedOutput);
+ }
+
+ @Test
+ public void testInnerJoinWithNonEquiCondition() throws Exception {
+ RexNode nonEquiCondition = createNonEquiCondition();
+ GlutenVectorTwoInputOperator operator =
+ createGlutenJoinOperator(FlinkJoinType.INNER, nonEquiCondition);
+
+ List<RowData> expectedOutput =
+ Arrays.asList(
+ GenericRowData.of(
+ fromString("Ord#1"),
+ fromString("LineOrd#2"),
+ fromString("3 Bellevue Drive, Pottstown, PA 19464"),
+ fromString("LineOrd#2"),
+ fromString("AIR")));
+
+ executeJoinTest(operator, leftTestData, rightTestData, expectedOutput);
+ }
+
+ private RexNode createNonEquiCondition() {
+ RexNode leftField = rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.CHAR, 20), 0);
+ RexNode rightField =
+ rexBuilder.makeInputRef(typeFactory.createSqlType(SqlTypeName.CHAR, 10), 4);
+ return rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, leftField, rightField);
+ }
+}
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java
new file mode 100644
index 0000000000..3b0caba154
--- /dev/null
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamJoinOperatorTestBase.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.streaming.api.operators;
+
+import org.apache.gluten.rexnode.RexConversionContext;
+import org.apache.gluten.rexnode.RexNodeConverter;
+import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.table.runtime.operators.GlutenVectorTwoInputOperator;
+import org.apache.gluten.table.runtime.stream.common.Velox4jEnvironment;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
+
+import io.github.zhztheplayer.velox4j.Velox4j;
+import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
+import io.github.zhztheplayer.velox4j.data.RowVector;
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.join.JoinType;
+import io.github.zhztheplayer.velox4j.memory.AllocationListener;
+import io.github.zhztheplayer.velox4j.memory.MemoryManager;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.NestedLoopJoinNode;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.StreamJoinNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.session.Session;
+import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
+import io.github.zhztheplayer.velox4j.type.BooleanType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator;
+import org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperatorTestBase;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.calcite.rex.RexNode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.gluten.streaming.api.operators.utils.RowDataTestUtils.checkEquals;
+
+public abstract class GlutenStreamJoinOperatorTestBase extends StreamingJoinOperatorTestBase {
+
+ protected static RowType leftVeloxType;
+ protected static RowType rightVeloxType;
+ protected static RowType outputVeloxType;
+ protected static org.apache.flink.table.types.logical.RowType outputRowType;
+
+ protected MemoryManager sharedMemoryManager;
+ protected Session sharedSession;
+ protected BufferAllocator sharedAllocator;
+
+ @BeforeAll
+ public static void setupGlutenEnvironment() {
+ Velox4jEnvironment.initializeOnce();
+ setGlutenTypes();
+ }
+
+ private static void setGlutenTypes() {
+
+ InternalTypeInfo<RowData> leftTypeInfo =
+ InternalTypeInfo.of(
+ org.apache.flink.table.types.logical.RowType.of(
+ new LogicalType[] {
+ new CharType(false, 20), new CharType(false, 20), VarCharType.STRING_TYPE
+ },
+ new String[] {"order_id", "line_order_id", "shipping_address"}));
+ InternalTypeInfo<RowData> rightTypeInfo =
+ InternalTypeInfo.of(
+ org.apache.flink.table.types.logical.RowType.of(
+ new LogicalType[] {new CharType(false, 20), new CharType(true, 10)},
+ new String[] {"line_order_id0", "line_order_ship_mode"}));
+
+ leftVeloxType = (RowType) LogicalTypeConverter.toVLType(leftTypeInfo.toRowType());
+ rightVeloxType = (RowType) LogicalTypeConverter.toVLType(rightTypeInfo.toRowType());
+
+ outputRowType =
+ org.apache.flink.table.types.logical.RowType.of(
+ Stream.concat(
+ leftTypeInfo.toRowType().getChildren().stream(),
+ rightTypeInfo.toRowType().getChildren().stream())
+ .toArray(LogicalType[]::new),
+ Stream.concat(
+ leftTypeInfo.toRowType().getFieldNames().stream(),
+ rightTypeInfo.toRowType().getFieldNames().stream())
+ .toArray(String[]::new));
+
+ outputVeloxType = (RowType) LogicalTypeConverter.toVLType(outputRowType);
+ }
+
+ @Override
+ @BeforeEach
+ public void beforeEach(TestInfo testInfo) {
+ sharedMemoryManager = MemoryManager.create(AllocationListener.NOOP);
+ sharedSession = Velox4j.newSession(sharedMemoryManager);
+ sharedAllocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @Override
+ @AfterEach
+ public void afterEach() throws Exception {
+ if (sharedAllocator != null) {
+ sharedAllocator.close();
+ sharedAllocator = null;
+ }
+ if (sharedSession != null) {
+ sharedSession.close();
+ sharedSession = null;
+ }
+ if (sharedMemoryManager != null) {
+ sharedMemoryManager.close();
+ sharedMemoryManager = null;
+ }
+ }
+
+ @Override
+ protected final AbstractStreamingJoinOperator createJoinOperator(TestInfo testInfo) {
+ throw new UnsupportedOperationException("Use createGlutenJoinOperator instead");
+ }
+
+ @Override
+ protected org.apache.flink.table.types.logical.RowType getOutputType() {
+ return outputRowType;
+ }
+
+ protected GlutenVectorTwoInputOperator createGlutenJoinOperator(FlinkJoinType joinType) {
+ return createGlutenJoinOperator(joinType, null);
+ }
+
+ protected GlutenVectorTwoInputOperator createGlutenJoinOperator(
+ FlinkJoinType joinType, RexNode nonEquiCondition) {
+ JoinType veloxJoinType = Utils.toVLJoinType(joinType);
+
+ int[] leftJoinKeys = {1};
+ int[] rightJoinKeys = {0};
+
+ List<FieldAccessTypedExpr> leftKeys =
+ Utils.analyzeJoinKeys(leftVeloxType, leftJoinKeys, List.of());
+ List<FieldAccessTypedExpr> rightKeys =
+ Utils.analyzeJoinKeys(rightVeloxType, rightJoinKeys, List.of());
+
+ TypedExpr joinCondition = Utils.generateJoinEqualCondition(leftKeys, rightKeys);
+
+ if (nonEquiCondition != null) {
+ RexConversionContext conversionContext = new RexConversionContext(outputVeloxType.getNames());
+ TypedExpr nonEqual = RexNodeConverter.toTypedExpr(nonEquiCondition, conversionContext);
+ joinCondition = new CallTypedExpr(new BooleanType(), List.of(joinCondition, nonEqual), "and");
+ }
+
+ PlanNode leftInput =
+ new TableScanNode(
+ PlanNodeIdGenerator.newId(),
+ leftVeloxType,
+ new ExternalStreamTableHandle("connector-external-stream"),
+ List.of());
+
+ PlanNode rightInput =
+ new TableScanNode(
+ PlanNodeIdGenerator.newId(),
+ rightVeloxType,
+ new ExternalStreamTableHandle("connector-external-stream"),
+ List.of());
+
+ NestedLoopJoinNode leftNode =
+ new NestedLoopJoinNode(
+ PlanNodeIdGenerator.newId(),
+ veloxJoinType,
+ joinCondition,
+ new EmptyNode(leftVeloxType),
+ new EmptyNode(rightVeloxType),
+ outputVeloxType);
+
+ NestedLoopJoinNode rightNode =
+ new NestedLoopJoinNode(
+ PlanNodeIdGenerator.newId(),
+ veloxJoinType,
+ joinCondition,
+ new EmptyNode(rightVeloxType),
+ new EmptyNode(leftVeloxType),
+ outputVeloxType);
+
+ PlanNode join =
+ new StreamJoinNode(
+ PlanNodeIdGenerator.newId(),
+ leftInput,
+ rightInput,
+ leftNode,
+ rightNode,
+ outputVeloxType);
+
+ return new GlutenVectorTwoInputOperator(
+ new StatefulPlanNode(join.getId(), join),
+ leftInput.getId(),
+ rightInput.getId(),
+ leftVeloxType,
+ rightVeloxType,
+ Map.of(join.getId(), outputVeloxType));
+ }
+
+ protected void processTestData(
+ KeyedTwoInputStreamOperatorTestHarness<RowData, StatefulRecord, StatefulRecord, RowData>
+ harness,
+ List<RowData> leftData,
+ List<RowData> rightData)
+ throws Exception {
+ long timestamp = 0L;
+ for (RowData row : leftData) {
+ StatefulRecord record = convertToStatefulRecord(row, leftVeloxType);
+ harness.processElement1(new StreamRecord<>(record, timestamp++));
+ }
+
+ timestamp = 0L;
+ for (RowData row : rightData) {
+ StatefulRecord record = convertToStatefulRecord(row, rightVeloxType);
+ harness.processElement2(new StreamRecord<>(record, timestamp++));
+ }
+ }
+
+ protected List<RowData> extractOutputFromHarness(
+ KeyedTwoInputStreamOperatorTestHarness<RowData, StatefulRecord, StatefulRecord, RowData>
+ harness) {
+ Queue<Object> outputQueue = harness.getOutput();
+ return outputQueue.stream()
+ .filter(record -> record instanceof StreamRecord)
+ .map(record -> ((StreamRecord<RowData>) record).getValue())
+ .collect(Collectors.toList());
+ }
+
+ private StatefulRecord convertToStatefulRecord(RowData rowData, RowType rowType) {
+ try {
+ RowVector rowVector =
+ FlinkRowToVLVectorConvertor.fromRowData(rowData, sharedAllocator, sharedSession, rowType);
+
+ StatefulRecord record = new StatefulRecord(null, 0, 0, false, -1);
+ record.setRowVector(rowVector);
+
+ return record;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to convert RowData to StatefulRecord", e);
+ }
+ }
+
+ protected void executeJoinTest(
+ GlutenVectorTwoInputOperator operator,
+ List<RowData> leftData,
+ List<RowData> rightData,
+ List<RowData> expectedOutput)
+ throws Exception {
+ KeyedTwoInputStreamOperatorTestHarness<RowData, StatefulRecord, StatefulRecord, RowData>
+ harness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new GlutenRowDataKeySelector(leftKeySelector, leftVeloxType),
+ new GlutenRowDataKeySelector(rightKeySelector, rightVeloxType),
+ joinKeyTypeInfo);
+
+ try {
+ harness.setup();
+ harness.open();
+
+ processTestData(harness, leftData, rightData);
+ List<RowData> actualOutput = extractOutputFromHarness(harness);
+ checkEquals(actualOutput, expectedOutput, outputRowType.getChildren());
+ } finally {
+ harness.close();
+ }
+ }
+
+ private static class GlutenRowDataKeySelector
+ implements org.apache.flink.api.java.functions.KeySelector<StatefulRecord, RowData> {
+ private final RowDataKeySelector delegate;
+ private final RowType rowType;
+
+ private transient BufferAllocator allocator;
+
+ public GlutenRowDataKeySelector(RowDataKeySelector delegate, RowType rowType) {
+ this.delegate = delegate;
+ this.rowType = rowType;
+ }
+
+ private BufferAllocator getAllocator() {
+ if (allocator == null) {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
+ return allocator;
+ }
+
+ @Override
+ public RowData getKey(StatefulRecord record) {
+ try {
+ List<RowData> rowDataList =
+ FlinkRowToVLVectorConvertor.toRowData(record.getRowVector(), getAllocator(), rowType);
+
+ return delegate.getKey(rowDataList.get(0));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to extract key from StatefulRecord", e);
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]