This is an automated email from the ASF dual-hosted git repository.

shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7da2d1a455 [GLUTEN-10131][FLINK] Fix duplicate release of RowVector issue in `GlutenSingleInputOperator` (#10132)
7da2d1a455 is described below

commit 7da2d1a4557b34e66c55eec032729498bc112a9d
Author: yuanhang ma <[email protected]>
AuthorDate: Mon Jul 7 15:44:43 2025 +0800

    [GLUTEN-10131][FLINK] Fix duplicate release of RowVector issue in `GlutenSingleInputOperator` (#10132)
    
    * Fix duplicate release of RowVector issue in `GlutenSingleInputOperator`
---
 .../operators/GlutenSingleInputOperator.java       | 34 +++++++---------------
 .../runtime/stream/custom/ScalarFunctionsTest.java |  2 --
 .../table/runtime/stream/custom/ScanTest.java      |  2 +-
 3 files changed, 12 insertions(+), 26 deletions(-)

diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
index 53998f9837..0bec1bd886 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSingleInputOperator.java
@@ -110,34 +110,22 @@ public class GlutenSingleInputOperator extends TableStreamOperator<RowData>
 
   @Override
   public void processElement(StreamRecord<RowData> element) {
-    RowVector inRv = null;
-    RowVector outRv = null;
-    try {
-      inRv =
-          FlinkRowToVLVectorConvertor.fromRowData(
-              element.getValue(), allocator, session, inputType);
+    try (RowVector inRv =
+        FlinkRowToVLVectorConvertor.fromRowData(
+            element.getValue(), allocator, session, inputType)) {
       inputQueue.put(inRv);
       UpIterator.State state = task.advance();
       if (state == UpIterator.State.AVAILABLE) {
         final StatefulElement statefulElement = task.statefulGet();
-        outRv = statefulElement.asRecord().getRowVector();
-        List<RowData> rows =
-            FlinkRowToVLVectorConvertor.toRowData(
-                outRv, allocator, outputTypes.values().iterator().next());
-        for (RowData row : rows) {
-          output.collect(outElement.replace(row));
+
+        try (RowVector outRv = statefulElement.asRecord().getRowVector()) {
+          List<RowData> rows =
+              FlinkRowToVLVectorConvertor.toRowData(
+                  outRv, allocator, outputTypes.values().iterator().next());
+          for (RowData row : rows) {
+            output.collect(outElement.replace(row));
+          }
         }
-        outRv.close();
-      }
-    } finally {
-      /// The RowVector should be closed in `finally`, to avoid it may not be closed when exceptions
-      // rasied,
-      /// that lead to memory leak.
-      if (outRv != null) {
-        outRv.close();
-      }
-      if (inRv != null) {
-        inRv.close();
       }
     }
   }
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
index 0fc8fca481..8f0371d70e 100644
--- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
@@ -21,13 +21,11 @@ import org.apache.gluten.table.runtime.stream.common.GlutenStreamingTestBase;
 import org.apache.flink.types.Row;
 
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
-@Disabled("Gluten has not supported part of job run in native")
 class ScalarFunctionsTest extends GlutenStreamingTestBase {
 
   @Override
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
index f9cbe2968a..dbe5c6a688 100644
--- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
@@ -33,7 +33,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-@Disabled("Gluten has not support part of job in native")
 class ScanTest extends GlutenStreamingTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(ScanTest.class);
 
@@ -54,6 +53,7 @@ class ScanTest extends GlutenStreamingTestBase {
   }
 
   @Test
+  @Disabled("The output is not as expected.")
   void testStructScan() {
     List<Row> rows =
         Arrays.asList(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to