This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 824752c82729b5fd6aab5c6f205476ae63c8aff5 Author: Gen Luo <luogen...@gmail.com> AuthorDate: Tue Jan 25 18:31:17 2022 +0800 [FLINK-25583][connectors/filesystem] Add bucketId and compactedFileToCleanup in FileSinkCommittable, delete compactedFileToCleanup in FileCommitter. --- .../connector/file/sink/FileSinkCommittable.java | 74 +++++++++++++++++++++- .../file/sink/FileSinkCommittableSerializer.java | 46 ++++++++++++-- .../file/sink/committer/FileCommitter.java | 21 ++++++ .../file/sink/writer/FileWriterBucket.java | 5 +- .../file/sink/FileCommittableSerializerTest.java | 25 +++++++- .../file/sink/committer/FileCommitterTest.java | 14 ++-- 6 files changed, 167 insertions(+), 18 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java index 2c5e8e5..7ea5b1d 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java @@ -19,11 +19,13 @@ package org.apache.flink.connector.file.sink; import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; import javax.annotation.Nullable; import java.io.Serializable; +import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -34,26 +36,51 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class FileSinkCommittable implements Serializable { + private final String bucketId; + @Nullable private final InProgressFileWriter.PendingFileRecoverable pendingFile; @Nullable private final InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup; - public FileSinkCommittable(InProgressFileWriter.PendingFileRecoverable pendingFile) { + @Nullable private final Path compactedFileToCleanup; + + public FileSinkCommittable( + String bucketId, InProgressFileWriter.PendingFileRecoverable pendingFile) { + this.bucketId = bucketId; this.pendingFile = checkNotNull(pendingFile); this.inProgressFileToCleanup = null; + this.compactedFileToCleanup = null; } public FileSinkCommittable( + String bucketId, InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup) { + this.bucketId = bucketId; this.pendingFile = null; this.inProgressFileToCleanup = checkNotNull(inProgressFileToCleanup); + this.compactedFileToCleanup = null; + } + + public FileSinkCommittable(String bucketId, Path compactedFileToCleanup) { + this.bucketId = bucketId; + this.pendingFile = null; + this.inProgressFileToCleanup = null; + this.compactedFileToCleanup = checkNotNull(compactedFileToCleanup); } FileSinkCommittable( + String bucketId, @Nullable InProgressFileWriter.PendingFileRecoverable pendingFile, - @Nullable InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup) { + @Nullable InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup, + @Nullable Path compactedFileToCleanup) { + this.bucketId = bucketId; this.pendingFile = pendingFile; this.inProgressFileToCleanup = inProgressFileToCleanup; + this.compactedFileToCleanup = compactedFileToCleanup; + } + + public String getBucketId() { + return bucketId; } public boolean hasPendingFile() { @@ -73,4 +100,47 @@ public class FileSinkCommittable implements Serializable { public InProgressFileWriter.InProgressFileRecoverable getInProgressFileToCleanup() { return inProgressFileToCleanup; } + + public boolean hasCompactedFileToCleanup() { + return compactedFileToCleanup != null; + } + + @Nullable + public Path getCompactedFileToCleanup() { + return compactedFileToCleanup; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FileSinkCommittable that = (FileSinkCommittable) o; + return Objects.equals(bucketId, that.bucketId) + && Objects.equals(pendingFile, that.pendingFile) + && Objects.equals(inProgressFileToCleanup, that.inProgressFileToCleanup) + && Objects.equals(compactedFileToCleanup, that.compactedFileToCleanup); + } + + @Override + public int hashCode() { + return Objects.hash(bucketId, pendingFile, inProgressFileToCleanup, compactedFileToCleanup); + } + + @Override + public String toString() { + return "FileSinkCommittable{" + + "bucketId='" + + bucketId + + ", pendingFile=" + + pendingFile + + ", inProgressFileToCleanup=" + + inProgressFileToCleanup + + ", compactedFileToCleanup=" + + compactedFileToCleanup + + '}'; + } } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java index 99ef42f..febc9a5 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.file.sink; import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; @@ -55,14 +56,14 @@ public class FileSinkCommittableSerializer @Override public int getVersion() { - return 1; + return 2; } @Override public byte[] serialize(FileSinkCommittable committable) throws IOException { DataOutputSerializer out = new DataOutputSerializer(256); out.writeInt(MAGIC_NUMBER); - serializeV1(committable, out); + serializeV2(committable, out); return out.getCopyOfBuffer(); } @@ -74,14 +75,17 @@ public class FileSinkCommittableSerializer case 1: validateMagicNumber(in); return deserializeV1(in); + case 2: + validateMagicNumber(in); + return deserializeV2(in); default: throw new IOException("Unrecognized version or corrupt state: " + version); } } - private void serializeV1(FileSinkCommittable committable, DataOutputView dataOutputView) + private void serializeV2(FileSinkCommittable committable, DataOutputView dataOutputView) throws IOException { - + dataOutputView.writeUTF(committable.getBucketId()); if (committable.hasPendingFile()) { dataOutputView.writeBoolean(true); SimpleVersionedSerialization.writeVersionAndSerialize( @@ -99,6 +103,13 @@ public class FileSinkCommittableSerializer } else { dataOutputView.writeBoolean(false); } + + if (committable.hasCompactedFileToCleanup()) { + dataOutputView.writeBoolean(true); + dataOutputView.writeUTF(committable.getCompactedFileToCleanup().toUri().toString()); + } else { + dataOutputView.writeBoolean(false); + } } private FileSinkCommittable deserializeV1(DataInputView dataInputView) throws IOException { @@ -116,7 +127,32 @@ public class FileSinkCommittableSerializer inProgressFileSerializer, dataInputView); } - return new FileSinkCommittable(pendingFile, inProgressFileToCleanup); + return new FileSinkCommittable("", pendingFile, inProgressFileToCleanup, null); + } + + private FileSinkCommittable deserializeV2(DataInputView dataInputView) throws IOException { + String bucketId = dataInputView.readUTF(); + InProgressFileWriter.PendingFileRecoverable pendingFile = null; + if (dataInputView.readBoolean()) { + pendingFile = + SimpleVersionedSerialization.readVersionAndDeSerialize( + pendingFileSerializer, dataInputView); + } + + InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup = null; + if (dataInputView.readBoolean()) { + inProgressFileToCleanup = + SimpleVersionedSerialization.readVersionAndDeSerialize( + inProgressFileSerializer, dataInputView); + } + + Path committedFileToCleanup = null; + if (dataInputView.readBoolean()) { + committedFileToCleanup = new Path(dataInputView.readUTF()); + } + + return new FileSinkCommittable( + bucketId, pendingFile, inProgressFileToCleanup, committedFileToCleanup); } private static void validateMagicNumber(DataInputView in) throws IOException { diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java index c72b399..7590178 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java @@ -22,8 +22,12 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.Collection; @@ -40,6 +44,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class FileCommitter implements Committer<FileSinkCommittable> { + private static final Logger LOG = LoggerFactory.getLogger(FileCommitter.class); + private final BucketWriter<?, ?> bucketWriter; public FileCommitter(BucketWriter<?, ?> bucketWriter) { @@ -60,6 +66,21 @@ public class FileCommitter implements Committer<FileSinkCommittable> { bucketWriter.cleanupInProgressFileRecoverable( committable.getInProgressFileToCleanup()); } + + if (committable.hasCompactedFileToCleanup()) { + Path committedFileToCleanup = committable.getCompactedFileToCleanup(); + try { + committedFileToCleanup.getFileSystem().delete(committedFileToCleanup, false); + } catch (Exception e) { + // Try best to cleanup compacting files, skip if failed. + if (LOG.isDebugEnabled()) { + LOG.debug( + "Failed to cleanup a compacted file, the file will be remained and should not be visible: {}", + committedFileToCleanup, + e); + } + } + } } } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java index 3f5f5f9..8385fe9 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java @@ -202,11 +202,12 @@ class FileWriterBucket<IN> { } List<FileSinkCommittable> committables = new ArrayList<>(); - pendingFiles.forEach(pendingFile -> committables.add(new FileSinkCommittable(pendingFile))); + pendingFiles.forEach( + pendingFile -> committables.add(new FileSinkCommittable(bucketId, pendingFile))); pendingFiles.clear(); if (inProgressFileToCleanup != null) { - committables.add(new FileSinkCommittable(inProgressFileToCleanup)); + committables.add(new FileSinkCommittable(bucketId, inProgressFileToCleanup)); inProgressFileToCleanup = null; } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java index 523fde0..ffda5dd 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.file.sink; import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils; +import org.apache.flink.core.fs.Path; import org.junit.ClassRule; import org.junit.Test; @@ -36,23 +37,43 @@ public class FileCommittableSerializerTest { @Test public void testCommittableWithPendingFile() throws IOException { FileSinkCommittable committable = - new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()); + new FileSinkCommittable("0", new FileSinkTestUtils.TestPendingFileRecoverable()); FileSinkCommittable deserialized = serializeAndDeserialize(committable); + assertEquals(committable.getBucketId(), deserialized.getBucketId()); assertEquals(committable.getPendingFile(), deserialized.getPendingFile()); assertEquals( committable.getInProgressFileToCleanup(), deserialized.getInProgressFileToCleanup()); + assertEquals( + committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup()); } @Test public void testCommittableWithInProgressFileToCleanup() throws IOException { FileSinkCommittable committable = - new FileSinkCommittable(new FileSinkTestUtils.TestInProgressFileRecoverable()); + new FileSinkCommittable("0", new FileSinkTestUtils.TestInProgressFileRecoverable()); + FileSinkCommittable deserialized = serializeAndDeserialize(committable); + assertEquals(committable.getBucketId(), deserialized.getBucketId()); + assertEquals(committable.getPendingFile(), deserialized.getPendingFile()); + assertEquals( + committable.getInProgressFileToCleanup(), + deserialized.getInProgressFileToCleanup()); + assertEquals( + committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup()); + } + + @Test + public void testCommittableWithCompactedFileToCleanup() throws IOException { + FileSinkCommittable committable = + new FileSinkCommittable("0", new Path("/tmp/mock_path_to_cleanup")); FileSinkCommittable deserialized = serializeAndDeserialize(committable); + assertEquals(committable.getBucketId(), deserialized.getBucketId()); assertEquals(committable.getPendingFile(), deserialized.getPendingFile()); assertEquals( committable.getInProgressFileToCleanup(), deserialized.getInProgressFileToCleanup()); + assertEquals( + committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup()); } private FileSinkCommittable serializeAndDeserialize(FileSinkCommittable committable) diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java index 0b07370..5b7e21b 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java @@ -50,7 +50,7 @@ public class FileCommitterTest { MockCommitRequest<FileSinkCommittable> fileSinkCommittable = new MockCommitRequest<>( new FileSinkCommittable( - new FileSinkTestUtils.TestPendingFileRecoverable())); + "0", new FileSinkTestUtils.TestPendingFileRecoverable())); fileCommitter.commit(Collections.singletonList(fileSinkCommittable)); assertEquals(1, stubBucketWriter.getRecoveredPendingFiles().size()); @@ -67,7 +67,7 @@ public class FileCommitterTest { MockCommitRequest<FileSinkCommittable> fileSinkCommittable = new MockCommitRequest<>( new FileSinkCommittable( - new FileSinkTestUtils.TestInProgressFileRecoverable())); + "0", new FileSinkTestUtils.TestInProgressFileRecoverable())); fileCommitter.commit(Collections.singletonList(fileSinkCommittable)); assertEquals(0, stubBucketWriter.getRecoveredPendingFiles().size()); @@ -83,15 +83,15 @@ public class FileCommitterTest { Collection<CommitRequest<FileSinkCommittable>> committables = Stream.of( new FileSinkCommittable( - new FileSinkTestUtils.TestPendingFileRecoverable()), + "0", new FileSinkTestUtils.TestPendingFileRecoverable()), new FileSinkCommittable( - new FileSinkTestUtils.TestPendingFileRecoverable()), + "0", new FileSinkTestUtils.TestPendingFileRecoverable()), new FileSinkCommittable( - new FileSinkTestUtils.TestInProgressFileRecoverable()), + "0", new FileSinkTestUtils.TestInProgressFileRecoverable()), new FileSinkCommittable( - new FileSinkTestUtils.TestPendingFileRecoverable()), + "0", new FileSinkTestUtils.TestPendingFileRecoverable()), new FileSinkCommittable( - new FileSinkTestUtils.TestInProgressFileRecoverable())) + "0", new FileSinkTestUtils.TestInProgressFileRecoverable())) .map(MockCommitRequest::new) .collect(Collectors.toList()); fileCommitter.commit(committables);