This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 92e2f5576a [core] Fix checkpoint recovery failure for compacted changelog files (#6173)
92e2f5576a is described below

commit 92e2f5576abca9130299ac1353e0b553a3857c04
Author: Sean Xia <[email protected]>
AuthorDate: Thu Sep 11 15:36:40 2025 +0800

    [core] Fix checkpoint recovery failure for compacted changelog files (#6173)
---
 .../apache/paimon/table/sink/TableCommitImpl.java  |  12 +-
 .../utils/CompactedChangelogPathResolver.java      | 130 +++++++++++++++
 .../apache/paimon/table/sink/TableCommitTest.java  | 178 +++++++++++++++++++++
 .../utils/CompactedChangelogPathResolverTest.java  | 115 +++++++++++++
 .../CompactedChangelogFormatReaderFactory.java     |  73 ++-------
 5 files changed, 448 insertions(+), 60 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index c1a69a2391..95b2df34f5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -33,6 +33,7 @@ import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.tag.TagAutoCreation;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.tag.TagTimeExpire;
+import org.apache.paimon.utils.CompactedChangelogPathResolver;
 import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.ExecutorThreadFactory;
 import org.apache.paimon.utils.IndexFilePathFactories;
@@ -300,6 +301,15 @@ public class TableCommitImpl implements InnerTableCommit {
             }
         }
 
+        // Resolve compacted changelog files to their real file paths
+        List<Path> resolvedFiles = new ArrayList<>();
+        for (Path file : files) {
+            
resolvedFiles.add(CompactedChangelogPathResolver.resolveCompactedChangelogPath(file));
+        }
+        // Deduplicate paths as multiple compacted changelog references may 
resolve to the same
+        // physical file
+        resolvedFiles = 
resolvedFiles.stream().distinct().collect(Collectors.toList());
+
         Predicate<Path> nonExists =
                 p -> {
                     try {
@@ -314,7 +324,7 @@ public class TableCommitImpl implements InnerTableCommit {
                         randomlyExecuteSequentialReturn(
                                 getExecutorService(null),
                                 f -> nonExists.test(f) ? singletonList(f) : 
emptyList(),
-                                files));
+                                resolvedFiles));
 
         if (!nonExistFiles.isEmpty()) {
             String message =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java
 
b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java
new file mode 100644
index 0000000000..5263497ecc
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.Path;
+
+/**
+ * Utility class for resolving compacted changelog file paths.
+ *
+ * <p>This class provides functionality to resolve fake compacted changelog file paths to their real
+ * file paths.
+ *
+ * <p><b>File Name Protocol</b>
+ *
+ * <p>There are two kinds of file name. In the following description, <code>bid1</code> and <code>
+ * bid2</code> are bucket id, <code>off</code> is offset, <code>len1</code> and <code>len2</code>
+ * are lengths.
+ *
+ * <ul>
+ *   <li><code>bucket-bid1/compacted-changelog-xxx$bid1-len1</code>: This is the real file name. If
+ *       this file name is recorded in manifest file meta, reader should read the bytes of this file
+ *       starting from offset <code>0</code> with length <code>len1</code>.
+ *   <li><code>bucket-bid2/compacted-changelog-xxx$bid1-len1-off-len2</code>: This is the fake file
+ *       name. Reader should read the bytes of file <code>
+ *       bucket-bid1/compacted-changelog-xxx$bid1-len1</code> starting from offset <code>off</code>
+ *       with length <code>len2</code>.
+ * </ul>
+ *
+ * <p>When a file name carries a format extension (the text after the first <code>.</code>), the
+ * extension is not part of the encoded fields and is copied unchanged onto the resolved real file
+ * name.
+ */
+public class CompactedChangelogPathResolver {
+
+    /** File name prefix shared by real and fake compacted changelog files. */
+    private static final String COMPACTED_CHANGELOG_PREFIX = "compacted-changelog-";
+
+    /** Static utility class; not meant to be instantiated. */
+    private CompactedChangelogPathResolver() {}
+
+    /**
+     * Checks if the given path is a compacted changelog file path.
+     *
+     * <p>Only the last path component is inspected: it must start with <code>
+     * compacted-changelog-</code>.
+     *
+     * @param path the file path to check
+     * @return true if the path is a compacted changelog file, false otherwise
+     */
+    public static boolean isCompactedChangelogPath(Path path) {
+        return path.getName().startsWith(COMPACTED_CHANGELOG_PREFIX);
+    }
+
+    /**
+     * Resolves a file path, handling compacted changelog file path resolution if applicable.
+     *
+     * <p>For compacted changelog files, resolves fake file paths to their real file paths as
+     * described in the protocol above. For non-compacted changelog files, returns the path
+     * unchanged.
+     *
+     * @param path the file path to resolve
+     * @return the resolved real file path for compacted changelog files, or the original path
+     *     unchanged for other files
+     * @throws IllegalArgumentException if the path has the compacted changelog prefix but its name
+     *     does not follow the file name protocol
+     */
+    public static Path resolveCompactedChangelogPath(Path path) {
+        if (!isCompactedChangelogPath(path)) {
+            return path;
+        }
+        return decodePath(path).getPath();
+    }
+
+    /**
+     * Decodes a compacted changelog file path to extract the real path, offset, and length.
+     *
+     * <p>A real file name (<code>...$bid-len</code>) decodes to the given path itself with offset
+     * <code>0</code>. A fake file name (<code>...$bid-len-off-len2</code>) decodes to the real file
+     * located in the sibling <code>bucket-bid</code> directory, together with the encoded offset
+     * and length.
+     *
+     * @param path the file path to decode
+     * @return the decode result containing real path, offset, and length
+     * @throws IllegalArgumentException if the file name does not follow the file name protocol;
+     *     this includes the {@link NumberFormatException} raised when a length or offset field is
+     *     not a valid number
+     */
+    public static DecodeResult decodePath(Path path) {
+        String fileName = path.getName();
+
+        // Everything before the first '.' is the encoded name, the next part is the format.
+        String[] nameAndFormat = fileName.split("\\.");
+        if (nameAndFormat.length == 0) {
+            throw invalidName(fileName, "the name part is empty");
+        }
+
+        String[] names = nameAndFormat[0].split("\\$");
+        if (names.length < 2) {
+            throw invalidName(fileName, "missing '$' followed by the encoded fields");
+        }
+
+        String[] split = names[1].split("-");
+        if (split.length == 2) {
+            // Real file: the path is already the physical file, read from the beginning.
+            return new DecodeResult(path, 0, Long.parseLong(split[1]));
+        }
+        if (split.length < 4) {
+            throw invalidName(
+                    fileName,
+                    "expected '<bucket>-<length>' or '<bucket>-<length>-<offset>-<length>' after '$'");
+        }
+        if (nameAndFormat.length < 2) {
+            // The real file name is rebuilt with the format extension, so it must be present.
+            throw invalidName(fileName, "missing format extension");
+        }
+
+        // Fake file: the real file lives in the bucket directory named by the first field,
+        // next to the bucket directory that contains the fake file.
+        Path realPath =
+                new Path(
+                        path.getParent().getParent(),
+                        "bucket-"
+                                + split[0]
+                                + "/"
+                                + names[0]
+                                + "$"
+                                + split[0]
+                                + "-"
+                                + split[1]
+                                + "."
+                                + nameAndFormat[1]);
+        return new DecodeResult(realPath, Long.parseLong(split[2]), Long.parseLong(split[3]));
+    }
+
+    /** Builds the exception reported for a file name that violates the file name protocol. */
+    private static IllegalArgumentException invalidName(String fileName, String reason) {
+        return new IllegalArgumentException(
+                "Invalid compacted changelog file name '" + fileName + "': " + reason);
+    }
+
+    /** Result of decoding a compacted changelog file path. */
+    public static class DecodeResult {
+
+        /** Path of the real (physical) compacted changelog file. */
+        private final Path path;
+
+        /** Offset in the real file at which the addressed bytes start. */
+        private final long offset;
+
+        /** Number of bytes addressed, starting at {@link #offset}. */
+        private final long length;
+
+        public DecodeResult(Path path, long offset, long length) {
+            this.path = path;
+            this.offset = offset;
+            this.length = length;
+        }
+
+        /** Returns the path of the real file. */
+        public Path getPath() {
+            return path;
+        }
+
+        /** Returns the offset in the real file at which reading starts. */
+        public long getOffset() {
+            return offset;
+        }
+
+        /** Returns the number of bytes to read from the real file. */
+        public long getLength() {
+            return length;
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index f8cb17bd91..0a39ca7ef8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -24,7 +24,10 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
@@ -34,6 +37,7 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
@@ -438,4 +442,178 @@ public class TableCommitTest {
         assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(5);
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6);
     }
+
+    /**
+     * Commits changelog files whose names are one real and two fake compacted changelog names, then
+     * checks that re-committing after the real file is deleted is rejected.
+     */
+    @Test
+    public void testRecoverCompactedChangelogFiles() throws Exception {
+        String tablePath = tempDir.toString();
+        LocalFileIO localFileIO = LocalFileIO.create();
+
+        // Table with columns (k INT, v BIGINT), configured with 3 buckets.
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+                        new String[] {"k", "v"});
+        Options tableOptions = new Options();
+        tableOptions.set(CoreOptions.PATH, tablePath);
+        tableOptions.set(CoreOptions.BUCKET, 3);
+
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), new Path(tablePath));
+        Schema schema =
+                new Schema(
+                        rowType.getFields(),
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        tableOptions.toMap(),
+                        "");
+        TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema);
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(tablePath),
+                        tableSchema,
+                        CatalogEnvironment.empty());
+
+        // Index = bucket. Entry 0 is the real file name; entries 1 and 2 are fake names that
+        // point into the real file of bucket 0.
+        String[] changelogFiles = {
+            "compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet",
+            "compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-39253-35699.cc-parquet",
+            "compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-74952-37725.cc-parquet"
+        };
+        long[] fileSizes = {3000L, 1000L, 2000L};
+        long[] rowCounts = {300L, 100L, 200L};
+
+        // Lay out the bucket directories on disk.
+        Path[] bucketDirs = new Path[3];
+        for (int bucket = 0; bucket < 3; bucket++) {
+            bucketDirs[bucket] = new Path(tablePath, "bucket-" + bucket);
+            localFileIO.mkdirs(bucketDirs[bucket]);
+        }
+
+        // Only the real compacted changelog file physically exists.
+        Path realFilePath = new Path(bucketDirs[0], changelogFiles[0]);
+        localFileIO.newOutputStream(realFilePath, false).close();
+
+        // One commit message per bucket, each carrying a single changelog file.
+        CommitMessageImpl[] messages = new CommitMessageImpl[3];
+        for (int bucket = 0; bucket < 3; bucket++) {
+            DataFileMeta changelogMeta =
+                    DataFileMeta.forAppend(
+                            changelogFiles[bucket],
+                            fileSizes[bucket],
+                            rowCounts[bucket],
+                            SimpleStats.EMPTY_STATS,
+                            0L,
+                            0L,
+                            1L,
+                            Collections.emptyList(),
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null);
+            messages[bucket] =
+                    new CommitMessageImpl(
+                            BinaryRow.EMPTY_ROW,
+                            bucket,
+                            3,
+                            new DataIncrement(
+                                    Collections.emptyList(),
+                                    Collections.emptyList(),
+                                    Collections.singletonList(changelogMeta)),
+                            new CompactIncrement(
+                                    Collections.emptyList(),
+                                    Collections.emptyList(),
+                                    Collections.emptyList()));
+        }
+
+        ManifestCommittable firstCommittable = new ManifestCommittable(1L);
+        for (CommitMessageImpl message : messages) {
+            firstCommittable.addFileCommittable(message);
+        }
+
+        String commitUser = UUID.randomUUID().toString();
+        try (TableCommitImpl commit = table.newCommit(commitUser)) {
+            // Expected to succeed: both fake names resolve to the real file that exists.
+            commit.filterAndCommitMultiple(Collections.singletonList(firstCommittable), false);
+        }
+
+        // Remove the real file so that the existence check has to fail from now on.
+        localFileIO.delete(realFilePath, false);
+
+        // Use an identifier larger than the committed one (1L) to simulate recovery from a
+        // checkpoint with a committable that still needs to be committed.
+        ManifestCommittable retryCommittable = new ManifestCommittable(2L);
+        for (CommitMessageImpl message : messages) {
+            retryCommittable.addFileCommittable(message);
+        }
+
+        try (TableCommitImpl commit = table.newCommit(commitUser)) {
+            assertThatThrownBy(
+                            () ->
+                                    commit.filterAndCommitMultiple(
+                                            Collections.singletonList(retryCommittable), false))
+                    .hasMessageContaining(
+                            "Cannot recover from this checkpoint because some files in the"
+                                    + " snapshot that need to be resubmitted have been deleted");
+        }
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java
new file mode 100644
index 0000000000..f2b7941096
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.Path;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests covering path detection and resolution in {@link CompactedChangelogPathResolver}. */
+public class CompactedChangelogPathResolverTest {
+
+    /** Shorthand for the resolver call exercised by the tests below. */
+    private static Path resolve(Path candidate) {
+        return CompactedChangelogPathResolver.resolveCompactedChangelogPath(candidate);
+    }
+
+    /** Shorthand for the detection check exercised by the tests below. */
+    private static boolean isCompacted(String rawPath) {
+        return CompactedChangelogPathResolver.isCompactedChangelogPath(new Path(rawPath));
+    }
+
+    @Test
+    public void testIsCompactedChangelogPath() {
+        // A plain changelog file is not a compacted changelog file.
+        assertThat(
+                        isCompacted(
+                                "/path/to/table/bucket-0/changelog-25b05ab0-6f90-4865-a984-8d9629bac735-1426.parquet"))
+                .isFalse();
+
+        // A file carrying the compacted changelog prefix is detected.
+        assertThat(
+                        isCompacted(
+                                "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet"))
+                .isTrue();
+
+        // A regular data file is not a compacted changelog file either.
+        assertThat(isCompacted("/path/to/table/bucket-0/data-file-1.parquet")).isFalse();
+    }
+
+    @Test
+    public void testResolveNonCompactedChangelogFile() {
+        // Regular changelog files must pass through the resolver untouched.
+        Path plainChangelog =
+                new Path(
+                        "/path/to/table/bucket-0/changelog-25b05ab0-6f90-4865-a984-8d9629bac735-1426.parquet");
+        assertThat(resolve(plainChangelog)).isEqualTo(plainChangelog);
+    }
+
+    @Test
+    public void testResolveRealCompactedChangelogFile() {
+        // A real compacted changelog file already is the physical file.
+        Path physicalFile =
+                new Path(
+                        "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet");
+        assertThat(resolve(physicalFile)).isEqualTo(physicalFile);
+    }
+
+    @Test
+    public void testResolveFakeCompactedChangelogFile() {
+        // A fake name in bucket-1 must resolve to the real file in bucket-0.
+        Path fakeInBucket1 =
+                new Path(
+                        "/path/to/table/bucket-1/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-39253-35699.cc-parquet");
+        Path actual = resolve(fakeInBucket1);
+
+        Path expected =
+                new Path(
+                        "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet");
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testResolveWithDifferentFormats() {
+        // ORC: fake name in bucket-2 resolves to the real file in bucket-0.
+        Path orcFake =
+                new Path(
+                        "/path/to/table/bucket-2/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-1024-1024-512.cc-orc");
+        Path orcActual = resolve(orcFake);
+        Path orcExpected =
+                new Path(
+                        "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-1024.cc-orc");
+        assertThat(orcActual).isEqualTo(orcExpected);
+
+        // Avro: fake name in bucket-5 resolves to the real file in bucket-2.
+        Path avroFake =
+                new Path(
+                        "/path/to/table/bucket-5/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$2-2048-2048-1024.cc-avro");
+        Path avroActual = resolve(avroFake);
+        Path avroExpected =
+                new Path(
+                        "/path/to/table/bucket-2/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$2-2048.cc-avro");
+        assertThat(avroActual).isEqualTo(avroExpected);
+    }
+
+    @Test
+    public void testResolveFileWithoutExtension() {
+        // A file without any extension (and without the prefix) is returned as is.
+        Path bareName = new Path("/path/to/table/file");
+        assertThat(resolve(bareName)).isEqualTo(bareName);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
index cd4a1d8c3d..70d2555cf3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
@@ -27,6 +27,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.utils.CompactedChangelogPathResolver;
 import org.apache.paimon.utils.RoaringBitmap32;
 
 import java.io.EOFException;
@@ -35,21 +36,8 @@ import java.io.IOException;
 /**
  * {@link FormatReaderFactory} for compacted changelog.
  *
- * <p><b>File Name Protocol</b>
- *
- * <p>There are two kinds of file name. In the following description, 
<code>bid1</code> and <code>
- * bid2</code> are bucket id, <code>off</code> is offset, <code>len1</code> 
and <code>len2</code>
- * are lengths.
- *
- * <ul>
- *   <li><code>bucket-bid1/compacted-changelog-xxx$bid1-len1</code>: This is 
the real file name. If
- *       this file name is recorded in manifest file meta, reader should read 
the bytes of this file
- *       starting from offset <code>0</code> with length <code>len1</code>.
- *   <li><code>bucket-bid2/compacted-changelog-xxx$bid1-len1-off-len2</code>: 
This is the fake file
- *       name. Reader should read the bytes of file <code>
- *       bucket-bid1/compacted-changelog-xxx$bid1-len1</code> starting from 
offset <code>off</code>
- *       with length <code>len2</code>.
- * </ul>
+ * <p>Uses {@link org.apache.paimon.utils.CompactedChangelogPathResolver} for 
file name protocol
+ * handling.
  */
 public class CompactedChangelogFormatReaderFactory implements 
FormatReaderFactory {
 
@@ -62,7 +50,7 @@ public class CompactedChangelogFormatReaderFactory implements 
FormatReaderFactor
     @Override
     public FileRecordReader<InternalRow> createReader(Context context) throws 
IOException {
         OffsetReadOnlyFileIO fileIO = new 
OffsetReadOnlyFileIO(context.fileIO());
-        long length = decodePath(context.filePath()).length;
+        long length = 
CompactedChangelogPathResolver.decodePath(context.filePath()).getLength();
 
         return wrapped.createReader(
                 new Context() {
@@ -89,43 +77,6 @@ public class CompactedChangelogFormatReaderFactory 
implements FormatReaderFactor
                 });
     }
 
-    private static DecodeResult decodePath(Path path) {
-        String[] nameAndFormat = path.getName().split("\\.");
-        String[] names = nameAndFormat[0].split("\\$");
-        String[] split = names[1].split("-");
-        if (split.length == 2) {
-            return new DecodeResult(path, 0, Long.parseLong(split[1]));
-        } else {
-            Path realPath =
-                    new Path(
-                            path.getParent().getParent(),
-                            "bucket-"
-                                    + split[0]
-                                    + "/"
-                                    + names[0]
-                                    + "$"
-                                    + split[0]
-                                    + "-"
-                                    + split[1]
-                                    + "."
-                                    + nameAndFormat[1]);
-            return new DecodeResult(realPath, Long.parseLong(split[2]), 
Long.parseLong(split[3]));
-        }
-    }
-
-    private static class DecodeResult {
-
-        private final Path path;
-        private final long offset;
-        private final long length;
-
-        private DecodeResult(Path path, long offset, long length) {
-            this.path = path;
-            this.offset = offset;
-            this.length = length;
-        }
-    }
-
     private static class OffsetReadOnlyFileIO implements FileIO {
 
         private final FileIO wrapped;
@@ -146,9 +97,12 @@ public class CompactedChangelogFormatReaderFactory 
implements FormatReaderFactor
 
         @Override
         public SeekableInputStream newInputStream(Path path) throws 
IOException {
-            DecodeResult result = decodePath(path);
+            CompactedChangelogPathResolver.DecodeResult result =
+                    CompactedChangelogPathResolver.decodePath(path);
             return new OffsetSeekableInputStream(
-                    wrapped.newInputStream(result.path), result.offset, 
result.length);
+                    wrapped.newInputStream(result.getPath()),
+                    result.getOffset(),
+                    result.getLength());
         }
 
         @Override
@@ -159,14 +113,15 @@ public class CompactedChangelogFormatReaderFactory 
implements FormatReaderFactor
 
         @Override
         public FileStatus getFileStatus(Path path) throws IOException {
-            DecodeResult result = decodePath(path);
-            FileStatus status = wrapped.getFileStatus(result.path);
+            CompactedChangelogPathResolver.DecodeResult result =
+                    CompactedChangelogPathResolver.decodePath(path);
+            FileStatus status = wrapped.getFileStatus(result.getPath());
 
             return new FileStatus() {
 
                 @Override
                 public long getLen() {
-                    return result.length;
+                    return result.getLength();
                 }
 
                 @Override
@@ -193,7 +148,7 @@ public class CompactedChangelogFormatReaderFactory 
implements FormatReaderFactor
 
         @Override
         public boolean exists(Path path) throws IOException {
-            return wrapped.exists(decodePath(path).path);
+            return 
wrapped.exists(CompactedChangelogPathResolver.decodePath(path).getPath());
         }
 
         @Override

Reply via email to