This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f5644007bb [flink] Replace per record in ReadOperator to work with object reuse
f5644007bb is described below

commit f5644007bbbf1c05b26f214fbc1ce98a3fc0ff74
Author: JingsongLi <[email protected]>
AuthorDate: Fri Jan 24 15:09:53 2025 +0800

    [flink] Replace per record in ReadOperator to work with object reuse
---
 .../apache/paimon/flink/source/operator/ReadOperator.java   | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index 1757a859df..c7189f811d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -91,11 +91,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
                                 .getSpillingDirectoriesPaths());
         this.read = readBuilder.newRead().withIOManager(ioManager);
         this.reuseRow = new FlinkRowData(null);
-        if (nestedProjectedRowData != null) {
-            this.reuseRecord = new StreamRecord<>(nestedProjectedRowData);
-        } else {
-            this.reuseRecord = new StreamRecord<>(reuseRow);
-        }
+        this.reuseRecord = new StreamRecord<>(null);
         this.idlingStarted();
     }
 
@@ -126,8 +122,11 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
                 }
 
                 reuseRow.replace(iterator.next());
-                if (nestedProjectedRowData != null) {
-                    nestedProjectedRowData.replaceRow(this.reuseRow);
+                if (nestedProjectedRowData == null) {
+                    reuseRecord.replace(reuseRow);
+                } else {
+                    nestedProjectedRowData.replaceRow(reuseRow);
+                    reuseRecord.replace(nestedProjectedRowData);
                 }
                 output.collect(reuseRecord);
             }

Reply via email to