This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e3a4213ca [flink] Fix file not found when using changelog compact with multiple extra paths (#6210)
6e3a4213ca is described below

commit 6e3a4213ca402f9720eb19dbf6dea4af16f09164
Author: tsreaper <[email protected]>
AuthorDate: Mon Sep 8 10:33:17 2025 +0800

    [flink] Fix file not found when using changelog compact with multiple extra paths (#6210)
---
 .../compact/changelog/ChangelogCompactTask.java    | 16 +++-
 .../changelog/ChangelogCompactTaskTest.java        | 96 ++++++++++++++++++++++
 2 files changed, 109 insertions(+), 3 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index 86ec97ddde..4f79b467e0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -228,8 +228,14 @@ public class ChangelogCompactTask implements Serializable {
                                 + 
CompactedChangelogReadOnlyFormat.getIdentifier(
                                         baseResult.meta.fileFormat()),
                         baseResult.meta);
+        Path realExternalDir =
+                baseResult.meta.externalPath().map(p -> new 
Path(p).getParent()).orElse(null);
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Rename {} to {}", changelogTempPath, realPath);
+            LOG.debug(
+                    "Rename {} to {} with external dir {}",
+                    changelogTempPath,
+                    realPath,
+                    realExternalDir);
         }
         table.fileIO().rename(changelogTempPath, realPath);
 
@@ -250,10 +256,14 @@ public class ChangelogCompactTask implements Serializable 
{
                                 + "."
                                 + 
CompactedChangelogReadOnlyFormat.getIdentifier(
                                         result.meta.fileFormat());
+                DataFileMeta file = result.meta.rename(name);
+                if (realExternalDir != null) {
+                    file = file.newExternalPath(new Path(realExternalDir, 
name).toString());
+                }
                 if (result.isCompactResult) {
-                    compactChangelog.add(result.meta.rename(name));
+                    compactChangelog.add(file);
                 } else {
-                    newFilesChangelog.add(result.meta.rename(name));
+                    newFilesChangelog.add(file);
                 }
             }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
index 5b410d78b4..1bcccf9cbf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
@@ -18,30 +18,44 @@
 
 package org.apache.paimon.flink.compact.changelog;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.FileSystemCatalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.CloseableIterator;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.FileNotFoundException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link ChangelogCompactTask}. */
@@ -97,4 +111,86 @@ public class ChangelogCompactTaskTest {
                 .isInstanceOf(FileNotFoundException.class)
                 .hasMessageContaining("unexisting-file");
     }
+
+    @Test
+    public void testManyExternalPaths() throws Exception {
+        String warehouse = tempDir.toString() + "/warehouse";
+        FileSystemCatalog catalog =
+                new FileSystemCatalog(LocalFileIO.create(), new 
Path(warehouse));
+        catalog.createDatabase("default", false);
+
+        List<String> externalPaths = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            String path = "file://" + tempDir.toString() + "/external" + i;
+            externalPaths.add(path);
+        }
+
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.BUCKET.key(), "2");
+        options.put(
+                CoreOptions.CHANGELOG_PRODUCER.key(),
+                CoreOptions.ChangelogProducer.INPUT.toString());
+        options.put(
+                CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
+                String.valueOf(System.currentTimeMillis()));
+        options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), 
String.join(",", externalPaths));
+        options.put(
+                CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(),
+                CoreOptions.ExternalPathStrategy.ROUND_ROBIN.toString());
+        catalog.createTable(
+                Identifier.create("default", "T"),
+                new Schema(
+                        Arrays.asList(
+                                new DataField(0, "k", DataTypes.INT()),
+                                new DataField(1, "v", DataTypes.INT())),
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        options,
+                        ""),
+                false);
+        FileStoreTable table = (FileStoreTable) 
catalog.getTable(Identifier.create("default", "T"));
+        int numRecords = 10;
+
+        Map<Integer, List<DataFileMeta>> files = new HashMap<>();
+        TableWriteImpl<?> write = table.newWrite("test");
+        for (int i = 0; i < numRecords; i++) {
+            write.write(GenericRow.of(i, i * 10));
+            for (CommitMessage message : write.prepareCommit(false, 1)) {
+                CommitMessageImpl casted = (CommitMessageImpl) message;
+                files.computeIfAbsent(message.bucket(), k -> new ArrayList<>())
+                        .addAll(casted.newFilesIncrement().changelogFiles());
+            }
+        }
+        write.close();
+
+        ChangelogCompactTask task =
+                new ChangelogCompactTask(1, BinaryRow.EMPTY_ROW, 2, files, new 
HashMap<>());
+
+        List<CommitMessage> messages =
+                task.doCompact(table, Executors.newFixedThreadPool(1), 
MemorySize.ofMebiBytes(64))
+                        .stream()
+                        .map(c -> (CommitMessageImpl) c.wrappedCommittable())
+                        .collect(Collectors.toList());
+        TableCommitImpl commit = table.newCommit("test");
+        commit.commit(messages);
+        commit.close();
+
+        StreamDataTableScan scan = table.newStreamScan();
+        List<String> actual = new ArrayList<>();
+        while (actual.size() < numRecords) {
+            TableScan.Plan plan = scan.plan();
+            CloseableIterator<InternalRow> it =
+                    new 
RecordReaderIterator<>(table.newRead().createReader(plan));
+            while (it.hasNext()) {
+                InternalRow row = it.next();
+                actual.add(String.format("(%d, %d)", row.getInt(0), 
row.getInt(1)));
+            }
+        }
+
+        List<String> expected = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            expected.add(String.format("(%d, %d)", i, i * 10));
+        }
+        assertThat(actual).hasSameElementsAs(expected);
+    }
 }

Reply via email to