This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d844c2519 [flink] Set IOManager to TableRead in Flink source (#5443)
6d844c2519 is described below

commit 6d844c25195e0f5e4e11f224305bcb217c786059
Author: tsreaper <[email protected]>
AuthorDate: Thu Apr 10 23:05:24 2025 +0800

    [flink] Set IOManager to TableRead in Flink source (#5443)
---
 .../paimon/table/source/KeyValueTableRead.java     |  6 ++++++
 .../paimon/flink/source/FileStoreSourceReader.java |  4 +++-
 .../flink/source/FileStoreSourceReaderTest.java    | 24 +++++++++++++++++++++-
 .../flink/source/TestChangelogDataReadWrite.java   |  3 +--
 .../source/align/AlignedSourceReaderTest.java      |  7 ++++---
 5 files changed, 37 insertions(+), 7 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index c62f2118df..7bfbe5fd3e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.operation.MergeFileSplitRead;
@@ -142,4 +143,9 @@ public final class KeyValueTableRead extends 
AbstractDataTableRead<KeyValue> {
             }
         };
     }
+
+    @VisibleForTesting
+    public IOManager ioManager() {
+        return ioManager;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 937d54c9f7..622b186780 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -55,7 +55,9 @@ public class FileStoreSourceReader
         super(
                 () ->
                         new FileStoreSourceSplitReader(
-                                tableRead, RecordLimiter.create(limit), 
metrics),
+                                tableRead.withIOManager(ioManager),
+                                RecordLimiter.create(limit),
+                                metrics),
                 (element, output, state) ->
                         FlinkRecordsWithSplitIds.emitRecord(
                                 readerContext, element, output, state, 
metrics, rowData),
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
index 608daa8597..f231b105fa 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
@@ -24,6 +24,8 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.source.KeyValueTableRead;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.IntType;
@@ -135,10 +137,30 @@ public class FileStoreSourceReaderTest {
                 .matches(event -> event.lastConsumeSnapshotId() == 1L);
     }
 
+    @Test
+    public void testIOManagerIsSet() throws Exception {
+        TestingReaderContext context = new TestingReaderContext();
+        KeyValueTableRead tableRead =
+                new 
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey();
+
+        FileStoreSourceReader reader = createReader(context, tableRead);
+        
reader.addSplits(Collections.singletonList(createTestFileSplit("id1")));
+        reader.start();
+        reader.close();
+
+        assertThat(tableRead.ioManager()).isNotNull();
+    }
+
     protected FileStoreSourceReader createReader(TestingReaderContext context) 
{
+        return createReader(
+                context, new 
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey());
+    }
+
+    protected FileStoreSourceReader createReader(
+            TestingReaderContext context, TableRead tableRead) {
         return new FileStoreSourceReader(
                 context,
-                new 
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
+                tableRead,
                 new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
                 IOManager.create(tempDir.toString()),
                 null,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 5e41cb6c65..0d44e0241a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -41,7 +41,6 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.KeyValueTableRead;
-import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.IntType;
@@ -117,7 +116,7 @@ public class TestChangelogDataReadWrite {
         this.commitUser = UUID.randomUUID().toString();
     }
 
-    public TableRead createReadWithKey() {
+    public KeyValueTableRead createReadWithKey() {
         SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
tablePath);
         CoreOptions options = new CoreOptions(new HashMap<>());
         TableSchema schema = schemaManager.schema(0);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index d0a4e497dc..f243a6a095 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -25,7 +25,6 @@ import org.apache.paimon.flink.source.FileStoreSourceReader;
 import org.apache.paimon.flink.source.FileStoreSourceReaderTest;
 import org.apache.paimon.flink.source.FileStoreSourceSplit;
 import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
-import org.apache.paimon.flink.source.TestChangelogDataReadWrite;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -37,6 +36,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.TableRead;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
@@ -220,10 +220,11 @@ public class AlignedSourceReaderTest extends 
FileStoreSourceReaderTest {
     }
 
     @Override
-    protected FileStoreSourceReader createReader(TestingReaderContext context) 
{
+    protected FileStoreSourceReader createReader(
+            TestingReaderContext context, TableRead tableRead) {
         return new AlignedSourceReader(
                 context,
-                new 
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
+                tableRead,
                 new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
                 IOManager.create(tempDir.toString()),
                 null,

Reply via email to