This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 92e2f5576a [core] Fix checkpoint recovery failure for compacted
changelog files (#6173)
92e2f5576a is described below
commit 92e2f5576abca9130299ac1353e0b553a3857c04
Author: Sean Xia <[email protected]>
AuthorDate: Thu Sep 11 15:36:40 2025 +0800
[core] Fix checkpoint recovery failure for compacted changelog files (#6173)
---
.../apache/paimon/table/sink/TableCommitImpl.java | 12 +-
.../utils/CompactedChangelogPathResolver.java | 130 +++++++++++++++
.../apache/paimon/table/sink/TableCommitTest.java | 178 +++++++++++++++++++++
.../utils/CompactedChangelogPathResolverTest.java | 115 +++++++++++++
.../CompactedChangelogFormatReaderFactory.java | 73 ++-------
5 files changed, 448 insertions(+), 60 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index c1a69a2391..95b2df34f5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -33,6 +33,7 @@ import org.apache.paimon.stats.Statistics;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.tag.TagTimeExpire;
+import org.apache.paimon.utils.CompactedChangelogPathResolver;
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.IndexFilePathFactories;
@@ -300,6 +301,15 @@ public class TableCommitImpl implements InnerTableCommit {
}
}
+ // Resolve compacted changelog files to their real file paths
+ List<Path> resolvedFiles = new ArrayList<>();
+ for (Path file : files) {
+
resolvedFiles.add(CompactedChangelogPathResolver.resolveCompactedChangelogPath(file));
+ }
+ // Deduplicate paths as multiple compacted changelog references may
resolve to the same
+ // physical file
+ resolvedFiles =
resolvedFiles.stream().distinct().collect(Collectors.toList());
+
Predicate<Path> nonExists =
p -> {
try {
@@ -314,7 +324,7 @@ public class TableCommitImpl implements InnerTableCommit {
randomlyExecuteSequentialReturn(
getExecutorService(null),
f -> nonExists.test(f) ? singletonList(f) :
emptyList(),
- files));
+ resolvedFiles));
if (!nonExistFiles.isEmpty()) {
String message =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java
b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java
new file mode 100644
index 0000000000..5263497ecc
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.Path;
+
+/**
+ * Utility class for resolving compacted changelog file paths.
+ *
+ * <p>This class provides functionality to resolve fake compacted changelog
file paths to their real
+ * file paths.
+ *
+ * <p><b>File Name Protocol</b>
+ *
+ * <p>There are two kinds of file name. In the following description,
<code>bid1</code> and <code>
+ * bid2</code> are bucket id, <code>off</code> is offset, <code>len1</code>
and <code>len2</code>
+ * are lengths.
+ *
+ * <ul>
+ * <li><code>bucket-bid1/compacted-changelog-xxx$bid1-len1</code>: This is
the real file name. If
+ * this file name is recorded in manifest file meta, reader should read
the bytes of this file
+ * starting from offset <code>0</code> with length <code>len1</code>.
+ * <li><code>bucket-bid2/compacted-changelog-xxx$bid1-len1-off-len2</code>:
This is the fake file
+ * name. Reader should read the bytes of file <code>
+ * bucket-bid1/compacted-changelog-xxx$bid1-len1</code> starting from
offset <code>off</code>
+ * with length <code>len2</code>.
+ * </ul>
+ */
+public class CompactedChangelogPathResolver {
+
+ /**
+ * Checks if the given path is a compacted changelog file path.
+ *
+ * @param path the file path to check
+ * @return true if the path is a compacted changelog file, false otherwise
+ */
+ public static boolean isCompactedChangelogPath(Path path) {
+ return path.getName().startsWith("compacted-changelog-");
+ }
+
+ /**
+ * Resolves a file path, handling compacted changelog file path resolution
if applicable.
+ *
+ * <p>For compacted changelog files, resolves fake file paths to their
real file paths as
+ * described in the protocol above. For non-compacted changelog files,
returns the path
+ * unchanged.
+ *
+ * @param path the file path to resolve
+ * @return the resolved real file path for compacted changelog files, or
the original path
+ * unchanged for other files
+ */
+ public static Path resolveCompactedChangelogPath(Path path) {
+ if (!isCompactedChangelogPath(path)) {
+ return path;
+ }
+ return decodePath(path).getPath();
+ }
+
+ /**
+ * Decodes a compacted changelog file path to extract the real path,
offset, and length.
+ *
+ * @param path the file path to decode
+ * @return the decode result containing real path, offset, and length
+ */
+ public static DecodeResult decodePath(Path path) {
+ String[] nameAndFormat = path.getName().split("\\.");
+ String[] names = nameAndFormat[0].split("\\$");
+ String[] split = names[1].split("-");
+ if (split.length == 2) {
+ return new DecodeResult(path, 0, Long.parseLong(split[1]));
+ } else {
+ Path realPath =
+ new Path(
+ path.getParent().getParent(),
+ "bucket-"
+ + split[0]
+ + "/"
+ + names[0]
+ + "$"
+ + split[0]
+ + "-"
+ + split[1]
+ + "."
+ + nameAndFormat[1]);
+ return new DecodeResult(realPath, Long.parseLong(split[2]),
Long.parseLong(split[3]));
+ }
+ }
+
+ /** Result of decoding a compacted changelog file path. */
+ public static class DecodeResult {
+
+ private final Path path;
+ private final long offset;
+ private final long length;
+
+ public DecodeResult(Path path, long offset, long length) {
+ this.path = path;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getLength() {
+ return length;
+ }
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index f8cb17bd91..0a39ca7ef8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -24,7 +24,10 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
@@ -34,6 +37,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
@@ -438,4 +442,178 @@ public class TableCommitTest {
assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(5);
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6);
}
+
+ @Test
+ public void testRecoverCompactedChangelogFiles() throws Exception {
+ String path = tempDir.toString();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"k", "v"});
+
+ Options options = new Options();
+ options.set(CoreOptions.PATH, path);
+ options.set(CoreOptions.BUCKET, 3);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new
Path(path)),
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ options.toMap(),
+ ""));
+
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(path),
+ tableSchema,
+ CatalogEnvironment.empty());
+
+ // Create fake compacted changelog files that should resolve to real
files
+ String realChangelogFile =
+
"compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet";
+ String fakeChangelogFile1 =
+
"compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-39253-35699.cc-parquet";
+ String fakeChangelogFile2 =
+
"compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-74952-37725.cc-parquet";
+
+ // Create directory structure
+ Path bucket0Dir = new Path(path, "bucket-0");
+ Path bucket1Dir = new Path(path, "bucket-1");
+ Path bucket2Dir = new Path(path, "bucket-2");
+ LocalFileIO.create().mkdirs(bucket0Dir);
+ LocalFileIO.create().mkdirs(bucket1Dir);
+ LocalFileIO.create().mkdirs(bucket2Dir);
+
+ // Create the real compacted changelog file
+ Path realFilePath = new Path(bucket0Dir, realChangelogFile);
+ LocalFileIO.create().newOutputStream(realFilePath, false).close();
+
+ DataFileMeta realFileMeta =
+ DataFileMeta.forAppend(
+ realChangelogFile,
+ 3000L,
+ 300L,
+ SimpleStats.EMPTY_STATS,
+ 0L,
+ 0L,
+ 1L,
+ Collections.emptyList(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+
+ // Create fake DataFileMeta for compacted changelog files
+ DataFileMeta fakeFileMeta1 =
+ DataFileMeta.forAppend(
+ fakeChangelogFile1,
+ 1000L,
+ 100L,
+ SimpleStats.EMPTY_STATS,
+ 0L,
+ 0L,
+ 1L,
+ Collections.emptyList(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+
+ DataFileMeta fakeFileMeta2 =
+ DataFileMeta.forAppend(
+ fakeChangelogFile2,
+ 2000L,
+ 200L,
+ SimpleStats.EMPTY_STATS,
+ 0L,
+ 0L,
+ 1L,
+ Collections.emptyList(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+
+ // Create commit message with fake compacted changelog files
+ BinaryRow partition = BinaryRow.EMPTY_ROW;
+ CommitMessageImpl commitMessage0 =
+ new CommitMessageImpl(
+ partition,
+ 0,
+ 3,
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(realFileMeta)),
+ new CompactIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList()));
+ CommitMessageImpl commitMessage1 =
+ new CommitMessageImpl(
+ partition,
+ 1,
+ 3,
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(fakeFileMeta1)),
+ new CompactIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList()));
+ CommitMessageImpl commitMessage2 =
+ new CommitMessageImpl(
+ partition,
+ 2,
+ 3,
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(fakeFileMeta2)),
+ new CompactIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList()));
+
+ ManifestCommittable committable = new ManifestCommittable(1L);
+ committable.addFileCommittable(commitMessage0);
+ committable.addFileCommittable(commitMessage1);
+ committable.addFileCommittable(commitMessage2);
+
+ String commitUser = UUID.randomUUID().toString();
+ try (TableCommitImpl commit = table.newCommit(commitUser)) {
+ // This should succeed because fake files resolve to the existing
real file
+
commit.filterAndCommitMultiple(Collections.singletonList(committable), false);
+ }
+
+ // Now delete the real file and test that the check fails
+ LocalFileIO.create().delete(realFilePath, false);
+
+ // Create a new committable with a larger identifier to simulate
recovery from checkpoint
+ // This identifier must be larger than the previously committed
identifier (1L)
+ ManifestCommittable newCommittable = new ManifestCommittable(2L);
+ newCommittable.addFileCommittable(commitMessage0);
+ newCommittable.addFileCommittable(commitMessage1);
+ newCommittable.addFileCommittable(commitMessage2);
+
+ try (TableCommitImpl commit = table.newCommit(commitUser)) {
+ assertThatThrownBy(
+ () ->
+ commit.filterAndCommitMultiple(
+
Collections.singletonList(newCommittable), false))
+ .hasMessageContaining(
+ "Cannot recover from this checkpoint because some
files in the"
+ + " snapshot that need to be resubmitted
have been deleted");
+ }
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java
new file mode 100644
index 0000000000..f2b7941096
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.Path;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CompactedChangelogPathResolver}. */
+public class CompactedChangelogPathResolverTest {
+
+ @Test
+ public void testIsCompactedChangelogPath() {
+ // Test non-compacted changelog file
+ Path regularFile =
+ new Path(
+
"/path/to/table/bucket-0/changelog-25b05ab0-6f90-4865-a984-8d9629bac735-1426.parquet");
+
assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(regularFile)).isFalse();
+
+ // Test compacted changelog file
+ Path compactedFile =
+ new Path(
+
"/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet");
+
assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(compactedFile)).isTrue();
+
+ // Test regular data file
+ Path dataFile = new
Path("/path/to/table/bucket-0/data-file-1.parquet");
+
assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(dataFile)).isFalse();
+ }
+
+ @Test
+ public void testResolveNonCompactedChangelogFile() {
+ // Test regular changelog file - should return unchanged
+ Path regularFile =
+ new Path(
+
"/path/to/table/bucket-0/changelog-25b05ab0-6f90-4865-a984-8d9629bac735-1426.parquet");
+ Path resolved =
CompactedChangelogPathResolver.resolveCompactedChangelogPath(regularFile);
+ assertThat(resolved).isEqualTo(regularFile);
+ }
+
+ @Test
+ public void testResolveRealCompactedChangelogFile() {
+ // Test real compacted changelog file - should return unchanged
+ Path realFile =
+ new Path(
+
"/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet");
+ Path resolved =
CompactedChangelogPathResolver.resolveCompactedChangelogPath(realFile);
+ assertThat(resolved).isEqualTo(realFile);
+ }
+
+ @Test
+ public void testResolveFakeCompactedChangelogFile() {
+ // Test fake compacted changelog file - should resolve to real path
+ Path fakeFile =
+ new Path(
+
"/path/to/table/bucket-1/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-39253-35699.cc-parquet");
+ Path resolved =
CompactedChangelogPathResolver.resolveCompactedChangelogPath(fakeFile);
+
+ Path expectedRealFile =
+ new Path(
+
"/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet");
+ assertThat(resolved).isEqualTo(expectedRealFile);
+ }
+
+ @Test
+ public void testResolveWithDifferentFormats() {
+ // Test with different file formats
+ Path fakeOrcFile =
+ new Path(
+
"/path/to/table/bucket-2/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-1024-1024-512.cc-orc");
+ Path resolvedOrc =
+
CompactedChangelogPathResolver.resolveCompactedChangelogPath(fakeOrcFile);
+ Path expectedOrcFile =
+ new Path(
+
"/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-1024.cc-orc");
+ assertThat(resolvedOrc).isEqualTo(expectedOrcFile);
+
+ Path fakeAvroFile =
+ new Path(
+
"/path/to/table/bucket-5/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$2-2048-2048-1024.cc-avro");
+ Path resolvedAvro =
+
CompactedChangelogPathResolver.resolveCompactedChangelogPath(fakeAvroFile);
+ Path expectedAvroFile =
+ new Path(
+
"/path/to/table/bucket-2/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$2-2048.cc-avro");
+ assertThat(resolvedAvro).isEqualTo(expectedAvroFile);
+ }
+
+ @Test
+ public void testResolveFileWithoutExtension() {
+ // Test file without file extension - should return unchanged
+ Path fileWithoutExt = new Path("/path/to/table/file");
+ Path resolved =
+
CompactedChangelogPathResolver.resolveCompactedChangelogPath(fileWithoutExt);
+ assertThat(resolved).isEqualTo(fileWithoutExt);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
index cd4a1d8c3d..70d2555cf3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
@@ -27,6 +27,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.utils.CompactedChangelogPathResolver;
import org.apache.paimon.utils.RoaringBitmap32;
import java.io.EOFException;
@@ -35,21 +36,8 @@ import java.io.IOException;
/**
* {@link FormatReaderFactory} for compacted changelog.
*
- * <p><b>File Name Protocol</b>
- *
- * <p>There are two kinds of file name. In the following description,
<code>bid1</code> and <code>
- * bid2</code> are bucket id, <code>off</code> is offset, <code>len1</code>
and <code>len2</code>
- * are lengths.
- *
- * <ul>
- * <li><code>bucket-bid1/compacted-changelog-xxx$bid1-len1</code>: This is
the real file name. If
- * this file name is recorded in manifest file meta, reader should read
the bytes of this file
- * starting from offset <code>0</code> with length <code>len1</code>.
- * <li><code>bucket-bid2/compacted-changelog-xxx$bid1-len1-off-len2</code>:
This is the fake file
- * name. Reader should read the bytes of file <code>
- * bucket-bid1/compacted-changelog-xxx$bid1-len1</code> starting from
offset <code>off</code>
- * with length <code>len2</code>.
- * </ul>
+ * <p>Uses {@link org.apache.paimon.utils.CompactedChangelogPathResolver} for
file name protocol
+ * handling.
*/
public class CompactedChangelogFormatReaderFactory implements
FormatReaderFactory {
@@ -62,7 +50,7 @@ public class CompactedChangelogFormatReaderFactory implements
FormatReaderFactor
@Override
public FileRecordReader<InternalRow> createReader(Context context) throws
IOException {
OffsetReadOnlyFileIO fileIO = new
OffsetReadOnlyFileIO(context.fileIO());
- long length = decodePath(context.filePath()).length;
+ long length =
CompactedChangelogPathResolver.decodePath(context.filePath()).getLength();
return wrapped.createReader(
new Context() {
@@ -89,43 +77,6 @@ public class CompactedChangelogFormatReaderFactory
implements FormatReaderFactor
});
}
- private static DecodeResult decodePath(Path path) {
- String[] nameAndFormat = path.getName().split("\\.");
- String[] names = nameAndFormat[0].split("\\$");
- String[] split = names[1].split("-");
- if (split.length == 2) {
- return new DecodeResult(path, 0, Long.parseLong(split[1]));
- } else {
- Path realPath =
- new Path(
- path.getParent().getParent(),
- "bucket-"
- + split[0]
- + "/"
- + names[0]
- + "$"
- + split[0]
- + "-"
- + split[1]
- + "."
- + nameAndFormat[1]);
- return new DecodeResult(realPath, Long.parseLong(split[2]),
Long.parseLong(split[3]));
- }
- }
-
- private static class DecodeResult {
-
- private final Path path;
- private final long offset;
- private final long length;
-
- private DecodeResult(Path path, long offset, long length) {
- this.path = path;
- this.offset = offset;
- this.length = length;
- }
- }
-
private static class OffsetReadOnlyFileIO implements FileIO {
private final FileIO wrapped;
@@ -146,9 +97,12 @@ public class CompactedChangelogFormatReaderFactory
implements FormatReaderFactor
@Override
public SeekableInputStream newInputStream(Path path) throws
IOException {
- DecodeResult result = decodePath(path);
+ CompactedChangelogPathResolver.DecodeResult result =
+ CompactedChangelogPathResolver.decodePath(path);
return new OffsetSeekableInputStream(
- wrapped.newInputStream(result.path), result.offset,
result.length);
+ wrapped.newInputStream(result.getPath()),
+ result.getOffset(),
+ result.getLength());
}
@Override
@@ -159,14 +113,15 @@ public class CompactedChangelogFormatReaderFactory
implements FormatReaderFactor
@Override
public FileStatus getFileStatus(Path path) throws IOException {
- DecodeResult result = decodePath(path);
- FileStatus status = wrapped.getFileStatus(result.path);
+ CompactedChangelogPathResolver.DecodeResult result =
+ CompactedChangelogPathResolver.decodePath(path);
+ FileStatus status = wrapped.getFileStatus(result.getPath());
return new FileStatus() {
@Override
public long getLen() {
- return result.length;
+ return result.getLength();
}
@Override
@@ -193,7 +148,7 @@ public class CompactedChangelogFormatReaderFactory
implements FormatReaderFactor
@Override
public boolean exists(Path path) throws IOException {
- return wrapped.exists(decodePath(path).path);
+ return
wrapped.exists(CompactedChangelogPathResolver.decodePath(path).getPath());
}
@Override