JingsongLi commented on a change in pull request #12168: URL: https://github.com/apache/flink/pull/12168#discussion_r426233235
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java ########## @@ -20,122 +20,172 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.Path; -import org.apache.flink.core.fs.RecoverableFsDataOutputStream; -import org.apache.flink.core.fs.RecoverableWriter; -import org.apache.flink.util.IOUtils; -import org.apache.flink.util.Preconditions; +import org.apache.flink.core.io.SimpleVersionedSerializer; import java.io.IOException; /** - * An abstract writer for the currently open part file in a specific {@link Bucket}. - * - * <p>Currently, there are two subclasses, of this class: - * <ol> - * <li>One for row-wise formats: the {@link RowWisePartWriter}.</li> - * <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li> - * </ol> - * - * <p>This also implements the {@link PartFileInfo}. + * The {@link Bucket} uses the {@link PartFileWriter} to write element to a part file. */ @Internal -abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> { +interface PartFileWriter<IN, BucketID> extends PartFileInfo<BucketID> { - private final BucketID bucketId; + /** + * Write a element to the part file. + * @param element the element to be written. + * @param currentTime the writing time. + * @throws IOException Thrown if writing the element fails. + */ + void write(final IN element, final long currentTime) throws IOException; - private final long creationTime; + /** + * @return The state of the current part file. + * @throws IOException Thrown if persisting the part file fails. + */ + InProgressFileRecoverable persist() throws IOException; - protected final RecoverableFsDataOutputStream currentPartStream; - private long lastUpdateTime; + /** + * @return The state of the pending part file. {@link Bucket} uses this to commit the pending file. + * @throws IOException Thrown if an I/O error occurs. 
+ */ + PendingFileRecoverable closeForCommit() throws IOException; - protected PartFileWriter( - final BucketID bucketId, - final RecoverableFsDataOutputStream currentPartStream, - final long creationTime) { + /** + * Dispose the part file. + */ + void dispose(); - Preconditions.checkArgument(creationTime >= 0L); - this.bucketId = Preconditions.checkNotNull(bucketId); - this.currentPartStream = Preconditions.checkNotNull(currentPartStream); - this.creationTime = creationTime; - this.lastUpdateTime = creationTime; - } + // ------------------------------------------------------------------------ - abstract void write(IN element, long currentTime) throws IOException; + /** + * An interface for factories that create the different {@link PartFileWriter writers}. + */ + interface PartFileFactory<IN, BucketID> { - RecoverableWriter.ResumeRecoverable persist() throws IOException { - return currentPartStream.persist(); - } + /** + * Used to create a new {@link PartFileWriter}. + * @param bucketID the id of the bucket this writer is writing to. + * @param path the path this writer will write to. + * @param creationTime the creation time of the file. + * @return the new {@link PartFileWriter} + * @throws IOException Thrown if creating a writer fails. + */ + PartFileWriter<IN, BucketID> openNew( + final BucketID bucketID, + final Path path, + final long creationTime) throws IOException; - RecoverableWriter.CommitRecoverable closeForCommit() throws IOException { - return currentPartStream.closeForCommit().getRecoverable(); - } + /** + * Used to resume a {@link PartFileWriter} from a {@link InProgressFileRecoverable}. + * @param bucketID the id of the bucket this writer is writing to. + * @param inProgressFileSnapshot the state of the part file. + * @param creationTime the creation time of the file. + * @return the resumed {@link PartFileWriter} + * @throws IOException Thrown if resuming a writer fails. 
+ */ + PartFileWriter<IN, BucketID> resumeFrom( + final BucketID bucketID, + final InProgressFileRecoverable inProgressFileSnapshot, + final long creationTime) throws IOException; - void dispose() { - // we can suppress exceptions here, because we do not rely on close() to - // flush or persist any data - IOUtils.closeQuietly(currentPartStream); - } + /** + * Recovers a pending file for finalizing and committing. + * @param pendingFileRecoverable The handle with the recovery information. + * @return A pending file + * @throws IOException Thrown if recovering a pending file fails. + */ + PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable) throws IOException; - void markWrite(long now) { - this.lastUpdateTime = now; - } + /** + * Marks if requiring to do any additional cleanup/freeing of resources occupied + * as part of a {@link InProgressFileRecoverable}. + * + * <p>In case cleanup is required, then {@link #cleanupInProgressFileRecoverable(InProgressFileRecoverable)} should + * be called. + * + * @return {@code true} if cleanup is required, {@code false} otherwise. + */ + boolean requiresCleanupOfInProgressFileRecoverableState(); - @Override - public BucketID getBucketId() { - return bucketId; - } + /** + * Frees up any resources that were previously occupied in order to be able to + * recover from a (potential) failure. + * + * <p><b>NOTE:</b> This operation should not throw an exception if the {@link InProgressFileRecoverable} has already + * been cleaned up and the resources have been freed. But the contract is that it will throw + * an {@link UnsupportedOperationException} if it is called for a {@link PartFileFactory} + * whose {@link #requiresCleanupOfInProgressFileRecoverableState()} returns {@code false}. + * + * @param inProgressFileRecoverable the {@link InProgressFileRecoverable} whose state we want to clean-up. + * @return {@code true} if the resources were successfully freed, {@code false} otherwise + * (e.g. 
the file to be deleted was not there for any reason - already deleted or never created). + * @throws IOException if an I/O error occurs + */ + boolean cleanupInProgressFileRecoverable(final InProgressFileRecoverable inProgressFileRecoverable) throws IOException; - @Override - public long getCreationTime() { - return creationTime; - } - @Override - public long getSize() throws IOException { - return currentPartStream.getPos(); - } + /** + * @return the serializer for the {@link PendingFileRecoverable}. + */ + SimpleVersionedSerializer<? extends PendingFileRecoverable> getPendingFileRecoverableSerializer(); - @Override - public long getLastUpdateTime() { - return lastUpdateTime; + /** + * @return the serializer for the {@link InProgressFileRecoverable}. + */ + SimpleVersionedSerializer<? extends InProgressFileRecoverable> getInProgressFileRecoverableSerializer(); + + /** + * Checks whether the {@link PartFileWriter} supports resuming (appending to) files after + * recovery (via the {@link #resumeFrom(Object, InProgressFileRecoverable, long)} method). + * + * <p>If true, then this writer supports the {@link #resumeFrom(Object, InProgressFileRecoverable, long)} method. + * If false, then that method may not be supported and file can only be recovered via + * {@link #recoverPendingFile(PendingFileRecoverable)}. + */ + boolean supportsResume(); } - // ------------------------------------------------------------------------ + /** + * A handle can be used to recover in-progress file.. + */ + interface InProgressFileRecoverable extends PendingFileRecoverable {} Review comment: Pending: a file (either closed or still open) that has not yet been committed. In-progress: a file that is still open and still needs more records appended to it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org