This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0dacac84d3 [GLUTEN-10033][FLINK] Fix memory leak caused by unclosed RowVector in `GlutenSourceFunction` (#10034)
0dacac84d3 is described below

commit 0dacac84d3bf3d2759a5dd7e0735147852d2845d
Author: kevinyhzou <[email protected]>
AuthorDate: Wed Jul 9 13:10:37 2025 +0800

    [GLUTEN-10033][FLINK] Fix memory leak caused by unclosed RowVector in `GlutenSourceFunction` (#10034)
---
 .../table/runtime/operators/GlutenSourceFunction.java      | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
index 70b639c02b..ddcd5cae94 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
@@ -102,14 +102,14 @@ public class GlutenSourceFunction extends RichParallelSourceFunction<RowData> {
       UpIterator.State state = task.advance();
       if (state == UpIterator.State.AVAILABLE) {
         final StatefulElement element = task.statefulGet();
-        final RowVector outRv = element.asRecord().getRowVector();
-        List<RowData> rows =
-            FlinkRowToVLVectorConvertor.toRowData(
-                outRv, allocator, outputTypes.values().iterator().next());
-        for (RowData row : rows) {
-          sourceContext.collect(row);
+        try (final RowVector outRv = element.asRecord().getRowVector()) {
+          List<RowData> rows =
+              FlinkRowToVLVectorConvertor.toRowData(
+                  outRv, allocator, outputTypes.values().iterator().next());
+          for (RowData row : rows) {
+            sourceContext.collect(row);
+          }
         }
-        outRv.close();
       } else if (state == UpIterator.State.BLOCKED) {
         LOG.debug("Get empty row");
       } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to