This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 66981715e6 [core] AbstractFileStoreWrite should not provide createWriterContainer with ignorePreviousFiles
66981715e6 is described below
commit 66981715e6fccc587cbf26d76fb723bb085465d9
Author: JingsongLi <[email protected]>
AuthorDate: Tue Aug 19 16:53:03 2025 +0800
[core] AbstractFileStoreWrite should not provide createWriterContainer with ignorePreviousFiles
---
.../paimon/operation/AbstractFileStoreWrite.java | 14 ++++---
.../apache/paimon/table/sink/TableWriteImpl.java | 4 ++
.../test/java/org/apache/paimon/TestFileStore.java | 5 +--
.../paimon/operation/FileStoreTestUtils.java | 3 +-
.../operation/KeyValueFileStoreWriteTest.java | 2 +-
.../apache/paimon/operation/TestCommitThread.java | 4 +-
.../paimon/table/DynamicBucketTableTest.java | 7 ++--
.../flink/source/TestChangelogDataReadWrite.java | 46 +++++++++++-----------
8 files changed, 44 insertions(+), 41 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 0b8797cd08..2e61c3d2f1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -401,16 +401,14 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
writers.put(partition.copy(), buckets);
}
return buckets.computeIfAbsent(
- bucket, k -> createWriterContainer(partition.copy(), bucket, ignorePreviousFiles));
+ bucket, k -> createWriterContainer(partition.copy(), bucket));
}
- private long writerNumber() {
- return writers.values().stream().mapToLong(Map::size).sum();
+ public RecordWriter<T> createWriter(BinaryRow partition, int bucket) {
+ return createWriterContainer(partition, bucket).writer;
}
- @VisibleForTesting
- public WriterContainer<T> createWriterContainer(
- BinaryRow partition, int bucket, boolean ignorePreviousFiles) {
+ public WriterContainer<T> createWriterContainer(BinaryRow partition, int bucket) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
}
@@ -461,6 +459,10 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
previousSnapshot == null ? null : previousSnapshot.id());
}
+ private long writerNumber() {
+ return writers.values().stream().mapToLong(Map::size).sum();
+ }
+
@Override
public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) {
this.compactionMetrics = new CompactionMetrics(metricRegistry, tableName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 9d3f4e8b51..ac1499c75d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -90,6 +90,10 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
this.defaultValueRow = DefaultValueRow.create(rowType);
}
+ public FileStoreWrite<T> fileStoreWrite() {
+ return write;
+ }
+
@Override
public InnerTableWrite withWriteRestore(WriteRestore writeRestore) {
this.write.withWriteRestore(writeRestore);
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 7d303cad2d..8e4cafc7de 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -321,10 +321,9 @@ public class TestFileStore extends KeyValueFileStore {
bucket,
(b, w) -> {
if (w == null) {
+ write.withIgnorePreviousFiles(ignorePreviousFiles);
RecordWriter<KeyValue> writer =
- write.createWriterContainer(
- partition, bucket, ignorePreviousFiles)
- .writer;
+ write.createWriterContainer(partition, bucket).writer;
((MemoryOwner) writer)
.setMemoryPool(
new HeapMemorySegmentPool(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
index b874f07237..54fef645cb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
@@ -74,8 +74,7 @@ public class FileStoreTestUtils {
TestFileStore store, List<KeyValue> keyValues, BinaryRow partition, int bucket)
throws Exception {
AbstractFileStoreWrite<KeyValue> write = store.newWrite();
- RecordWriter<KeyValue> writer =
- write.createWriterContainer(partition, bucket, false).writer;
+ RecordWriter<KeyValue> writer = write.createWriterContainer(partition, bucket).writer;
((MemoryOwner) writer)
.setMemoryPool(
new HeapMemorySegmentPool(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
index baeb6e9b4e..c13fcc1f64 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
@@ -98,7 +98,7 @@ public class KeyValueFileStoreWriteTest {
KeyValue keyValue = gen.next();
AbstractFileStoreWrite.WriterContainer<KeyValue> writerContainer =
- write.createWriterContainer(gen.getPartition(keyValue), 1, false);
+ write.createWriterContainer(gen.getPartition(keyValue), 1);
MergeTreeWriter writer = (MergeTreeWriter) writerContainer.writer;
try (MergeTreeCompactManager compactManager =
(MergeTreeCompactManager) writer.compactManager()) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index e1f9cb3994..2f409a9176 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -289,8 +289,8 @@ public class TestCommitThread extends Thread {
}
private MergeTreeWriter createWriter(BinaryRow partition, boolean empty) {
- MergeTreeWriter writer =
- (MergeTreeWriter) write.createWriterContainer(partition, 0, empty).writer;
+ write.withIgnorePreviousFiles(empty);
+ MergeTreeWriter writer = (MergeTreeWriter) write.createWriterContainer(partition, 0).writer;
writer.setMemoryPool(
new HeapMemorySegmentPool(
WRITE_BUFFER_SIZE.getBytes(), (int) PAGE_SIZE.getBytes()));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
index 79f2c34100..29e71e2b44 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
@@ -51,11 +51,10 @@ public class DynamicBucketTableTest extends TableTestBase {
Table table = getTableDefault();
BatchWriteBuilderImpl builder = (BatchWriteBuilderImpl) table.newBatchWriteBuilder();
TableWriteImpl batchTableWrite = (TableWriteImpl) builder.withOverwrite().newWrite();
+ AbstractFileStoreWrite<?> write = (AbstractFileStoreWrite<?>) (batchTableWrite.getWrite());
+ write.withIgnorePreviousFiles(true);
DynamicBucketIndexMaintainer indexMaintainer =
- (DynamicBucketIndexMaintainer)
- ((AbstractFileStoreWrite<?>) (batchTableWrite.getWrite()))
- .createWriterContainer(BinaryRow.EMPTY_ROW, 0, true)
- .dynamicBucketMaintainer;
+ write.createWriterContainer(BinaryRow.EMPTY_ROW, 0).dynamicBucketMaintainer;
assertThat(indexMaintainer.isEmpty()).isTrue();
Pair<InternalRow, Integer> rowWithBucket = data(0);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 2e9695a525..cb6ef1c40b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -173,30 +173,30 @@ public class TestChangelogDataReadWrite {
new CoreOptions(Collections.singletonMap(CoreOptions.FILE_FORMAT.key(), "avro"));
SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
- RecordWriter<KeyValue> writer =
+ KeyValueFileStoreWrite write =
new KeyValueFileStoreWrite(
- LocalFileIO.create(),
- schemaManager,
- schemaManager.schema(0),
- commitUser,
- PARTITION_TYPE,
- KEY_TYPE,
- VALUE_TYPE,
- () -> COMPARATOR,
- () -> null,
- () -> EQUALISER,
- DeduplicateMergeFunction.factory(),
- pathFactory,
- (coreOptions, format) -> pathFactory,
- snapshotManager,
- null, // not used, we only create an empty writer
- null,
- null,
- options,
- EXTRACTOR,
- tablePath.getName())
- .createWriterContainer(partition, bucket, true)
- .writer;
+ LocalFileIO.create(),
+ schemaManager,
+ schemaManager.schema(0),
+ commitUser,
+ PARTITION_TYPE,
+ KEY_TYPE,
+ VALUE_TYPE,
+ () -> COMPARATOR,
+ () -> null,
+ () -> EQUALISER,
+ DeduplicateMergeFunction.factory(),
+ pathFactory,
+ (coreOptions, format) -> pathFactory,
+ snapshotManager,
+ null, // not used, we only create an empty writer
+ null,
+ null,
+ options,
+ EXTRACTOR,
+ tablePath.getName());
+ write.withIgnorePreviousFiles(true);
+ RecordWriter<KeyValue> writer = write.createWriterContainer(partition, bucket).writer;
((MemoryOwner) writer)
.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));