This is an automated email from the ASF dual-hosted git repository.
shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7da2d1a455 [GLUTEN-10131][FLINK] Fix duplicate release of RowVector
issue in `GlutenSingleInputOperator` (#10132)
7da2d1a455 is described below
commit 7da2d1a4557b34e66c55eec032729498bc112a9d
Author: yuanhang ma <[email protected]>
AuthorDate: Mon Jul 7 15:44:43 2025 +0800
[GLUTEN-10131][FLINK] Fix duplicate release of RowVector issue in
`GlutenSingleInputOperator` (#10132)
* Fix duplicate release of RowVector issue in `GlutenSingleInputOperator`
---
.../operators/GlutenSingleInputOperator.java | 34 +++++++---------------
.../runtime/stream/custom/ScalarFunctionsTest.java | 2 --
.../table/runtime/stream/custom/ScanTest.java | 2 +-
3 files changed, 12 insertions(+), 26 deletions(-)
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
index 53998f9837..0bec1bd886 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
@@ -110,34 +110,22 @@ public class GlutenSingleInputOperator extends
TableStreamOperator<RowData>
@Override
public void processElement(StreamRecord<RowData> element) {
- RowVector inRv = null;
- RowVector outRv = null;
- try {
- inRv =
- FlinkRowToVLVectorConvertor.fromRowData(
- element.getValue(), allocator, session, inputType);
+ try (RowVector inRv =
+ FlinkRowToVLVectorConvertor.fromRowData(
+ element.getValue(), allocator, session, inputType)) {
inputQueue.put(inRv);
UpIterator.State state = task.advance();
if (state == UpIterator.State.AVAILABLE) {
final StatefulElement statefulElement = task.statefulGet();
- outRv = statefulElement.asRecord().getRowVector();
- List<RowData> rows =
- FlinkRowToVLVectorConvertor.toRowData(
- outRv, allocator, outputTypes.values().iterator().next());
- for (RowData row : rows) {
- output.collect(outElement.replace(row));
+
+ try (RowVector outRv = statefulElement.asRecord().getRowVector()) {
+ List<RowData> rows =
+ FlinkRowToVLVectorConvertor.toRowData(
+ outRv, allocator, outputTypes.values().iterator().next());
+ for (RowData row : rows) {
+ output.collect(outElement.replace(row));
+ }
}
- outRv.close();
- }
- } finally {
- /// The RowVector should be closed in `finally`, to avoid it may not be
closed when exceptions
- // rasied,
- /// that lead to memory leak.
- if (outRv != null) {
- outRv.close();
- }
- if (inRv != null) {
- inRv.close();
}
}
}
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
index 0fc8fca481..8f0371d70e 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
@@ -21,13 +21,11 @@ import
org.apache.gluten.table.runtime.stream.common.GlutenStreamingTestBase;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
-@Disabled("Gluten has not supported part of job run in native")
class ScalarFunctionsTest extends GlutenStreamingTestBase {
@Override
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
index f9cbe2968a..dbe5c6a688 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
@@ -33,7 +33,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-@Disabled("Gluten has not support part of job in native")
class ScanTest extends GlutenStreamingTestBase {
private static final Logger LOG = LoggerFactory.getLogger(ScanTest.class);
@@ -54,6 +53,7 @@ class ScanTest extends GlutenStreamingTestBase {
}
@Test
+ @Disabled("The output is not as expected.")
void testStructScan() {
List<Row> rows =
Arrays.asList(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]