This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new dc1db12137e [FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729 dc1db12137e is described below commit dc1db12137ecad921ae90969d7bfbf1ee7d3d2ef Author: Cheena Budhiraja <110803195+bche...@users.noreply.github.com> AuthorDate: Fri Dec 1 18:52:43 2023 +0530 [FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729 --- docs/content.zh/docs/deployment/filesystems/gcs.md | 9 ++--- docs/content/docs/deployment/filesystems/gcs.md | 9 ++--- .../apache/flink/fs/gs/GSFileSystemOptions.java | 17 ++++++++++ .../org/apache/flink/fs/gs/utils/BlobUtils.java | 21 +++++++++++- .../flink/fs/gs/writer/GSCommitRecoverable.java | 7 ++-- .../fs/gs/writer/GSRecoverableWriterCommitter.java | 29 +++++++++++++---- .../apache/flink/fs/gs/utils/BlobUtilsTest.java | 15 +++++++++ .../fs/gs/writer/GSCommitRecoverableTest.java | 38 ++++++++++++++++++++++ 8 files changed, 127 insertions(+), 18 deletions(-) diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md b/docs/content.zh/docs/deployment/filesystems/gcs.md index 7edf78b2e61..f80e5b3af4a 100644 --- a/docs/content.zh/docs/deployment/filesystems/gcs.md +++ b/docs/content.zh/docs/deployment/filesystems/gcs.md @@ -76,10 +76,11 @@ You can also set `gcs-connector` options directly in the Hadoop `core-site.xml` `flink-gs-fs-hadoop` can also be configured by setting the following options in `flink-conf.yaml`: -| Key | Description [...] 
-|-------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] -| gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. If this property is not set, temporary blobs will be written to same bucket as the final file being written. In either case, temporary blobs are written with the prefix `.inprogress/`. <br><br> It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mec [...] -| gs.writer.chunk.size | Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. <br><br>If not set, a Google-determined default chunk size will be used. [...] +| Key | Description [...] +|---------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] +| gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. 
If this property is not set, temporary blobs will be written to same bucket as the final file being written. In either case, temporary blobs are written with the prefix `.inprogress/`. <br><br> It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mechanism to [...] +| gs.writer.chunk.size | Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. <br><br>If not set, a Google-determined default chunk size will be used. [...] +| gs.filesink.entropy.enabled | Set this property to improve performance due to hotspotting issues on GCS. This option defines whether to enable entropy injection in filesink gcs path. If this is enabled, entropy in the form of temporary object id will be injected in beginning of the gcs path of the temporary objects. The final object path remains unchanged. [...] ### Authentication to access GCS diff --git a/docs/content/docs/deployment/filesystems/gcs.md b/docs/content/docs/deployment/filesystems/gcs.md index 3bd130e2046..bd097b3a4c4 100644 --- a/docs/content/docs/deployment/filesystems/gcs.md +++ b/docs/content/docs/deployment/filesystems/gcs.md @@ -76,10 +76,11 @@ You can also set `gcs-connector` options directly in the Hadoop `core-site.xml` `flink-gs-fs-hadoop` can also be configured by setting the following options in `flink-conf.yaml`: -| Key | Description [...] 
-|-------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] -| gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. If this property is not set, temporary blobs will be written to same bucket as the final file being written. In either case, temporary blobs are written with the prefix `.inprogress/`. <br><br> It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mec [...] -| gs.writer.chunk.size | Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. <br><br>If not set, a Google-determined default chunk size will be used. [...] +| Key | Description [...] +|---------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] +| gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. 
If this property is not set, temporary blobs will be written to same bucket as the final file being written. In either case, temporary blobs are written with the prefix `.inprogress/`. <br><br> It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mechanism to [...] +| gs.writer.chunk.size | Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. <br><br>If not set, a Google-determined default chunk size will be used. [...] +| gs.filesink.entropy.enabled | Set this property to improve performance due to hotspotting issues on GCS. This option defines whether to enable entropy injection in filesink gcs path. If this is enabled, entropy in the form of temporary object id will be injected in beginning of the gcs path of the temporary objects. The final object path remains unchanged. [...] ### Authentication to access GCS diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemOptions.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemOptions.java index d2b53414a4d..71c0d7593e9 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemOptions.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemOptions.java @@ -47,6 +47,18 @@ public class GSFileSystemOptions { "This option sets the chunk size for writes to the underlying Google storage. If set, this must be a multiple " + "of 256KB. If not set, writes will use Google's default chunk size."); + /* Flink config option to determine if entropy should be enabled in filesink gcs path. 
*/ + public static final ConfigOption<Boolean> ENABLE_FILESINK_ENTROPY = + ConfigOptions.key("gs.filesink.entropy.enabled") + .booleanType() + .defaultValue(Boolean.FALSE) + .withDescription( + "This option can be used to improve performance due to hotspotting " + + "issues on GCS. If this is enabled, entropy in the form of " + + "temporary object id will be injected in the beginning of " + + "gcs path of the temporary objects. The final object path " + + "remains unchanged."); + /** The Flink configuration. */ private final Configuration flinkConfig; @@ -80,6 +92,11 @@ public class GSFileSystemOptions { return flinkConfig.getOptional(WRITER_CHUNK_SIZE); } + /** Whether entropy insertion is enabled in filesink path. */ + public Boolean isFileSinkEntropyEnabled() { + return flinkConfig.get(ENABLE_FILESINK_ENTROPY); + } + @Override public String toString() { return "GSFileSystemOptions{" diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java index 8442bc7c68d..48af31bedec 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java @@ -107,6 +107,22 @@ public class BlobUtils { return getTemporaryObjectPartialName(finalBlobIdentifier) + temporaryObjectId.toString(); } + /** + * Returns a temporary object name with entropy, formed by adding the temporary object id to the + * temporary object partial name in both start and end of path, i.e. abc.inprogress/foo/bar/abc + * for the final blob with object name "foo/bar" and temporary object id "abc". 
+ * + * @param finalBlobIdentifier The final blob identifier + * @param temporaryObjectId The temporary object id + * @return The temporary object name with entropy + */ + public static String getTemporaryObjectNameWithEntropy( + GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId) { + return temporaryObjectId.toString() + + getTemporaryObjectPartialName(finalBlobIdentifier) + + temporaryObjectId.toString(); + } + /** * Resolves a temporary blob identifier for a provided temporary object id and the provided * options. @@ -122,7 +138,10 @@ public class BlobUtils { GSFileSystemOptions options) { String temporaryBucketName = BlobUtils.getTemporaryBucketName(finalBlobIdentifier, options); String temporaryObjectName = - BlobUtils.getTemporaryObjectName(finalBlobIdentifier, temporaryObjectId); + options.isFileSinkEntropyEnabled() + ? BlobUtils.getTemporaryObjectNameWithEntropy( + finalBlobIdentifier, temporaryObjectId) + : BlobUtils.getTemporaryObjectName(finalBlobIdentifier, temporaryObjectId); return new GSBlobIdentifier(temporaryBucketName, temporaryObjectName); } } diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSCommitRecoverable.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSCommitRecoverable.java index edd9566d3b9..e9f02105e0f 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSCommitRecoverable.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSCommitRecoverable.java @@ -73,8 +73,11 @@ class GSCommitRecoverable implements RecoverableWriter.CommitRecoverable { componentObjectIds.stream() .map( temporaryObjectId -> - BlobUtils.getTemporaryObjectName( - finalBlobIdentifier, temporaryObjectId)) + options.isFileSinkEntropyEnabled() + ? 
BlobUtils.getTemporaryObjectNameWithEntropy( + finalBlobIdentifier, temporaryObjectId) + : BlobUtils.getTemporaryObjectName( + finalBlobIdentifier, temporaryObjectId)) .map( temporaryObjectName -> new GSBlobIdentifier( diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java index e67ff38cb61..9258aeaf9d1 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -53,6 +54,8 @@ class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Comm /** The max number of blobs to compose in a single operation. 
*/ private final int composeMaxBlobs; + private List<GSBlobIdentifier> composedTempBlobIdentifiers = new ArrayList<>(); + GSRecoverableWriterCommitter( GSBlobStorage storage, GSFileSystemOptions options, @@ -197,6 +200,7 @@ class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Comm BlobUtils.getTemporaryBlobIdentifier( recoverable.finalBlobIdentifier, temporaryObjectId, options); composeBlobs(recoverable.getComponentBlobIds(options), intermediateBlobIdentifier); + composedTempBlobIdentifiers.add(intermediateBlobIdentifier); storage.copy(intermediateBlobIdentifier, recoverable.finalBlobIdentifier); } } @@ -212,16 +216,27 @@ class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Comm options, recoverable); - // determine the partial name for the temporary objects to be deleted + List<GSBlobIdentifier> foundTempBlobIdentifiers = new ArrayList<>(); String temporaryBucketName = BlobUtils.getTemporaryBucketName(recoverable.finalBlobIdentifier, options); - String temporaryObjectPartialName = - BlobUtils.getTemporaryObjectPartialName(recoverable.finalBlobIdentifier); - // find all the temp blobs by looking for anything that starts with the temporary - // object partial name. doing it this way finds any orphaned temp blobs as well - List<GSBlobIdentifier> foundTempBlobIdentifiers = - storage.list(temporaryBucketName, temporaryObjectPartialName); + if (options.isFileSinkEntropyEnabled()) { + // if filesink entropy is enabled, we cannot find temp blobs with a fixed prefix. + // We should have a predefined list of temp blobs to delete. 
+ if (!recoverable.finalBlobIdentifier.bucketName.equals(temporaryBucketName)) { + foundTempBlobIdentifiers.addAll(composedTempBlobIdentifiers); + } + foundTempBlobIdentifiers.addAll(recoverable.getComponentBlobIds(options)); + } else { + // determine the partial name for the temporary objects to be deleted + String temporaryObjectPartialName = + BlobUtils.getTemporaryObjectPartialName(recoverable.finalBlobIdentifier); + + // find all the temp blobs by looking for anything that starts with the temporary + // object partial name. doing it this way finds any orphaned temp blobs as well + foundTempBlobIdentifiers = + storage.list(temporaryBucketName, temporaryObjectPartialName); + } if (!foundTempBlobIdentifiers.isEmpty()) { // delete all the temp blobs, and populate the set with ones that were actually deleted diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/BlobUtilsTest.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/BlobUtilsTest.java index 2150b0d9fdf..e53b64e8fd7 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/BlobUtilsTest.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/BlobUtilsTest.java @@ -92,6 +92,21 @@ public class BlobUtilsTest { assertEquals(".inprogress/foo/bar/f09c43e5-ea49-4537-a406-0586f8f09d47", partialName); } + @Test + public void shouldProperlyConstructTemporaryObjectNameWithEntropy() { + Configuration flinkConfig = new Configuration(); + flinkConfig.set(GSFileSystemOptions.ENABLE_FILESINK_ENTROPY, Boolean.TRUE); + + GSBlobIdentifier identifier = new GSBlobIdentifier("foo", "bar"); + UUID temporaryObjectId = UUID.fromString("f09c43e5-ea49-4537-a406-0586f8f09d47"); + + String partialName = + BlobUtils.getTemporaryObjectNameWithEntropy(identifier, temporaryObjectId); + assertEquals( + "f09c43e5-ea49-4537-a406-0586f8f09d47.inprogress/foo/bar/f09c43e5-ea49-4537-a406-0586f8f09d47", + 
partialName); + } + @Test public void shouldProperlyConstructTemporaryBlobIdentifierWithDefaultBucket() { Configuration flinkConfig = new Configuration(); diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/writer/GSCommitRecoverableTest.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/writer/GSCommitRecoverableTest.java index dd62a503946..eb1f82eeac7 100644 --- a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/writer/GSCommitRecoverableTest.java +++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/writer/GSCommitRecoverableTest.java @@ -133,4 +133,42 @@ public class GSCommitRecoverableTest { assertEquals(expectedObjectName, componentBlobIdentifier.objectName); } } + + @Test + public void shouldGetComponentBlobIdsWithEntropy() { + + // configure options, if this test configuration has a temporary bucket name, set it + Configuration flinkConfig = new Configuration(); + if (temporaryBucketName != null) { + flinkConfig.set(GSFileSystemOptions.WRITER_TEMPORARY_BUCKET_NAME, temporaryBucketName); + } + // enable filesink entropy + flinkConfig.set(GSFileSystemOptions.ENABLE_FILESINK_ENTROPY, Boolean.TRUE); + GSFileSystemOptions options = new GSFileSystemOptions(flinkConfig); + GSCommitRecoverable commitRecoverable = + new GSCommitRecoverable(blobIdentifier, componentObjectIds); + List<GSBlobIdentifier> componentBlobIdentifiers = + commitRecoverable.getComponentBlobIds(options); + + for (int i = 0; i < componentObjectIds.size(); i++) { + UUID componentObjectId = componentObjectIds.get(i); + GSBlobIdentifier componentBlobIdentifier = componentBlobIdentifiers.get(i); + + // if a temporary bucket is specified in options, the component blob identifier + // should be in this bucket; otherwise, it should be in the bucket with the final blob + assertEquals( + temporaryBucketName == null ? 
blobIdentifier.bucketName : temporaryBucketName, + componentBlobIdentifier.bucketName); + + // make sure the name is what is expected + String expectedObjectName = + String.format( + "%s.inprogress/%s/%s/%s", + componentObjectId, + blobIdentifier.bucketName, + blobIdentifier.objectName, + componentObjectId); + assertEquals(expectedObjectName, componentBlobIdentifier.objectName); + } + } }
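
Note on the naming scheme in this commit: the following is a minimal, standalone sketch of how the two temporary object names differ. It is not the Flink implementation. The class `TemporaryObjectNameSketch` and its methods are hypothetical and take plain strings instead of `GSBlobIdentifier`. The layout `.inprogress/<bucket>/<object>/<id>` and the expected values are taken from the assertions in `BlobUtilsTest` above.

```java
import java.util.UUID;

/** Hypothetical illustration of the temporary object naming shown in the diff. */
public class TemporaryObjectNameSketch {

    /** Shared partial name: ".inprogress/<bucket>/<object>/". */
    public static String partialName(String bucketName, String objectName) {
        return ".inprogress/" + bucketName + "/" + objectName + "/";
    }

    /** Name without entropy: the partial name followed by the temporary object id. */
    public static String plainName(String bucketName, String objectName, UUID id) {
        return partialName(bucketName, objectName) + id.toString();
    }

    /** Name with entropy: the temporary object id is added at the start as well. */
    public static String nameWithEntropy(String bucketName, String objectName, UUID id) {
        return id.toString() + partialName(bucketName, objectName) + id.toString();
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("f09c43e5-ea49-4537-a406-0586f8f09d47");
        String prefix = partialName("foo", "bar");

        String plain = plainName("foo", "bar", id);
        String withEntropy = nameWithEntropy("foo", "bar", id);

        System.out.println(plain);
        System.out.println(withEntropy);

        // Without entropy, every temporary object for one final blob shares a prefix,
        // so a prefix listing can find them. With entropy, the names start with the id.
        System.out.println(plain.startsWith(prefix));       // true
        System.out.println(withEntropy.startsWith(prefix)); // false
    }
}
```

Because names with entropy no longer share a fixed prefix, a prefix listing cannot discover them. This is consistent with the committer change above, which collects the known component and composed blob identifiers for deletion when entropy is enabled instead of calling `storage.list`.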