This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2ea16c4aace21b72faf57a1760abf9d65035f3a
Author: Gen Luo <luogen...@gmail.com>
AuthorDate: Wed Jan 26 15:27:53 2022 +0800

    [FLINK-25583][connectors/filesystem] Add the getPath and getSize methods to PendingFileRecoverable.
---
 .../file/sink/utils/FileSinkTestUtils.java         |  19 +++
 .../sink/filesystem/BulkBucketWriter.java          |   5 +-
 .../functions/sink/filesystem/BulkPartWriter.java  |   4 +-
 .../sink/filesystem/InProgressFileWriter.java      |  13 +-
 .../OutputStreamBasedPartFileWriter.java           | 162 +++++++++++++++++++--
 .../sink/filesystem/RowWiseBucketWriter.java       |   5 +-
 .../sink/filesystem/RowWisePartWriter.java         |   4 +-
 .../hadoop/bulk/HadoopPathBasedPartFileWriter.java |  64 +++++++-
 8 files changed, 254 insertions(+), 22 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java
index 908aa42..b3d7041 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/utils/FileSinkTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.sink.utils;
 
 import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
@@ -35,12 +36,30 @@ public class FileSinkTestUtils {
     /** A type of testing {@link InProgressFileWriter.PendingFileRecoverable}. 
*/
     public static class TestPendingFileRecoverable extends StringValue
             implements InProgressFileWriter.PendingFileRecoverable {
+        @Override
+        public Path getPath() {
+            return null;
+        }
+
+        @Override
+        public long getSize() {
+            return -1L;
+        }
         // Nope
     }
 
     /** A type of testing {@link 
InProgressFileWriter.InProgressFileRecoverable}. */
     public static class TestInProgressFileRecoverable extends StringValue
             implements InProgressFileWriter.InProgressFileRecoverable {
+        @Override
+        public Path getPath() {
+            return null;
+        }
+
+        @Override
+        public long getSize() {
+            return -1L;
+        }
         // Nope
     }
 
diff --git 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
index 7906243..0c4ee74 100644
--- 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
+++ 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
@@ -50,6 +50,7 @@ public class BulkBucketWriter<IN, BucketID>
     public InProgressFileWriter<IN, BucketID> resumeFrom(
             final BucketID bucketId,
             final RecoverableFsDataOutputStream stream,
+            final Path path,
             final RecoverableWriter.ResumeRecoverable resumable,
             final long creationTime)
             throws IOException {
@@ -58,7 +59,7 @@ public class BulkBucketWriter<IN, BucketID>
         Preconditions.checkNotNull(resumable);
 
         final BulkWriter<IN> writer = writerFactory.create(stream);
-        return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+        return new BulkPartWriter<>(bucketId, path, stream, writer, 
creationTime);
     }
 
     @Override
@@ -73,6 +74,6 @@ public class BulkBucketWriter<IN, BucketID>
         Preconditions.checkNotNull(path);
 
         final BulkWriter<IN> writer = writerFactory.create(stream);
-        return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+        return new BulkPartWriter<>(bucketId, path, stream, writer, 
creationTime);
     }
 }
diff --git 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index 758296d..d770c69 100644
--- 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.util.Preconditions;
 
@@ -36,10 +37,11 @@ final class BulkPartWriter<IN, BucketID> extends 
OutputStreamBasedPartFileWriter
 
     BulkPartWriter(
             final BucketID bucketId,
+            final Path path,
             final RecoverableFsDataOutputStream currentPartStream,
             final BulkWriter<IN> writer,
             final long creationTime) {
-        super(bucketId, currentPartStream, creationTime);
+        super(bucketId, path, currentPartStream, creationTime);
         this.writer = Preconditions.checkNotNull(writer);
     }
 
diff --git 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
index f633023..dbc8159 100644
--- 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
+++ 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
@@ -20,6 +20,9 @@ package 
org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 
@@ -65,5 +68,13 @@ public interface InProgressFileWriter<IN, BucketID>
 
     /** The handle can be used to recover pending file. */
     @PublicEvolving
-    interface PendingFileRecoverable {}
+    interface PendingFileRecoverable {
+
+        /** @return The target path of the pending file, null if unavailable. 
*/
+        @Nullable
+        Path getPath();
+
+        /** @return The size of the pending file, -1 if unavailable. */
+        long getSize();
+    }
 }
diff --git 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
index 13ceae5..06d3bf9 100644
--- 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
+++ 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
@@ -29,6 +29,8 @@ import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.IOUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Objects;
@@ -46,25 +48,31 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
 
     final RecoverableFsDataOutputStream currentPartStream;
 
+    @Nullable final Path targetPath;
+
     private CompactingFileWriter.Type writeType = null;
 
     OutputStreamBasedPartFileWriter(
             final BucketID bucketID,
+            @Nullable final Path path,
             final RecoverableFsDataOutputStream recoverableFsDataOutputStream,
             final long createTime) {
         super(bucketID, createTime);
+        this.targetPath = path;
         this.currentPartStream = recoverableFsDataOutputStream;
     }
 
     @Override
     public InProgressFileRecoverable persist() throws IOException {
-        return new 
OutputStreamBasedInProgressFileRecoverable(currentPartStream.persist());
+        return new OutputStreamBasedInProgressFileRecoverable(
+                currentPartStream.persist(), targetPath);
     }
 
     @Override
     public PendingFileRecoverable closeForCommit() throws IOException {
+        long size = currentPartStream.getPos();
         return new OutputStreamBasedPendingFileRecoverable(
-                currentPartStream.closeForCommit().getRecoverable());
+                currentPartStream.closeForCommit().getRecoverable(), 
targetPath, size);
     }
 
     @Override
@@ -137,6 +145,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
                     bucketID,
                     recoverableWriter.recover(
                             
outputStreamBasedInProgressRecoverable.getResumeRecoverable()),
+                    inProgressFileRecoverable.getPath(),
                     
outputStreamBasedInProgressRecoverable.getResumeRecoverable(),
                     creationTime);
         }
@@ -192,6 +201,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
         public abstract InProgressFileWriter<IN, BucketID> resumeFrom(
                 final BucketID bucketId,
                 final RecoverableFsDataOutputStream stream,
+                final Path path,
                 final RecoverableWriter.ResumeRecoverable resumable,
                 final long creationTime)
                 throws IOException;
@@ -205,14 +215,60 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
 
         private final RecoverableWriter.CommitRecoverable commitRecoverable;
 
+        @Nullable private final Path targetPath;
+        private final long fileSize;
+
+        @Deprecated
+        // Retained for state compatibility
         public OutputStreamBasedPendingFileRecoverable(
                 final RecoverableWriter.CommitRecoverable commitRecoverable) {
+            this(commitRecoverable, null, -1L);
+        }
+
+        public OutputStreamBasedPendingFileRecoverable(
+                final RecoverableWriter.CommitRecoverable commitRecoverable,
+                @Nullable final Path targetPath,
+                final long fileSize) {
             this.commitRecoverable = commitRecoverable;
+            this.targetPath = targetPath;
+            this.fileSize = fileSize;
         }
 
         RecoverableWriter.CommitRecoverable getCommitRecoverable() {
             return commitRecoverable;
         }
+
+        @Override
+        public Path getPath() {
+            return targetPath;
+        }
+
+        @Override
+        public long getSize() {
+            return fileSize;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            OutputStreamBasedPendingFileRecoverable that =
+                    (OutputStreamBasedPendingFileRecoverable) o;
+            return fileSize == that.fileSize
+                    && Objects.equals(commitRecoverable, 
that.commitRecoverable)
+                    && Objects.equals(targetPath, that.targetPath);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(commitRecoverable, targetPath, fileSize);
+        }
     }
 
     /**
@@ -223,15 +279,57 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
             implements InProgressFileRecoverable {
 
         private final RecoverableWriter.ResumeRecoverable resumeRecoverable;
+        @Nullable private final Path targetPath;
 
+        @Deprecated
+        // Retained for state compatibility
         public OutputStreamBasedInProgressFileRecoverable(
                 final RecoverableWriter.ResumeRecoverable resumeRecoverable) {
+            this(resumeRecoverable, null);
+        }
+
+        public OutputStreamBasedInProgressFileRecoverable(
+                final RecoverableWriter.ResumeRecoverable resumeRecoverable,
+                @Nullable final Path targetPath) {
             this.resumeRecoverable = resumeRecoverable;
+            this.targetPath = targetPath;
         }
 
         RecoverableWriter.ResumeRecoverable getResumeRecoverable() {
             return resumeRecoverable;
         }
+
+        @Override
+        public Path getPath() {
+            return targetPath;
+        }
+
+        @Override
+        public long getSize() {
+            // File size of an in progress file is unavailable.
+            return -1L;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            OutputStreamBasedInProgressFileRecoverable that =
+                    (OutputStreamBasedInProgressFileRecoverable) o;
+            return Objects.equals(resumeRecoverable, that.resumeRecoverable)
+                    && Objects.equals(targetPath, that.targetPath);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(resumeRecoverable, targetPath);
+        }
     }
 
     static final class OutputStreamBasedPendingFile implements 
BucketWriter.PendingFile {
@@ -269,7 +367,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
 
         @Override
         public int getVersion() {
-            return 1;
+            return 2;
         }
 
         @Override
@@ -279,7 +377,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
                     (OutputStreamBasedInProgressFileRecoverable) 
inProgressRecoverable;
             DataOutputSerializer dataOutputSerializer = new 
DataOutputSerializer(256);
             dataOutputSerializer.writeInt(MAGIC_NUMBER);
-            serializeV1(outputStreamBasedInProgressRecoverable, 
dataOutputSerializer);
+            serializeV2(outputStreamBasedInProgressRecoverable, 
dataOutputSerializer);
             return dataOutputSerializer.getCopyOfBuffer();
         }
 
@@ -291,6 +389,10 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
                     DataInputView dataInputView = new 
DataInputDeserializer(serialized);
                     validateMagicNumber(dataInputView);
                     return deserializeV1(dataInputView);
+                case 2:
+                    dataInputView = new DataInputDeserializer(serialized);
+                    validateMagicNumber(dataInputView);
+                    return deserializeV2(dataInputView);
                 default:
                     throw new IOException("Unrecognized version or corrupt 
state: " + version);
             }
@@ -301,11 +403,17 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
             return resumeSerializer;
         }
 
-        private void serializeV1(
+        private void serializeV2(
                 final OutputStreamBasedInProgressFileRecoverable
                         outputStreamBasedInProgressRecoverable,
                 final DataOutputView dataOutputView)
                 throws IOException {
+            boolean pathAvailable = 
outputStreamBasedInProgressRecoverable.targetPath != null;
+            dataOutputView.writeBoolean(pathAvailable);
+            if (pathAvailable) {
+                dataOutputView.writeUTF(
+                        
outputStreamBasedInProgressRecoverable.targetPath.toUri().toString());
+            }
             SimpleVersionedSerialization.writeVersionAndSerialize(
                     resumeSerializer,
                     
outputStreamBasedInProgressRecoverable.getResumeRecoverable(),
@@ -319,6 +427,18 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
                             resumeSerializer, dataInputView));
         }
 
+        private OutputStreamBasedInProgressFileRecoverable deserializeV2(
+                final DataInputView dataInputView) throws IOException {
+            Path path = null;
+            if (dataInputView.readBoolean()) {
+                path = new Path(dataInputView.readUTF());
+            }
+            return new OutputStreamBasedInProgressFileRecoverable(
+                    SimpleVersionedSerialization.readVersionAndDeSerialize(
+                            resumeSerializer, dataInputView),
+                    path);
+        }
+
         private static void validateMagicNumber(final DataInputView 
dataInputView)
                 throws IOException {
             final int magicNumber = dataInputView.readInt();
@@ -346,7 +466,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
 
         @Override
         public int getVersion() {
-            return 1;
+            return 2;
         }
 
         @Override
@@ -355,7 +475,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
                     (OutputStreamBasedPendingFileRecoverable) 
pendingFileRecoverable;
             DataOutputSerializer dataOutputSerializer = new 
DataOutputSerializer(256);
             dataOutputSerializer.writeInt(MAGIC_NUMBER);
-            serializeV1(outputStreamBasedPendingFileRecoverable, 
dataOutputSerializer);
+            serializeV2(outputStreamBasedPendingFileRecoverable, 
dataOutputSerializer);
             return dataOutputSerializer.getCopyOfBuffer();
         }
 
@@ -367,7 +487,10 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
                     DataInputDeserializer in = new 
DataInputDeserializer(serialized);
                     validateMagicNumber(in);
                     return deserializeV1(in);
-
+                case 2:
+                    in = new DataInputDeserializer(serialized);
+                    validateMagicNumber(in);
+                    return deserializeV2(in);
                 default:
                     throw new IOException("Unrecognized version or corrupt 
state: " + version);
             }
@@ -378,11 +501,18 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
             return this.commitSerializer;
         }
 
-        private void serializeV1(
+        private void serializeV2(
                 final OutputStreamBasedPendingFileRecoverable
                         outputStreamBasedPendingFileRecoverable,
                 final DataOutputView dataOutputView)
                 throws IOException {
+            boolean pathAvailable = 
outputStreamBasedPendingFileRecoverable.targetPath != null;
+            dataOutputView.writeBoolean(pathAvailable);
+            if (pathAvailable) {
+                dataOutputView.writeUTF(
+                        
outputStreamBasedPendingFileRecoverable.targetPath.toUri().toString());
+            }
+            
dataOutputView.writeLong(outputStreamBasedPendingFileRecoverable.getSize());
             SimpleVersionedSerialization.writeVersionAndSerialize(
                     commitSerializer,
                     
outputStreamBasedPendingFileRecoverable.getCommitRecoverable(),
@@ -396,6 +526,20 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID>
                             commitSerializer, dataInputView));
         }
 
+        private OutputStreamBasedPendingFileRecoverable deserializeV2(
+                final DataInputView dataInputView) throws IOException {
+            Path path = null;
+            if (dataInputView.readBoolean()) {
+                path = new Path(dataInputView.readUTF());
+            }
+            long size = dataInputView.readLong();
+            return new OutputStreamBasedPendingFileRecoverable(
+                    SimpleVersionedSerialization.readVersionAndDeSerialize(
+                            commitSerializer, dataInputView),
+                    path,
+                    size);
+        }
+
         private static void validateMagicNumber(final DataInputView 
dataInputView)
                 throws IOException {
             final int magicNumber = dataInputView.readInt();
diff --git 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
index 3476c26..1799195 100644
--- 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
+++ 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
@@ -47,13 +47,14 @@ public class RowWiseBucketWriter<IN, BucketID>
     public InProgressFileWriter<IN, BucketID> resumeFrom(
             final BucketID bucketId,
             final RecoverableFsDataOutputStream stream,
+            final Path path,
             final RecoverableWriter.ResumeRecoverable resumable,
             final long creationTime) {
 
         Preconditions.checkNotNull(stream);
         Preconditions.checkNotNull(resumable);
 
-        return new RowWisePartWriter<>(bucketId, stream, encoder, 
creationTime);
+        return new RowWisePartWriter<>(bucketId, path, stream, encoder, 
creationTime);
     }
 
     @Override
@@ -66,6 +67,6 @@ public class RowWiseBucketWriter<IN, BucketID>
         Preconditions.checkNotNull(stream);
         Preconditions.checkNotNull(path);
 
-        return new RowWisePartWriter<>(bucketId, stream, encoder, 
creationTime);
+        return new RowWisePartWriter<>(bucketId, path, stream, encoder, 
creationTime);
     }
 }
diff --git 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index f2b473c..2510c41 100644
--- 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.util.Preconditions;
 
@@ -37,10 +38,11 @@ public final class RowWisePartWriter<IN, BucketID>
 
     public RowWisePartWriter(
             final BucketID bucketId,
+            final Path path,
             final RecoverableFsDataOutputStream currentPartStream,
             final Encoder<IN> encoder,
             final long creationTime) {
-        super(bucketId, currentPartStream, creationTime);
+        super(bucketId, path, currentPartStream, creationTime);
         this.encoder = Preconditions.checkNotNull(encoder);
     }
 
diff --git 
a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
 
b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
index cf30036..ccb6af7 100644
--- 
a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
+++ 
b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
@@ -70,7 +70,7 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
         writer.flush();
         writer.finish();
         fileCommitter.preCommit();
-        return new HadoopPathBasedPendingFile(fileCommitter).getRecoverable();
+        return new HadoopPathBasedPendingFile(fileCommitter, 
getSize()).getRecoverable();
     }
 
     @Override
@@ -86,8 +86,11 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
     static class HadoopPathBasedPendingFile implements 
BucketWriter.PendingFile {
         private final HadoopFileCommitter fileCommitter;
 
-        public HadoopPathBasedPendingFile(HadoopFileCommitter fileCommitter) {
+        private final long fileSize;
+
+        public HadoopPathBasedPendingFile(HadoopFileCommitter fileCommitter, 
long fileSize) {
             this.fileCommitter = fileCommitter;
+            this.fileSize = fileSize;
         }
 
         @Override
@@ -102,7 +105,7 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
 
         public PendingFileRecoverable getRecoverable() {
             return new HadoopPathBasedPendingFileRecoverable(
-                    fileCommitter.getTargetFilePath(), 
fileCommitter.getTempFilePath());
+                    fileCommitter.getTargetFilePath(), 
fileCommitter.getTempFilePath(), fileSize);
         }
     }
 
@@ -112,9 +115,21 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
 
         private final Path tempFilePath;
 
+        private final long fileSize;
+
+        @Deprecated
+        // Retained for compatibility
         public HadoopPathBasedPendingFileRecoverable(Path targetFilePath, Path 
tempFilePath) {
             this.targetFilePath = targetFilePath;
             this.tempFilePath = tempFilePath;
+            this.fileSize = -1L;
+        }
+
+        public HadoopPathBasedPendingFileRecoverable(
+                Path targetFilePath, Path tempFilePath, long fileSize) {
+            this.targetFilePath = targetFilePath;
+            this.tempFilePath = tempFilePath;
+            this.fileSize = fileSize;
         }
 
         public Path getTargetFilePath() {
@@ -124,6 +139,16 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
         public Path getTempFilePath() {
             return tempFilePath;
         }
+
+        @Override
+        public org.apache.flink.core.fs.Path getPath() {
+            return new 
org.apache.flink.core.fs.Path(targetFilePath.toString());
+        }
+
+        @Override
+        public long getSize() {
+            return fileSize;
+        }
     }
 
     @VisibleForTesting
@@ -139,7 +164,7 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
 
         @Override
         public int getVersion() {
-            return 1;
+            return 2;
         }
 
         @Override
@@ -159,13 +184,15 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
             byte[] pathBytes = path.toUri().toString().getBytes(CHARSET);
             byte[] inProgressBytes = 
inProgressPath.toUri().toString().getBytes(CHARSET);
 
-            byte[] targetBytes = new byte[12 + pathBytes.length + 
inProgressBytes.length];
+            byte[] targetBytes =
+                    new byte[12 + pathBytes.length + inProgressBytes.length + 
Long.BYTES];
             ByteBuffer bb = 
ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
             bb.putInt(MAGIC_NUMBER);
             bb.putInt(pathBytes.length);
             bb.put(pathBytes);
             bb.putInt(inProgressBytes.length);
             bb.put(inProgressBytes);
+            bb.putLong(hadoopRecoverable.getSize());
 
             return targetBytes;
         }
@@ -176,6 +203,8 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
             switch (version) {
                 case 1:
                     return deserializeV1(serialized);
+                case 2:
+                    return deserializeV2(serialized);
                 default:
                     throw new IOException("Unrecognized version or corrupt 
state: " + version);
             }
@@ -200,6 +229,28 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
             return new HadoopPathBasedPendingFileRecoverable(
                     new Path(targetFilePath), new Path(tempFilePath));
         }
+
+        private HadoopPathBasedPendingFileRecoverable deserializeV2(byte[] 
serialized)
+                throws IOException {
+            final ByteBuffer bb = 
ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+            if (bb.getInt() != MAGIC_NUMBER) {
+                throw new IOException("Corrupt data: Unexpected magic 
number.");
+            }
+
+            byte[] targetFilePathBytes = new byte[bb.getInt()];
+            bb.get(targetFilePathBytes);
+            String targetFilePath = new String(targetFilePathBytes, CHARSET);
+
+            byte[] tempFilePathBytes = new byte[bb.getInt()];
+            bb.get(tempFilePathBytes);
+            String tempFilePath = new String(tempFilePathBytes, CHARSET);
+
+            long fileSize = bb.getLong();
+
+            return new HadoopPathBasedPendingFileRecoverable(
+                    new Path(targetFilePath), new Path(tempFilePath), 
fileSize);
+        }
     }
 
     private static class UnsupportedInProgressFileRecoverableSerializable
@@ -281,7 +332,8 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID>
                     fileCommitterFactory.recoverForCommit(
                             configuration,
                             hadoopRecoverable.getTargetFilePath(),
-                            hadoopRecoverable.getTempFilePath()));
+                            hadoopRecoverable.getTempFilePath()),
+                    hadoopRecoverable.getSize());
         }
 
         @Override

Reply via email to