JingsongLi commented on a change in pull request #12168:
URL: https://github.com/apache/flink/pull/12168#discussion_r426233235



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
##########
@@ -20,122 +20,172 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 
 /**
- * An abstract writer for the currently open part file in a specific {@link 
Bucket}.
- *
- * <p>Currently, there are two subclasses, of this class:
- * <ol>
- *     <li>One for row-wise formats: the {@link RowWisePartWriter}.</li>
- *     <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li>
- * </ol>
- *
- * <p>This also implements the {@link PartFileInfo}.
+ * The {@link Bucket} uses the {@link PartFileWriter} to write element to a 
part file.
  */
 @Internal
-abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
+interface PartFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
 
-       private final BucketID bucketId;
+       /**
+        * Write a element to the part file.
+        * @param element the element to be written.
+        * @param currentTime the writing time.
+        * @throws IOException Thrown if writing the element fails.
+        */
+       void write(final IN element, final long currentTime) throws IOException;
 
-       private final long creationTime;
+       /**
+        * @return The state of the current part file.
+        * @throws IOException Thrown if persisting the part file fails.
+        */
+       InProgressFileRecoverable persist() throws IOException;
 
-       protected final RecoverableFsDataOutputStream currentPartStream;
 
-       private long lastUpdateTime;
+       /**
+        * @return The state of the pending part file. {@link Bucket} uses this 
to commit the pending file.
+        * @throws IOException Thrown if an I/O error occurs.
+        */
+       PendingFileRecoverable closeForCommit() throws IOException;
 
-       protected PartFileWriter(
-                       final BucketID bucketId,
-                       final RecoverableFsDataOutputStream currentPartStream,
-                       final long creationTime) {
+       /**
+        * Dispose the part file.
+        */
+       void dispose();
 
-               Preconditions.checkArgument(creationTime >= 0L);
-               this.bucketId = Preconditions.checkNotNull(bucketId);
-               this.currentPartStream = 
Preconditions.checkNotNull(currentPartStream);
-               this.creationTime = creationTime;
-               this.lastUpdateTime = creationTime;
-       }
+       // 
------------------------------------------------------------------------
 
-       abstract void write(IN element, long currentTime) throws IOException;
+       /**
+        * An interface for factories that create the different {@link 
PartFileWriter writers}.
+        */
+       interface PartFileFactory<IN, BucketID> {
 
-       RecoverableWriter.ResumeRecoverable persist() throws IOException {
-               return currentPartStream.persist();
-       }
+               /**
+                * Used to create a new {@link PartFileWriter}.
+                * @param bucketID the id of the bucket this writer is writing 
to.
+                * @param path the path this writer will write to.
+                * @param creationTime the creation time of the file.
+                * @return the new {@link PartFileWriter}
+                * @throws IOException Thrown if creating a writer fails.
+                */
+               PartFileWriter<IN, BucketID> openNew(
+                       final BucketID bucketID,
+                       final Path path,
+                       final long creationTime) throws IOException;
 
-       RecoverableWriter.CommitRecoverable closeForCommit() throws IOException 
{
-               return currentPartStream.closeForCommit().getRecoverable();
-       }
+               /**
+                * Used to resume a {@link PartFileWriter} from a {@link 
InProgressFileRecoverable}.
+                * @param bucketID the id of the bucket this writer is writing 
to.
+                * @param inProgressFileSnapshot the state of the part file.
+                * @param creationTime the creation time of the file.
+                * @return the resumed {@link PartFileWriter}
+                * @throws IOException Thrown if resuming a writer fails.
+                */
+               PartFileWriter<IN, BucketID> resumeFrom(
+                       final BucketID bucketID,
+                       final InProgressFileRecoverable inProgressFileSnapshot,
+                       final long creationTime) throws IOException;
 
-       void dispose() {
-               // we can suppress exceptions here, because we do not rely on 
close() to
-               // flush or persist any data
-               IOUtils.closeQuietly(currentPartStream);
-       }
+               /**
+                * Recovers a pending file for finalizing and committing.
+                * @param pendingFileRecoverable The handle with the recovery 
information.
+                * @return A pending file
+                * @throws IOException Thrown if recovering a pending file 
fails.
+                */
+               PendingFile recoverPendingFile(final PendingFileRecoverable 
pendingFileRecoverable) throws IOException;
 
-       void markWrite(long now) {
-               this.lastUpdateTime = now;
-       }
+               /**
+                * Marks if requiring to do any additional cleanup/freeing of 
resources occupied
+                * as part of a {@link InProgressFileRecoverable}.
+                *
+                * <p>In case cleanup is required, then {@link 
#cleanupInProgressFileRecoverable(InProgressFileRecoverable)} should
+                * be called.
+                *
+                * @return {@code true} if cleanup is required, {@code false} 
otherwise.
+                */
+               boolean requiresCleanupOfInProgressFileRecoverableState();
 
-       @Override
-       public BucketID getBucketId() {
-               return bucketId;
-       }
+               /**
+                * Frees up any resources that were previously occupied in 
order to be able to
+                * recover from a (potential) failure.
+                *
+                * <p><b>NOTE:</b> This operation should not throw an exception 
if the {@link InProgressFileRecoverable} has already
+                * been cleaned up and the resources have been freed. But the 
contract is that it will throw
+                * an {@link UnsupportedOperationException} if it is called for 
a {@link PartFileFactory}
+                * whose {@link 
#requiresCleanupOfInProgressFileRecoverableState()} returns {@code false}.
+                *
+                * @param inProgressFileRecoverable the {@link 
InProgressFileRecoverable} whose state we want to clean-up.
+                * @return {@code true} if the resources were successfully 
freed, {@code false} otherwise
+                * (e.g. the file to be deleted was not there for any reason - 
already deleted or never created).
+                * @throws IOException if an I/O error occurs
+                */
+               boolean cleanupInProgressFileRecoverable(final 
InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
 
-       @Override
-       public long getCreationTime() {
-               return creationTime;
-       }
 
-       @Override
-       public long getSize() throws IOException {
-               return currentPartStream.getPos();
-       }
+               /**
+                * @return the serializer for the {@link 
PendingFileRecoverable}.
+                */
+               SimpleVersionedSerializer<? extends PendingFileRecoverable> 
getPendingFileRecoverableSerializer();
 
-       @Override
-       public long getLastUpdateTime() {
-               return lastUpdateTime;
+               /**
+                * @return the serializer for the {@link 
InProgressFileRecoverable}.
+                */
+               SimpleVersionedSerializer<? extends InProgressFileRecoverable> 
getInProgressFileRecoverableSerializer();
+
+               /**
+                * Checks whether the {@link PartFileWriter} supports resuming 
(appending to) files after
+                * recovery (via the {@link #resumeFrom(Object, 
InProgressFileRecoverable, long)} method).
+                *
+                * <p>If true, then this writer supports the {@link 
#resumeFrom(Object, InProgressFileRecoverable, long)} method.
+                * If false, then that method may not be supported and file can 
only be recovered via
+                * {@link #recoverPendingFile(PendingFileRecoverable)}.
+                */
+               boolean supportsResume();
        }
 
-       // 
------------------------------------------------------------------------
+        /**
+        * A handle can be used to recover in-progress file..
+        */
+       interface InProgressFileRecoverable extends PendingFileRecoverable {}

Review comment:
      pending: a closed or still-open file that has not yet been committed.
  in progress: a still-open file that still needs records appended to it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to