This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dc1db12137e [FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729
dc1db12137e is described below

commit dc1db12137ecad921ae90969d7bfbf1ee7d3d2ef
Author: Cheena Budhiraja <110803195+bche...@users.noreply.github.com>
AuthorDate: Fri Dec 1 18:52:43 2023 +0530

    [FLINK-32878][Filesystems] Add entropy in temporary object name in GCS connector. This closes #23729
---
 docs/content.zh/docs/deployment/filesystems/gcs.md |  9 ++---
 docs/content/docs/deployment/filesystems/gcs.md    |  9 ++---
 .../apache/flink/fs/gs/GSFileSystemOptions.java    | 17 ++++++++++
 .../org/apache/flink/fs/gs/utils/BlobUtils.java    | 21 +++++++++++-
 .../flink/fs/gs/writer/GSCommitRecoverable.java    |  7 ++--
 .../fs/gs/writer/GSRecoverableWriterCommitter.java | 29 +++++++++++++----
 .../apache/flink/fs/gs/utils/BlobUtilsTest.java    | 15 +++++++++
 .../fs/gs/writer/GSCommitRecoverableTest.java      | 38 ++++++++++++++++++++++
 8 files changed, 127 insertions(+), 18 deletions(-)
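
For orientation, the naming change in `BlobUtils` can be sketched outside Flink roughly as follows. This is a minimal standalone illustration, not the connector code: the class `TemporaryObjectNames` and its method names are hypothetical, and the `.inprogress/<bucket>/<object>/` layout is inferred from the tests in this commit.

```java
import java.util.UUID;

/** Standalone sketch of the two temporary-object naming schemes; not the Flink classes. */
final class TemporaryObjectNames {

    /** Prefix shared by all temporary objects of one final blob. */
    static String partialName(String finalBucket, String finalObject) {
        return ".inprogress/" + finalBucket + "/" + finalObject + "/";
    }

    /** Default scheme: fixed prefix, temporary object id only at the end. */
    static String plain(String finalBucket, String finalObject, UUID id) {
        return partialName(finalBucket, finalObject) + id;
    }

    /** Entropy scheme: the temporary object id is also placed at the start of the name. */
    static String withEntropy(String finalBucket, String finalObject, UUID id) {
        return id + partialName(finalBucket, finalObject) + id;
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("f09c43e5-ea49-4537-a406-0586f8f09d47");
        System.out.println(plain("foo", "bar", id));
        System.out.println(withEntropy("foo", "bar", id));
    }
}
```

With the default scheme, every temporary object of a final blob shares one fixed prefix. With `gs.filesink.entropy.enabled` set to true, each name instead starts with its own temporary object id.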

diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md b/docs/content.zh/docs/deployment/filesystems/gcs.md
index 7edf78b2e61..f80e5b3af4a 100644
--- a/docs/content.zh/docs/deployment/filesystems/gcs.md
+++ b/docs/content.zh/docs/deployment/filesystems/gcs.md
@@ -76,10 +76,11 @@ You can also set `gcs-connector` options directly in the Hadoop `core-site.xml`
 
 `flink-gs-fs-hadoop` can also be configured by setting the following options in `flink-conf.yaml`:
 
-| Key                                       | Description [...]
-|-------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| gs.writer.temporary.bucket.name           | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. If this property is not set, temporary blobs will be written to same bucket as the final file being written. In either case, temporary blobs are written with the prefix `.inprogress/`. <br><br>  It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mec [...]
-| gs.writer.chunk.size                      | Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. <br><br>If not set, a Google-determined default chunk size will be used. [...]
+| Key                             | Description [...]
+|---------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. If this property is not set, temporary blobs will be written to same bucket as the final file being written. In either case, temporary blobs are written with the prefix `.inprogress/`. <br><br>  It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mechanism to  [...]
+| gs.writer.chunk.size            | Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. <br><br>If not set, a Google-determined default chunk size will be used. [...]
+| gs.filesink.entropy.enabled     | Set this property to improve performance due to hotspotting issues on GCS. This option defines whether to enable entropy injection in filesink gcs path. If this is enabled, entropy in the form of temporary object id will be injected in beginning of the gcs path of the temporary objects. The final object path remains unchanged. [...]
 
 ### Authentication to access GCS
 
diff --git a/docs/content/docs/deployment/filesystems/gcs.md b/docs/content/docs/deployment/filesystems/gcs.md
index 3bd130e2046..bd097b3a4c4 100644
--- a/docs/content/docs/deployment/filesystems/gcs.md
+++ b/docs/content/docs/deployment/filesystems/gcs.md
@@ -76,10 +76,11 @@ You can also set `gcs-connector` options directly in the Hadoop `core-site.xml`
 
 `flink-gs-fs-hadoop` can also be configured by setting the following options in `flink-conf.yaml`:
 
-| Key                                       | Description [...]
-|-------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| gs.writer.temporary.bucket.name           | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. If this property is not set, temporary blobs will be written to same bucket as the final file being written. In either case, temporary blobs are written with the prefix `.inprogress/`. <br><br>  It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mec [...]
-| gs.writer.chunk.size                      | Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. <br><br>If not set, a Google-determined default chunk size will be used. [...]
+| Key                             | Description [...]
+|---------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| gs.writer.temporary.bucket.name | Set this property to choose a bucket to hold temporary blobs for in-progress writes via `RecoverableWriter`. If this property is not set, temporary blobs will be written to same bucket as the final file being written. In either case, temporary blobs are written with the prefix `.inprogress/`. <br><br>  It is recommended to choose a separate bucket in order to [assign it a TTL](https://cloud.google.com/storage/docs/lifecycle), to provide a mechanism to  [...]
+| gs.writer.chunk.size            | Set this property to [set the chunk size](https://cloud.google.com/java/docs/reference/google-cloud-core/latest/com.google.cloud.WriteChannel#com_google_cloud_WriteChannel_setChunkSize_int_) for writes via `RecoverableWriter`. <br><br>If not set, a Google-determined default chunk size will be used. [...]
+| gs.filesink.entropy.enabled     | Set this property to improve performance due to hotspotting issues on GCS. This option defines whether to enable entropy injection in filesink gcs path. If this is enabled, entropy in the form of temporary object id will be injected in beginning of the gcs path of the temporary objects. The final object path remains unchanged. [...]
 
 ### Authentication to access GCS
 
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemOptions.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemOptions.java
index d2b53414a4d..71c0d7593e9 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemOptions.java
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemOptions.java
@@ -47,6 +47,18 @@ public class GSFileSystemOptions {
                             "This option sets the chunk size for writes to the underlying Google storage. If set, this must be a multiple "
                                     + "of 256KB. If not set, writes will use Google's default chunk size.");
 
+    /* Flink config option to determine if entropy should be enabled in filesink gcs path. */
+    public static final ConfigOption<Boolean> ENABLE_FILESINK_ENTROPY =
+            ConfigOptions.key("gs.filesink.entropy.enabled")
+                    .booleanType()
+                    .defaultValue(Boolean.FALSE)
+                    .withDescription(
+                            "This option can be used to improve performance due to hotspotting "
+                                    + "issues on GCS. If this is enabled, entropy in the form of "
+                                    + "temporary object id will be injected in the beginning of "
+                                    + "gcs path of the temporary objects. The final object path "
+                                    + "remains unchanged.");
+
     /** The Flink configuration. */
     private final Configuration flinkConfig;
 
@@ -80,6 +92,11 @@ public class GSFileSystemOptions {
         return flinkConfig.getOptional(WRITER_CHUNK_SIZE);
     }
 
+    /** Whether entropy insertion is enabled in filesink path. */
+    public Boolean isFileSinkEntropyEnabled() {
+        return flinkConfig.get(ENABLE_FILESINK_ENTROPY);
+    }
+
     @Override
     public String toString() {
         return "GSFileSystemOptions{"
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java
index 8442bc7c68d..48af31bedec 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java
@@ -107,6 +107,22 @@ public class BlobUtils {
         return getTemporaryObjectPartialName(finalBlobIdentifier) + temporaryObjectId.toString();
     }
 
+    /**
+     * Returns a temporary object name with entropy, formed by adding the temporary object id to the
+     * temporary object partial name in both start and end of path, i.e. abc.inprogress/foo/bar/abc
+     * for the final blob with object name "foo/bar" and temporary object id "abc".
+     *
+     * @param finalBlobIdentifier The final blob identifier
+     * @param temporaryObjectId The temporary object id
+     * @return The temporary object name with entropy
+     */
+    public static String getTemporaryObjectNameWithEntropy(
+            GSBlobIdentifier finalBlobIdentifier, UUID temporaryObjectId) {
+        return temporaryObjectId.toString()
+                + getTemporaryObjectPartialName(finalBlobIdentifier)
+                + temporaryObjectId.toString();
+    }
+
     /**
      * Resolves a temporary blob identifier for a provided temporary object id and the provided
      * options.
@@ -122,7 +138,10 @@ public class BlobUtils {
             GSFileSystemOptions options) {
         String temporaryBucketName = BlobUtils.getTemporaryBucketName(finalBlobIdentifier, options);
         String temporaryObjectName =
-                BlobUtils.getTemporaryObjectName(finalBlobIdentifier, temporaryObjectId);
+                options.isFileSinkEntropyEnabled()
+                        ? BlobUtils.getTemporaryObjectNameWithEntropy(
+                                finalBlobIdentifier, temporaryObjectId)
+                        : BlobUtils.getTemporaryObjectName(finalBlobIdentifier, temporaryObjectId);
         return new GSBlobIdentifier(temporaryBucketName, temporaryObjectName);
     }
 }
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSCommitRecoverable.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSCommitRecoverable.java
index edd9566d3b9..e9f02105e0f 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSCommitRecoverable.java
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSCommitRecoverable.java
@@ -73,8 +73,11 @@ class GSCommitRecoverable implements RecoverableWriter.CommitRecoverable {
                 componentObjectIds.stream()
                         .map(
                                 temporaryObjectId ->
-                                        BlobUtils.getTemporaryObjectName(
-                                                finalBlobIdentifier, temporaryObjectId))
+                                        options.isFileSinkEntropyEnabled()
+                                                ? BlobUtils.getTemporaryObjectNameWithEntropy(
+                                                        finalBlobIdentifier, temporaryObjectId)
+                                                : BlobUtils.getTemporaryObjectName(
+                                                        finalBlobIdentifier, temporaryObjectId))
                         .map(
                                 temporaryObjectName ->
                                         new GSBlobIdentifier(
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
index e67ff38cb61..9258aeaf9d1 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
@@ -53,6 +54,8 @@ class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Comm
     /** The max number of blobs to compose in a single operation. */
     private final int composeMaxBlobs;
 
+    private List<GSBlobIdentifier> composedTempBlobIdentifiers = new ArrayList<>();
+
     GSRecoverableWriterCommitter(
             GSBlobStorage storage,
             GSFileSystemOptions options,
@@ -197,6 +200,7 @@ class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Comm
                         BlobUtils.getTemporaryBlobIdentifier(
                                 recoverable.finalBlobIdentifier, temporaryObjectId, options);
                 composeBlobs(recoverable.getComponentBlobIds(options), intermediateBlobIdentifier);
+                composedTempBlobIdentifiers.add(intermediateBlobIdentifier);
                 storage.copy(intermediateBlobIdentifier, recoverable.finalBlobIdentifier);
             }
         }
@@ -212,16 +216,27 @@ class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Comm
                 options,
                 recoverable);
 
-        // determine the partial name for the temporary objects to be deleted
+        List<GSBlobIdentifier> foundTempBlobIdentifiers = new ArrayList<>();
         String temporaryBucketName =
                 BlobUtils.getTemporaryBucketName(recoverable.finalBlobIdentifier, options);
-        String temporaryObjectPartialName =
-                BlobUtils.getTemporaryObjectPartialName(recoverable.finalBlobIdentifier);
 
-        // find all the temp blobs by looking for anything that starts with the temporary
-        // object partial name. doing it this way finds any orphaned temp blobs as well
-        List<GSBlobIdentifier> foundTempBlobIdentifiers =
-                storage.list(temporaryBucketName, temporaryObjectPartialName);
+        if (options.isFileSinkEntropyEnabled()) {
+            // if filesink entropy is enabled, we cannot find temp blobs with a fixed prefix.
+            // We should have a predefined list of temp blobs to delete.
+            if (!recoverable.finalBlobIdentifier.bucketName.equals(temporaryBucketName)) {
+                foundTempBlobIdentifiers.addAll(composedTempBlobIdentifiers);
+            }
+            foundTempBlobIdentifiers.addAll(recoverable.getComponentBlobIds(options));
+        } else {
+            // determine the partial name for the temporary objects to be deleted
+            String temporaryObjectPartialName =
+                    BlobUtils.getTemporaryObjectPartialName(recoverable.finalBlobIdentifier);
+
+            // find all the temp blobs by looking for anything that starts with the temporary
+            // object partial name. doing it this way finds any orphaned temp blobs as well
+            foundTempBlobIdentifiers =
+                    storage.list(temporaryBucketName, temporaryObjectPartialName);
+        }
         if (!foundTempBlobIdentifiers.isEmpty()) {
 
             // delete all the temp blobs, and populate the set with ones that were actually deleted
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/BlobUtilsTest.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/BlobUtilsTest.java
index 2150b0d9fdf..e53b64e8fd7 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/BlobUtilsTest.java
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/utils/BlobUtilsTest.java
@@ -92,6 +92,21 @@ public class BlobUtilsTest {
         assertEquals(".inprogress/foo/bar/f09c43e5-ea49-4537-a406-0586f8f09d47", partialName);
     }
 
+    @Test
+    public void shouldProperlyConstructTemporaryObjectNameWithEntropy() {
+        Configuration flinkConfig = new Configuration();
+        flinkConfig.set(GSFileSystemOptions.ENABLE_FILESINK_ENTROPY, Boolean.TRUE);
+
+        GSBlobIdentifier identifier = new GSBlobIdentifier("foo", "bar");
+        UUID temporaryObjectId = UUID.fromString("f09c43e5-ea49-4537-a406-0586f8f09d47");
+
+        String partialName =
+                BlobUtils.getTemporaryObjectNameWithEntropy(identifier, temporaryObjectId);
+        assertEquals(
+                "f09c43e5-ea49-4537-a406-0586f8f09d47.inprogress/foo/bar/f09c43e5-ea49-4537-a406-0586f8f09d47",
+                partialName);
+    }
+
     @Test
     public void shouldProperlyConstructTemporaryBlobIdentifierWithDefaultBucket() {
         Configuration flinkConfig = new Configuration();
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/writer/GSCommitRecoverableTest.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/writer/GSCommitRecoverableTest.java
index dd62a503946..eb1f82eeac7 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/writer/GSCommitRecoverableTest.java
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/writer/GSCommitRecoverableTest.java
@@ -133,4 +133,42 @@ public class GSCommitRecoverableTest {
             assertEquals(expectedObjectName, componentBlobIdentifier.objectName);
         }
     }
+
+    @Test
+    public void shouldGetComponentBlobIdsWithEntropy() {
+
+        // configure options, if this test configuration has a temporary bucket name, set it
+        Configuration flinkConfig = new Configuration();
+        if (temporaryBucketName != null) {
+            flinkConfig.set(GSFileSystemOptions.WRITER_TEMPORARY_BUCKET_NAME, temporaryBucketName);
+        }
+        // enable filesink entropy
+        flinkConfig.set(GSFileSystemOptions.ENABLE_FILESINK_ENTROPY, Boolean.TRUE);
+        GSFileSystemOptions options = new GSFileSystemOptions(flinkConfig);
+        GSCommitRecoverable commitRecoverable =
+                new GSCommitRecoverable(blobIdentifier, componentObjectIds);
+        List<GSBlobIdentifier> componentBlobIdentifiers =
+                commitRecoverable.getComponentBlobIds(options);
+
+        for (int i = 0; i < componentObjectIds.size(); i++) {
+            UUID componentObjectId = componentObjectIds.get(i);
+            GSBlobIdentifier componentBlobIdentifier = componentBlobIdentifiers.get(i);
+
+            // if a temporary bucket is specified in options, the component blob identifier
+            // should be in this bucket; otherwise, it should be in the bucket with the final blob
+            assertEquals(
+                    temporaryBucketName == null ? blobIdentifier.bucketName : temporaryBucketName,
+                    componentBlobIdentifier.bucketName);
+
+            // make sure the name is what is expected
+            String expectedObjectName =
+                    String.format(
+                            "%s.inprogress/%s/%s/%s",
+                            componentObjectId,
+                            blobIdentifier.bucketName,
+                            blobIdentifier.objectName,
+                            componentObjectId);
+            assertEquals(expectedObjectName, componentBlobIdentifier.objectName);
+        }
+    }
 }
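
The cleanup change in `GSRecoverableWriterCommitter` follows from the naming change: once a name starts with the temporary object id, a listing by the fixed `.inprogress/...` prefix no longer matches it, so the blobs to delete must come from an explicit list. The sketch below illustrates this with plain strings and an in-memory list. `TempBlobCleanupSketch`, `listByPrefix`, and `blobsToDelete` are hypothetical names, and the in-memory filter only stands in for the storage listing call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Standalone sketch of the two cleanup strategies; names are hypothetical, not Flink classes. */
final class TempBlobCleanupSketch {

    /** Stand-in for a "list objects by prefix" storage call, backed by an in-memory list. */
    static List<String> listByPrefix(List<String> objectNames, String prefix) {
        return objectNames.stream()
                .filter(name -> name.startsWith(prefix))
                .collect(Collectors.toList());
    }

    /** Chooses the temp blobs to delete: the known list with entropy, a prefix listing without. */
    static List<String> blobsToDelete(
            boolean entropyEnabled,
            List<String> objectNames,
            String partialName,
            List<String> knownTempBlobs) {
        if (entropyEnabled) {
            // Names start with a random id, so no fixed prefix can find them.
            return new ArrayList<>(knownTempBlobs);
        }
        return listByPrefix(objectNames, partialName);
    }

    public static void main(String[] args) {
        List<String> bucket =
                Arrays.asList(".inprogress/foo/bar/id-1", "id-2.inprogress/foo/bar/id-2");
        String partialName = ".inprogress/foo/bar/";

        System.out.println(
                blobsToDelete(false, bucket, partialName, Collections.emptyList()));
        System.out.println(
                blobsToDelete(
                        true,
                        bucket,
                        partialName,
                        Arrays.asList("id-2.inprogress/foo/bar/id-2")));
    }
}
```

One consequence visible in the sketch, offered as an observation rather than a statement from the commit: the prefix listing also picks up temporary blobs that are not in the known list, whereas the entropy path only deletes blobs it already knows about.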
