This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6d844c2519 [flink] Set IOManager to TableRead in Flink source (#5443)
6d844c2519 is described below
commit 6d844c25195e0f5e4e11f224305bcb217c786059
Author: tsreaper <[email protected]>
AuthorDate: Thu Apr 10 23:05:24 2025 +0800
[flink] Set IOManager to TableRead in Flink source (#5443)
---
.../paimon/table/source/KeyValueTableRead.java | 6 ++++++
.../paimon/flink/source/FileStoreSourceReader.java | 4 +++-
.../flink/source/FileStoreSourceReaderTest.java | 24 +++++++++++++++++++++-
.../flink/source/TestChangelogDataReadWrite.java | 3 +--
.../source/align/AlignedSourceReaderTest.java | 7 ++++---
5 files changed, 37 insertions(+), 7 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index c62f2118df..7bfbe5fd3e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.operation.MergeFileSplitRead;
@@ -142,4 +143,9 @@ public final class KeyValueTableRead extends AbstractDataTableRead<KeyValue> {
}
};
}
+
+ @VisibleForTesting
+ public IOManager ioManager() {
+ return ioManager;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 937d54c9f7..622b186780 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -55,7 +55,9 @@ public class FileStoreSourceReader
super(
() ->
new FileStoreSourceSplitReader(
- tableRead, RecordLimiter.create(limit),
metrics),
+ tableRead.withIOManager(ioManager),
+ RecordLimiter.create(limit),
+ metrics),
(element, output, state) ->
FlinkRecordsWithSplitIds.emitRecord(
readerContext, element, output, state,
metrics, rowData),
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
index 608daa8597..f231b105fa 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
@@ -24,6 +24,8 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.source.KeyValueTableRead;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
@@ -135,10 +137,30 @@ public class FileStoreSourceReaderTest {
.matches(event -> event.lastConsumeSnapshotId() == 1L);
}
+ @Test
+ public void testIOManagerIsSet() throws Exception {
+ TestingReaderContext context = new TestingReaderContext();
+ KeyValueTableRead tableRead =
+ new
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey();
+
+ FileStoreSourceReader reader = createReader(context, tableRead);
+
reader.addSplits(Collections.singletonList(createTestFileSplit("id1")));
+ reader.start();
+ reader.close();
+
+ assertThat(tableRead.ioManager()).isNotNull();
+ }
+
protected FileStoreSourceReader createReader(TestingReaderContext context)
{
+ return createReader(
+ context, new
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey());
+ }
+
+ protected FileStoreSourceReader createReader(
+ TestingReaderContext context, TableRead tableRead) {
return new FileStoreSourceReader(
context,
- new
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
+ tableRead,
new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
IOManager.create(tempDir.toString()),
null,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 5e41cb6c65..0d44e0241a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -41,7 +41,6 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.KeyValueTableRead;
-import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
@@ -117,7 +116,7 @@ public class TestChangelogDataReadWrite {
this.commitUser = UUID.randomUUID().toString();
}
- public TableRead createReadWithKey() {
+ public KeyValueTableRead createReadWithKey() {
SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
tablePath);
CoreOptions options = new CoreOptions(new HashMap<>());
TableSchema schema = schemaManager.schema(0);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index d0a4e497dc..f243a6a095 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -25,7 +25,6 @@ import org.apache.paimon.flink.source.FileStoreSourceReader;
import org.apache.paimon.flink.source.FileStoreSourceReaderTest;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
-import org.apache.paimon.flink.source.TestChangelogDataReadWrite;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -37,6 +36,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.TableRead;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
@@ -220,10 +220,11 @@ public class AlignedSourceReaderTest extends FileStoreSourceReaderTest {
}
@Override
- protected FileStoreSourceReader createReader(TestingReaderContext context)
{
+ protected FileStoreSourceReader createReader(
+ TestingReaderContext context, TableRead tableRead) {
return new AlignedSourceReader(
context,
- new
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
+ tableRead,
new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
IOManager.create(tempDir.toString()),
null,