This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6e3a4213ca [flink] Fix file not found when using changelog compact with multiple extra paths (#6210)
6e3a4213ca is described below
commit 6e3a4213ca402f9720eb19dbf6dea4af16f09164
Author: tsreaper <[email protected]>
AuthorDate: Mon Sep 8 10:33:17 2025 +0800
[flink] Fix file not found when using changelog compact with multiple extra paths (#6210)
---
.../compact/changelog/ChangelogCompactTask.java | 16 +++-
.../changelog/ChangelogCompactTaskTest.java | 96 ++++++++++++++++++++++
2 files changed, 109 insertions(+), 3 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index 86ec97ddde..4f79b467e0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -228,8 +228,14 @@ public class ChangelogCompactTask implements Serializable {
+
CompactedChangelogReadOnlyFormat.getIdentifier(
baseResult.meta.fileFormat()),
baseResult.meta);
+ Path realExternalDir =
+ baseResult.meta.externalPath().map(p -> new
Path(p).getParent()).orElse(null);
if (LOG.isDebugEnabled()) {
- LOG.debug("Rename {} to {}", changelogTempPath, realPath);
+ LOG.debug(
+ "Rename {} to {} with external dir {}",
+ changelogTempPath,
+ realPath,
+ realExternalDir);
}
table.fileIO().rename(changelogTempPath, realPath);
@@ -250,10 +256,14 @@ public class ChangelogCompactTask implements Serializable {
+ "."
+
CompactedChangelogReadOnlyFormat.getIdentifier(
result.meta.fileFormat());
+ DataFileMeta file = result.meta.rename(name);
+ if (realExternalDir != null) {
+ file = file.newExternalPath(new Path(realExternalDir,
name).toString());
+ }
if (result.isCompactResult) {
- compactChangelog.add(result.meta.rename(name));
+ compactChangelog.add(file);
} else {
- newFilesChangelog.add(result.meta.rename(name));
+ newFilesChangelog.add(file);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
index 5b410d78b4..1bcccf9cbf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
@@ -18,30 +18,44 @@
package org.apache.paimon.flink.compact.changelog;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.CloseableIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.FileNotFoundException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link ChangelogCompactTask}. */
@@ -97,4 +111,86 @@ public class ChangelogCompactTaskTest {
.isInstanceOf(FileNotFoundException.class)
.hasMessageContaining("unexisting-file");
}
+
+ @Test
+ public void testManyExternalPaths() throws Exception {
+ String warehouse = tempDir.toString() + "/warehouse";
+ FileSystemCatalog catalog =
+ new FileSystemCatalog(LocalFileIO.create(), new
Path(warehouse));
+ catalog.createDatabase("default", false);
+
+ List<String> externalPaths = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ String path = "file://" + tempDir.toString() + "/external" + i;
+ externalPaths.add(path);
+ }
+
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.BUCKET.key(), "2");
+ options.put(
+ CoreOptions.CHANGELOG_PRODUCER.key(),
+ CoreOptions.ChangelogProducer.INPUT.toString());
+ options.put(
+ CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
+ String.valueOf(System.currentTimeMillis()));
+ options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(),
String.join(",", externalPaths));
+ options.put(
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(),
+ CoreOptions.ExternalPathStrategy.ROUND_ROBIN.toString());
+ catalog.createTable(
+ Identifier.create("default", "T"),
+ new Schema(
+ Arrays.asList(
+ new DataField(0, "k", DataTypes.INT()),
+ new DataField(1, "v", DataTypes.INT())),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ options,
+ ""),
+ false);
+ FileStoreTable table = (FileStoreTable)
catalog.getTable(Identifier.create("default", "T"));
+ int numRecords = 10;
+
+ Map<Integer, List<DataFileMeta>> files = new HashMap<>();
+ TableWriteImpl<?> write = table.newWrite("test");
+ for (int i = 0; i < numRecords; i++) {
+ write.write(GenericRow.of(i, i * 10));
+ for (CommitMessage message : write.prepareCommit(false, 1)) {
+ CommitMessageImpl casted = (CommitMessageImpl) message;
+ files.computeIfAbsent(message.bucket(), k -> new ArrayList<>())
+ .addAll(casted.newFilesIncrement().changelogFiles());
+ }
+ }
+ write.close();
+
+ ChangelogCompactTask task =
+ new ChangelogCompactTask(1, BinaryRow.EMPTY_ROW, 2, files, new
HashMap<>());
+
+ List<CommitMessage> messages =
+ task.doCompact(table, Executors.newFixedThreadPool(1),
MemorySize.ofMebiBytes(64))
+ .stream()
+ .map(c -> (CommitMessageImpl) c.wrappedCommittable())
+ .collect(Collectors.toList());
+ TableCommitImpl commit = table.newCommit("test");
+ commit.commit(messages);
+ commit.close();
+
+ StreamDataTableScan scan = table.newStreamScan();
+ List<String> actual = new ArrayList<>();
+ while (actual.size() < numRecords) {
+ TableScan.Plan plan = scan.plan();
+ CloseableIterator<InternalRow> it =
+ new
RecordReaderIterator<>(table.newRead().createReader(plan));
+ while (it.hasNext()) {
+ InternalRow row = it.next();
+ actual.add(String.format("(%d, %d)", row.getInt(0),
row.getInt(1)));
+ }
+ }
+
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ expected.add(String.format("(%d, %d)", i, i * 10));
+ }
+ assertThat(actual).hasSameElementsAs(expected);
+ }
}