This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b2ea16c4aace21b72faf57a1760abf9d65035f3a Author: Gen Luo <luogen...@gmail.com> AuthorDate: Wed Jan 26 15:27:53 2022 +0800 [FLINK-25583][connectors/filesystem] Add the getPath and getSize methods in PendingFileRecoverable. --- .../file/sink/utils/FileSinkTestUtils.java | 19 +++ .../sink/filesystem/BulkBucketWriter.java | 5 +- .../functions/sink/filesystem/BulkPartWriter.java | 4 +- .../sink/filesystem/InProgressFileWriter.java | 13 +- .../OutputStreamBasedPartFileWriter.java | 162 +++++++++++++++++++-- .../sink/filesystem/RowWiseBucketWriter.java | 5 +- .../sink/filesystem/RowWisePartWriter.java | 4 +- .../hadoop/bulk/HadoopPathBasedPartFileWriter.java | 64 +++++++- 8 files changed, 254 insertions(+), 22 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java index 908aa42..b3d7041 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.file.sink.utils; import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; @@ -35,12 +36,30 @@ public class FileSinkTestUtils { /** A type of testing {@link InProgressFileWriter.PendingFileRecoverable}. */ public static class TestPendingFileRecoverable extends StringValue implements InProgressFileWriter.PendingFileRecoverable { + @Override + public Path getPath() { + return null; + } + + @Override + public long getSize() { + return -1L; + } // Nope } /** A type of testing {@link InProgressFileWriter.InProgressFileRecoverable}. */ public static class TestInProgressFileRecoverable extends StringValue implements InProgressFileWriter.InProgressFileRecoverable { + @Override + public Path getPath() { + return null; + } + + @Override + public long getSize() { + return -1L; + } // Nope } diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java index 7906243..0c4ee74 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java +++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java @@ -50,6 +50,7 @@ public class BulkBucketWriter<IN, BucketID> public InProgressFileWriter<IN, BucketID> resumeFrom( final BucketID bucketId, final RecoverableFsDataOutputStream stream, + final Path path, final RecoverableWriter.ResumeRecoverable resumable, final long creationTime) throws IOException { @@ -58,7 +59,7 @@ public class BulkBucketWriter<IN, BucketID> Preconditions.checkNotNull(resumable); final BulkWriter<IN> writer = writerFactory.create(stream); - return new BulkPartWriter<>(bucketId, stream, writer, creationTime); + return new BulkPartWriter<>(bucketId, path, stream, writer, creationTime); } @Override @@ -73,6 +74,6 @@ public class BulkBucketWriter<IN, BucketID> Preconditions.checkNotNull(path); final BulkWriter<IN> writer = writerFactory.create(stream); - return new BulkPartWriter<>(bucketId, stream, writer, creationTime); + return new BulkPartWriter<>(bucketId, path, stream, writer, creationTime); } } diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java index 758296d..d770c69 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java +++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.util.Preconditions; @@ -36,10 +37,11 @@ final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter BulkPartWriter( final BucketID bucketId, + final Path path, final RecoverableFsDataOutputStream currentPartStream, final BulkWriter<IN> writer, final long creationTime) { - super(bucketId, currentPartStream, creationTime); + super(bucketId, path, currentPartStream, creationTime); this.writer = Preconditions.checkNotNull(writer); } diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java index f633023..dbc8159 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java +++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java @@ -20,6 +20,9 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.core.fs.Path; + +import javax.annotation.Nullable; import java.io.IOException; @@ -65,5 +68,13 @@ public interface InProgressFileWriter<IN, BucketID> /** The handle can be used to recover pending file. */ @PublicEvolving - interface PendingFileRecoverable {} + interface PendingFileRecoverable { + + /** @return The target path of the pending file, null if unavailable. */ + @Nullable + Path getPath(); + + /** @return The size of the pending file, -1 if unavailable. */ + long getSize(); + } } diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java index 13ceae5..06d3bf9 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java +++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java @@ -29,6 +29,8 @@ import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.IOUtils; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.OutputStream; import java.util.Objects; @@ -46,25 +48,31 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> final RecoverableFsDataOutputStream currentPartStream; + @Nullable final Path targetPath; + private CompactingFileWriter.Type writeType = null; OutputStreamBasedPartFileWriter( final BucketID bucketID, + @Nullable final Path path, final RecoverableFsDataOutputStream recoverableFsDataOutputStream, final long createTime) { super(bucketID, createTime); + this.targetPath = path; this.currentPartStream = recoverableFsDataOutputStream; } @Override public InProgressFileRecoverable persist() throws IOException { - return new OutputStreamBasedInProgressFileRecoverable(currentPartStream.persist()); + return new OutputStreamBasedInProgressFileRecoverable( + currentPartStream.persist(), targetPath); } @Override public PendingFileRecoverable closeForCommit() throws IOException { + long size = currentPartStream.getPos(); return new OutputStreamBasedPendingFileRecoverable( - currentPartStream.closeForCommit().getRecoverable()); + currentPartStream.closeForCommit().getRecoverable(), targetPath, size); } @Override @@ -137,6 +145,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> bucketID, recoverableWriter.recover( outputStreamBasedInProgressRecoverable.getResumeRecoverable()), + inProgressFileRecoverable.getPath(), outputStreamBasedInProgressRecoverable.getResumeRecoverable(), creationTime); } @@ -192,6 +201,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> public abstract InProgressFileWriter<IN, BucketID> resumeFrom( final BucketID bucketId, final RecoverableFsDataOutputStream stream, + final Path path, final RecoverableWriter.ResumeRecoverable resumable, final long creationTime) throws IOException; @@ -205,14 +215,60 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> private final RecoverableWriter.CommitRecoverable commitRecoverable; + @Nullable private final Path targetPath; + private final long fileSize; + + @Deprecated + // Remained for state compatibility public OutputStreamBasedPendingFileRecoverable( final RecoverableWriter.CommitRecoverable commitRecoverable) { + this(commitRecoverable, null, -1L); + } + + public OutputStreamBasedPendingFileRecoverable( + final RecoverableWriter.CommitRecoverable commitRecoverable, + @Nullable final Path targetPath, + final long fileSize) { this.commitRecoverable = commitRecoverable; + this.targetPath = targetPath; + this.fileSize = fileSize; } RecoverableWriter.CommitRecoverable getCommitRecoverable() { return commitRecoverable; } + + @Override + public Path getPath() { + return targetPath; + } + + @Override + public long getSize() { + return fileSize; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + OutputStreamBasedPendingFileRecoverable that = + (OutputStreamBasedPendingFileRecoverable) o; + return fileSize == that.fileSize + && Objects.equals(commitRecoverable, that.commitRecoverable) + && Objects.equals(targetPath, that.targetPath); + } + + @Override + public int hashCode() { + return Objects.hash(commitRecoverable, targetPath, fileSize); + } } /** @@ -223,15 +279,57 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> implements InProgressFileRecoverable { private final RecoverableWriter.ResumeRecoverable resumeRecoverable; + @Nullable private final Path targetPath; + @Deprecated + // Remained for state compatibility public OutputStreamBasedInProgressFileRecoverable( final RecoverableWriter.ResumeRecoverable resumeRecoverable) { + this(resumeRecoverable, null); + } + + public OutputStreamBasedInProgressFileRecoverable( + final RecoverableWriter.ResumeRecoverable resumeRecoverable, + @Nullable final Path targetPath) { this.resumeRecoverable = resumeRecoverable; + this.targetPath = targetPath; } RecoverableWriter.ResumeRecoverable getResumeRecoverable() { return resumeRecoverable; } + + @Override + public Path getPath() { + return targetPath; + } + + @Override + public long getSize() { + // File size of an in progress file is unavailable. + return -1L; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + OutputStreamBasedInProgressFileRecoverable that = + (OutputStreamBasedInProgressFileRecoverable) o; + return Objects.equals(resumeRecoverable, that.resumeRecoverable) + && Objects.equals(targetPath, that.targetPath); + } + + @Override + public int hashCode() { + return Objects.hash(resumeRecoverable, targetPath); + } } static final class OutputStreamBasedPendingFile implements BucketWriter.PendingFile { @@ -269,7 +367,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> @Override public int getVersion() { - return 1; + return 2; } @Override @@ -279,7 +377,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> (OutputStreamBasedInProgressFileRecoverable) inProgressRecoverable; DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256); dataOutputSerializer.writeInt(MAGIC_NUMBER); - serializeV1(outputStreamBasedInProgressRecoverable, dataOutputSerializer); + serializeV2(outputStreamBasedInProgressRecoverable, dataOutputSerializer); return dataOutputSerializer.getCopyOfBuffer(); } @@ -291,6 +389,10 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> DataInputView dataInputView = new DataInputDeserializer(serialized); validateMagicNumber(dataInputView); return deserializeV1(dataInputView); + case 2: + dataInputView = new DataInputDeserializer(serialized); + validateMagicNumber(dataInputView); + return deserializeV2(dataInputView); default: throw new IOException("Unrecognized version or corrupt state: " + version); } @@ -301,11 +403,17 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> return resumeSerializer; } - private void serializeV1( + private void serializeV2( final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable, final DataOutputView dataOutputView) throws IOException { + boolean pathAvailable = outputStreamBasedInProgressRecoverable.targetPath != null; + dataOutputView.writeBoolean(pathAvailable); + if (pathAvailable) { + dataOutputView.writeUTF( + outputStreamBasedInProgressRecoverable.targetPath.toUri().toString()); + } SimpleVersionedSerialization.writeVersionAndSerialize( resumeSerializer, outputStreamBasedInProgressRecoverable.getResumeRecoverable(), @@ -319,6 +427,18 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> resumeSerializer, dataInputView)); } + private OutputStreamBasedInProgressFileRecoverable deserializeV2( + final DataInputView dataInputView) throws IOException { + Path path = null; + if (dataInputView.readBoolean()) { + path = new Path(dataInputView.readUTF()); + } + return new OutputStreamBasedInProgressFileRecoverable( + SimpleVersionedSerialization.readVersionAndDeSerialize( + resumeSerializer, dataInputView), + path); + } + private static void validateMagicNumber(final DataInputView dataInputView) throws IOException { final int magicNumber = dataInputView.readInt(); @@ -346,7 +466,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> @Override public int getVersion() { - return 1; + return 2; } @Override @@ -355,7 +475,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> (OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable; DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256); dataOutputSerializer.writeInt(MAGIC_NUMBER); - serializeV1(outputStreamBasedPendingFileRecoverable, dataOutputSerializer); + serializeV2(outputStreamBasedPendingFileRecoverable, dataOutputSerializer); return dataOutputSerializer.getCopyOfBuffer(); } @@ -367,7 +487,10 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> DataInputDeserializer in = new DataInputDeserializer(serialized); validateMagicNumber(in); return deserializeV1(in); - + case 2: + in = new DataInputDeserializer(serialized); + validateMagicNumber(in); + return deserializeV2(in); default: throw new IOException("Unrecognized version or corrupt state: " + version); } @@ -378,11 +501,18 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> return this.commitSerializer; } - private void serializeV1( + private void serializeV2( final OutputStreamBasedPendingFileRecoverable outputStreamBasedPendingFileRecoverable, final DataOutputView dataOutputView) throws IOException { + boolean pathAvailable = outputStreamBasedPendingFileRecoverable.targetPath != null; + dataOutputView.writeBoolean(pathAvailable); + if (pathAvailable) { + dataOutputView.writeUTF( + outputStreamBasedPendingFileRecoverable.targetPath.toUri().toString()); + } + dataOutputView.writeLong(outputStreamBasedPendingFileRecoverable.getSize()); SimpleVersionedSerialization.writeVersionAndSerialize( commitSerializer, outputStreamBasedPendingFileRecoverable.getCommitRecoverable(), @@ -396,6 +526,20 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> commitSerializer, dataInputView)); } + private OutputStreamBasedPendingFileRecoverable deserializeV2( + final DataInputView dataInputView) throws IOException { + Path path = null; + if (dataInputView.readBoolean()) { + path = new Path(dataInputView.readUTF()); + } + long size = dataInputView.readLong(); + return new OutputStreamBasedPendingFileRecoverable( + SimpleVersionedSerialization.readVersionAndDeSerialize( + commitSerializer, dataInputView), + path, + size); + } + private static void validateMagicNumber(final DataInputView dataInputView) throws IOException { final int magicNumber = dataInputView.readInt(); diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java index 3476c26..1799195 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java +++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java @@ -47,13 +47,14 @@ public class RowWiseBucketWriter<IN, BucketID> public InProgressFileWriter<IN, BucketID> resumeFrom( final BucketID bucketId, final RecoverableFsDataOutputStream stream, + final Path path, final RecoverableWriter.ResumeRecoverable resumable, final long creationTime) { Preconditions.checkNotNull(stream); Preconditions.checkNotNull(resumable); - return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime); + return new RowWisePartWriter<>(bucketId, path, stream, encoder, creationTime); } @Override @@ -66,6 +67,6 @@ public class RowWiseBucketWriter<IN, BucketID> Preconditions.checkNotNull(stream); Preconditions.checkNotNull(path); - return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime); + return new RowWisePartWriter<>(bucketId, path, stream, encoder, creationTime); } } diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java index f2b473c..2510c41 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java +++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.util.Preconditions; @@ -37,10 +38,11 @@ public final class RowWisePartWriter<IN, BucketID> public RowWisePartWriter( final BucketID bucketId, + final Path path, final RecoverableFsDataOutputStream currentPartStream, final Encoder<IN> encoder, final long creationTime) { - super(bucketId, currentPartStream, creationTime); + super(bucketId, path, currentPartStream, creationTime); this.encoder = Preconditions.checkNotNull(encoder); } diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java index cf30036..ccb6af7 100644 --- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java +++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java @@ -70,7 +70,7 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> writer.flush(); writer.finish(); fileCommitter.preCommit(); - return new HadoopPathBasedPendingFile(fileCommitter).getRecoverable(); + return new HadoopPathBasedPendingFile(fileCommitter, getSize()).getRecoverable(); } @Override @@ -86,8 +86,11 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> static class HadoopPathBasedPendingFile implements BucketWriter.PendingFile { private final HadoopFileCommitter fileCommitter; - public HadoopPathBasedPendingFile(HadoopFileCommitter fileCommitter) { + private final long fileSize; + + public HadoopPathBasedPendingFile(HadoopFileCommitter fileCommitter, long fileSize) { this.fileCommitter = fileCommitter; + this.fileSize = fileSize; } @Override @@ -102,7 +105,7 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> public PendingFileRecoverable getRecoverable() { return new HadoopPathBasedPendingFileRecoverable( - fileCommitter.getTargetFilePath(), fileCommitter.getTempFilePath()); + fileCommitter.getTargetFilePath(), fileCommitter.getTempFilePath(), fileSize); } } @@ -112,9 +115,21 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> private final Path tempFilePath; + private final long fileSize; + + @Deprecated + // Remained for compatibility public HadoopPathBasedPendingFileRecoverable(Path targetFilePath, Path tempFilePath) { this.targetFilePath = targetFilePath; this.tempFilePath = tempFilePath; + this.fileSize = -1L; + } + + public HadoopPathBasedPendingFileRecoverable( + Path targetFilePath, Path tempFilePath, long fileSize) { + this.targetFilePath = targetFilePath; + this.tempFilePath = tempFilePath; + this.fileSize = fileSize; } public Path getTargetFilePath() { @@ -124,6 +139,16 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> public Path getTempFilePath() { return tempFilePath; } + + @Override + public org.apache.flink.core.fs.Path getPath() { + return new org.apache.flink.core.fs.Path(targetFilePath.toString()); + } + + @Override + public long getSize() { + return fileSize; + } } @VisibleForTesting @@ -139,7 +164,7 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> @Override public int getVersion() { - return 1; + return 2; } @Override @@ -159,13 +184,15 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> byte[] pathBytes = path.toUri().toString().getBytes(CHARSET); byte[] inProgressBytes = inProgressPath.toUri().toString().getBytes(CHARSET); - byte[] targetBytes = new byte[12 + pathBytes.length + inProgressBytes.length]; + byte[] targetBytes = + new byte[12 + pathBytes.length + inProgressBytes.length + Long.BYTES]; ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN); bb.putInt(MAGIC_NUMBER); bb.putInt(pathBytes.length); bb.put(pathBytes); bb.putInt(inProgressBytes.length); bb.put(inProgressBytes); + bb.putLong(hadoopRecoverable.getSize()); return targetBytes; } @@ -176,6 +203,8 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> switch (version) { case 1: return deserializeV1(serialized); + case 2: + return deserializeV2(serialized); default: throw new IOException("Unrecognized version or corrupt state: " + version); } @@ -200,6 +229,28 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> return new HadoopPathBasedPendingFileRecoverable( new Path(targetFilePath), new Path(tempFilePath)); } + + private HadoopPathBasedPendingFileRecoverable deserializeV2(byte[] serialized) + throws IOException { + final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN); + + if (bb.getInt() != MAGIC_NUMBER) { + throw new IOException("Corrupt data: Unexpected magic number."); + } + + byte[] targetFilePathBytes = new byte[bb.getInt()]; + bb.get(targetFilePathBytes); + String targetFilePath = new String(targetFilePathBytes, CHARSET); + + byte[] tempFilePathBytes = new byte[bb.getInt()]; + bb.get(tempFilePathBytes); + String tempFilePath = new String(tempFilePathBytes, CHARSET); + + long fileSize = bb.getLong(); + + return new HadoopPathBasedPendingFileRecoverable( + new Path(targetFilePath), new Path(tempFilePath), fileSize); + } } private static class UnsupportedInProgressFileRecoverableSerializable @@ -281,7 +332,8 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> fileCommitterFactory.recoverForCommit( configuration, hadoopRecoverable.getTargetFilePath(), - hadoopRecoverable.getTempFilePath())); + hadoopRecoverable.getTempFilePath()), + hadoopRecoverable.getSize()); } @Override