galenwarren commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r650441644



##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+import org.apache.flink.fs.gs.storage.GSBlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements 
RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final GSBlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable for the commit operation. */
+    private final GSResumeRecoverable recoverable;
+
+    GSRecoverableWriterCommitter(
+            GSBlobStorage storage, GSFileSystemOptions options, 
GSResumeRecoverable recoverable) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.recoverable = Preconditions.checkNotNull(recoverable);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        // first, make sure the final blob doesn't already exist
+        Optional<GSBlobStorage.BlobMetadata> blobMetadata =
+                storage.getMetadata(recoverable.finalBlobIdentifier);
+        if (blobMetadata.isPresent()) {
+            throw new IOException(
+                    String.format(
+                            "Blob %s already exists during attempted commit",
+                            recoverable.finalBlobIdentifier));
+        }
+
+        // write the final blob
+        writeFinalBlob();
+
+        // clean up after successful commit
+        cleanupTemporaryBlobs();
+    }
+
+    @Override
+    public void commitAfterRecovery() throws IOException {
+
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        // only write the final blob if it doesn't already exist
+        Optional<GSBlobStorage.BlobMetadata> blobMetadata =
+                storage.getMetadata(recoverable.finalBlobIdentifier);
+        if (!blobMetadata.isPresent()) {
+            writeFinalBlob();
+        }
+
+        // clean up after successful commit
+        cleanupTemporaryBlobs();
+    }
+
+    @Override
+    public RecoverableWriter.CommitRecoverable getRecoverable() {
+        return recoverable;
+    }
+
+    /**
+     * Helper to compose an arbitrary number of blobs into a final blob, 
staying under the
+     * COMPOSE_MAX_BLOBS limit for any individual compose operation.
+     *
+     * @param sourceBlobIdentifiers The source blob ids to compose
+     * @param targetBlobIdentifier The target blob id for the composed result
+     */
+    private void composeBlobs(
+            List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier 
targetBlobIdentifier) {
+        Preconditions.checkNotNull(sourceBlobIdentifiers);
+        Preconditions.checkArgument(sourceBlobIdentifiers.size() > 0);
+        Preconditions.checkNotNull(targetBlobIdentifier);
+
+        // split the source list into two parts; first, the ones we can 
compose in this operation
+        // (up to COMPOSE_MAX_BLOBS), and, second, whichever blobs are left 
over
+        final int composeToIndex =
+                Math.min(BlobUtils.COMPOSE_MAX_BLOBS, 
sourceBlobIdentifiers.size());
+        List<GSBlobIdentifier> composeBlobIds = 
sourceBlobIdentifiers.subList(0, composeToIndex);
+        List<GSBlobIdentifier> remainingBlobIds =
+                sourceBlobIdentifiers.subList(composeToIndex, 
sourceBlobIdentifiers.size());
+
+        // determine the resulting blob id for this compose operation. if this 
is the last compose,
+        // i.e. if there are no remaining blob ids, then the composed blob id 
is the originally
+        // specified target blob id. otherwise, we must create an intermediate 
blob id to hold the
+        // result of this compose operation
+        GSBlobIdentifier composedBlobId =
+                remainingBlobIds.isEmpty()
+                        ? targetBlobIdentifier
+                        : BlobUtils.generateTemporaryBlobIdentifier(
+                                recoverable.finalBlobIdentifier, options);
+
+        // compose the blobs
+        storage.compose(composeBlobIds, composedBlobId);
+
+        // if we have remaining blobs, add the composed blob id to the 
beginning of the list
+        // of remaining blob ids, and recurse
+        if (!remainingBlobIds.isEmpty()) {
+            remainingBlobIds.add(0, composedBlobId);
+            composeBlobs(remainingBlobIds, targetBlobIdentifier);
+        }
+    }

Review comment:
       @xintongsong I do see what you're saying about ```COMPOSE_MAX_BLOBS``` 
being a limitation of GCS blob storage and not necessarily of the abstract 
```GSBlobStorage``` interface, and so this logic could, in principle, be moved 
to be internal to ```GSBlobStorageImpl```.
   
   However, from a unit-testing perspective, I'd been hoping to keep the 
implementation of ```GSBlobStorageImpl``` as simple as possible, i.e. making 
each method's implementation as close to simple pass-through calls to the 
underlying Google storage interfaces as possible. In particular, I'm hoping to 
avoid putting any complicated logic in need of testing there. This is because 
the underlying Google storage interfaces are tricky to mock -- i.e. they have 
lots of methods and they sometimes return types that are themselves difficult 
to create or mock -- and so writing tests at that level will, in general, be 
more difficult.
   
   Alternatively, if I were to keep the ```COMPOSE_MAX_BLOBS```-aware compose 
logic outside of ```GSBlobStorageImpl```, then I could test that logic using a 
mocked implementation of ```GSBlobStorage``` -- i.e. ```MockBlobStorage``` -- 
which is easy to test against.
   
   What would you think of an alternative approach, where I would move the 
```COMPOSE_MAX_BLOBS```-aware compose logic out of 
```GSRecoverableWriterCommitter#composeBlobs``` and into a utility method, say 
```BlobUtils#composeBlobs```? That utility method would take the same 
parameters as the existing one, plus:
   * A ```GSBlobStorage``` instance to operate against
   * A parameter indicating the maximum number of blobs to compose at once
   
   So, essentially, this new utility method would be a generic implementation 
of the "compose blobs with a limit on the number of blobs that can be composed 
in one step" algorithm, which I could test easily against 
```MockBlobStorage```. Then, ```GSRecoverableWriterCommitter``` could call that 
utility method, passing ```COMPOSE_MAX_BLOBS``` for the max number of blobs to 
compose at once.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+import org.apache.flink.fs.gs.storage.GSBlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements 
RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final GSBlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable for the commit operation. */
+    private final GSResumeRecoverable recoverable;
+
+    GSRecoverableWriterCommitter(
+            GSBlobStorage storage, GSFileSystemOptions options, 
GSResumeRecoverable recoverable) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.recoverable = Preconditions.checkNotNull(recoverable);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        // first, make sure the final blob doesn't already exist
+        Optional<GSBlobStorage.BlobMetadata> blobMetadata =
+                storage.getMetadata(recoverable.finalBlobIdentifier);
+        if (blobMetadata.isPresent()) {
+            throw new IOException(
+                    String.format(
+                            "Blob %s already exists during attempted commit",
+                            recoverable.finalBlobIdentifier));
+        }
+
+        // write the final blob
+        writeFinalBlob();
+
+        // clean up after successful commit
+        cleanupTemporaryBlobs();
+    }
+
+    @Override
+    public void commitAfterRecovery() throws IOException {
+
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        // only write the final blob if it doesn't already exist
+        Optional<GSBlobStorage.BlobMetadata> blobMetadata =
+                storage.getMetadata(recoverable.finalBlobIdentifier);
+        if (!blobMetadata.isPresent()) {
+            writeFinalBlob();
+        }
+
+        // clean up after successful commit
+        cleanupTemporaryBlobs();
+    }
+
+    @Override
+    public RecoverableWriter.CommitRecoverable getRecoverable() {
+        return recoverable;
+    }
+
+    /**
+     * Helper to compose an arbitrary number of blobs into a final blob, 
staying under the
+     * COMPOSE_MAX_BLOBS limit for any individual compose operation.
+     *
+     * @param sourceBlobIdentifiers The source blob ids to compose
+     * @param targetBlobIdentifier The target blob id for the composed result
+     */
+    private void composeBlobs(
+            List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier 
targetBlobIdentifier) {
+        Preconditions.checkNotNull(sourceBlobIdentifiers);
+        Preconditions.checkArgument(sourceBlobIdentifiers.size() > 0);
+        Preconditions.checkNotNull(targetBlobIdentifier);
+
+        // split the source list into two parts; first, the ones we can 
compose in this operation
+        // (up to COMPOSE_MAX_BLOBS), and, second, whichever blobs are left 
over
+        final int composeToIndex =
+                Math.min(BlobUtils.COMPOSE_MAX_BLOBS, 
sourceBlobIdentifiers.size());
+        List<GSBlobIdentifier> composeBlobIds = 
sourceBlobIdentifiers.subList(0, composeToIndex);
+        List<GSBlobIdentifier> remainingBlobIds =
+                sourceBlobIdentifiers.subList(composeToIndex, 
sourceBlobIdentifiers.size());
+
+        // determine the resulting blob id for this compose operation. if this 
is the last compose,
+        // i.e. if there are no remaining blob ids, then the composed blob id 
is the originally
+        // specified target blob id. otherwise, we must create an intermediate 
blob id to hold the
+        // result of this compose operation
+        GSBlobIdentifier composedBlobId =
+                remainingBlobIds.isEmpty()
+                        ? targetBlobIdentifier
+                        : BlobUtils.generateTemporaryBlobIdentifier(
+                                recoverable.finalBlobIdentifier, options);
+
+        // compose the blobs
+        storage.compose(composeBlobIds, composedBlobId);
+
+        // if we have remaining blobs, add the composed blob id to the 
beginning of the list
+        // of remaining blob ids, and recurse
+        if (!remainingBlobIds.isEmpty()) {
+            remainingBlobIds.add(0, composedBlobId);
+            composeBlobs(remainingBlobIds, targetBlobIdentifier);
+        }
+    }

Review comment:
       @xintongsong I do see what you're saying about ```COMPOSE_MAX_BLOBS``` 
being a limitation of GCS blob storage and not necessarily of the abstract 
```GSBlobStorage``` interface, and so this logic could, in principle, be moved 
to be internal to ```GSBlobStorageImpl```.
   
   However, from a unit-testing perspective, I'd been hoping to keep the 
implementation of ```GSBlobStorageImpl``` as simple as possible, i.e. making 
each method's implementation as close to simple pass-through calls to the 
underlying Google storage interfaces as possible. In particular, I'm hoping to 
avoid putting any complicated logic in need of testing there. This is because 
the underlying Google storage interfaces are tricky to mock -- i.e. they have 
lots of methods and they sometimes return types that are themselves difficult 
to create or mock -- and so writing tests at that level will, in general, be 
more difficult.
   
   Alternatively, if I were to keep the ```COMPOSE_MAX_BLOBS```-aware compose 
logic outside of ```GSBlobStorageImpl```, then I could test that logic using a 
mocked implementation of ```GSBlobStorage``` -- i.e. ```MockBlobStorage``` -- 
which is easy to test against.
   
   What would you think of an alternative approach, where I would move the 
```COMPOSE_MAX_BLOBS```-aware compose logic out of 
```GSRecoverableWriterCommitter#composeBlobs``` and into a utility method, say 
```BlobUtils#composeBlobs```? That utility method would take the same 
parameters as the existing one, plus:
   * A ```GSBlobStorage``` instance to operate against
   * A parameter indicating the maximum number of blobs to compose at once
   
   So, essentially, this new utility method would be a generic implementation 
of the "compose blobs with a limit on the number of blobs that can be composed 
in one step" algorithm, which I could test easily against 
```MockBlobStorage```. Then, ```GSRecoverableWriterCommitter``` could call that 
utility method, passing ```COMPOSE_MAX_BLOBS``` for the max number of blobs to 
compose at once.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+import org.apache.flink.fs.gs.storage.GSBlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements 
RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final GSBlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable for the commit operation. */
+    private final GSResumeRecoverable recoverable;
+
+    GSRecoverableWriterCommitter(
+            GSBlobStorage storage, GSFileSystemOptions options, 
GSResumeRecoverable recoverable) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.recoverable = Preconditions.checkNotNull(recoverable);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        // first, make sure the final blob doesn't already exist
+        Optional<GSBlobStorage.BlobMetadata> blobMetadata =
+                storage.getMetadata(recoverable.finalBlobIdentifier);
+        if (blobMetadata.isPresent()) {
+            throw new IOException(
+                    String.format(
+                            "Blob %s already exists during attempted commit",
+                            recoverable.finalBlobIdentifier));
+        }
+
+        // write the final blob
+        writeFinalBlob();
+
+        // clean up after successful commit
+        cleanupTemporaryBlobs();
+    }
+
+    @Override
+    public void commitAfterRecovery() throws IOException {
+
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        // only write the final blob if it doesn't already exist
+        Optional<GSBlobStorage.BlobMetadata> blobMetadata =
+                storage.getMetadata(recoverable.finalBlobIdentifier);
+        if (!blobMetadata.isPresent()) {
+            writeFinalBlob();
+        }
+
+        // clean up after successful commit
+        cleanupTemporaryBlobs();
+    }
+
+    @Override
+    public RecoverableWriter.CommitRecoverable getRecoverable() {
+        return recoverable;
+    }
+
+    /**
+     * Helper to compose an arbitrary number of blobs into a final blob, 
staying under the
+     * COMPOSE_MAX_BLOBS limit for any individual compose operation.
+     *
+     * @param sourceBlobIdentifiers The source blob ids to compose
+     * @param targetBlobIdentifier The target blob id for the composed result
+     */
+    private void composeBlobs(
+            List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier 
targetBlobIdentifier) {
+        Preconditions.checkNotNull(sourceBlobIdentifiers);
+        Preconditions.checkArgument(sourceBlobIdentifiers.size() > 0);
+        Preconditions.checkNotNull(targetBlobIdentifier);
+
+        // split the source list into two parts; first, the ones we can 
compose in this operation
+        // (up to COMPOSE_MAX_BLOBS), and, second, whichever blobs are left 
over
+        final int composeToIndex =
+                Math.min(BlobUtils.COMPOSE_MAX_BLOBS, 
sourceBlobIdentifiers.size());
+        List<GSBlobIdentifier> composeBlobIds = 
sourceBlobIdentifiers.subList(0, composeToIndex);
+        List<GSBlobIdentifier> remainingBlobIds =
+                sourceBlobIdentifiers.subList(composeToIndex, 
sourceBlobIdentifiers.size());
+
+        // determine the resulting blob id for this compose operation. if this 
is the last compose,
+        // i.e. if there are no remaining blob ids, then the composed blob id 
is the originally
+        // specified target blob id. otherwise, we must create an intermediate 
blob id to hold the
+        // result of this compose operation
+        GSBlobIdentifier composedBlobId =
+                remainingBlobIds.isEmpty()
+                        ? targetBlobIdentifier
+                        : BlobUtils.generateTemporaryBlobIdentifier(
+                                recoverable.finalBlobIdentifier, options);
+
+        // compose the blobs
+        storage.compose(composeBlobIds, composedBlobId);
+
+        // if we have remaining blobs, add the composed blob id to the 
beginning of the list
+        // of remaining blob ids, and recurse
+        if (!remainingBlobIds.isEmpty()) {
+            remainingBlobIds.add(0, composedBlobId);
+            composeBlobs(remainingBlobIds, targetBlobIdentifier);
+        }
+    }

Review comment:
       @xintongsong I do see what you're saying about ```COMPOSE_MAX_BLOBS``` 
being a limitation of GCS blob storage and not necessarily of the abstract 
```GSBlobStorage``` interface, and so this logic could, in principle, be moved 
to be internal to ```GSBlobStorageImpl```.
   
   However, from a unit-testing perspective, I'd been hoping to keep the 
implementation of ```GSBlobStorageImpl``` as simple as possible, i.e. making 
each method's implementation as close to simple pass-through calls to the 
underlying Google storage interfaces as possible. In particular, I'm hoping to 
avoid putting any complicated logic in need of testing there. This is because 
the underlying Google storage interfaces are tricky to mock -- i.e. they have 
lots of methods and they sometimes return types that are themselves difficult 
to create or mock -- and so writing tests at that level will, in general, be 
more difficult.
   
   Alternatively, if I were to keep the ```COMPOSE_MAX_BLOBS```-aware compose 
logic outside of ```GSBlobStorageImpl```, then I could test that logic using a 
mocked implementation of ```GSBlobStorage``` -- i.e. ```MockBlobStorage``` -- 
which is easy to test against.
   
   What would you think of an alternative approach, where I would move the 
```COMPOSE_MAX_BLOBS```-aware compose logic out of 
```GSRecoverableWriterCommitter#composeBlobs``` and into a utility method, say 
```BlobUtils#composeBlobs```? That utility method would take the same 
parameters as the existing one, plus:
   * A ```GSBlobStorage``` instance to operate against
   * A parameter indicating the maximum number of blobs to compose at once
   
   So, essentially, this new utility method would be a generic implementation 
of the "compose blobs with a limit on the number of blobs that can be composed 
in one step" algorithm, which I could test easily against 
```MockBlobStorage```. Then, ```GSRecoverableWriterCommitter``` could call that 
utility method, passing ```COMPOSE_MAX_BLOBS``` for the max number of blobs to 
compose at once.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    /** The scheme for the Google Storage file system. */
+    public static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    private final HadoopConfigLoader hadoopConfigLoader;
+
+    private Configuration flinkConfig;
+
+    /** Constructs the Google Storage file system factory. */
+    public GSFileSystemFactory() {
+        this.hadoopConfigLoader =
+                new HadoopConfigLoader(
+                        FLINK_CONFIG_PREFIXES,
+                        MIRRORED_CONFIG_KEYS,
+                        HADOOP_CONFIG_PREFIX,
+                        Collections.emptySet(),
+                        Collections.emptySet(),
+                        FLINK_SHADING_PREFIX);
+    }
+
+    @Override
+    public void configure(Configuration flinkConfig) {
+        Preconditions.checkNotNull(flinkConfig);
+
+        this.flinkConfig = flinkConfig;

Review comment:
       Done in 
[9e8472b](https://github.com/apache/flink/pull/15599/commits/9e8472bd88cbfa2cb7b606cf9c52f089aecabe6d).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    /** The scheme for the Google Storage file system. */
+    public static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    private final HadoopConfigLoader hadoopConfigLoader;
+
+    private Configuration flinkConfig;

Review comment:
       Done in 
[9e8472b](https://github.com/apache/flink/pull/15599/commits/9e8472bd88cbfa2cb7b606cf9c52f089aecabe6d).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.storage.GSBlobStorageImpl;
+import org.apache.flink.fs.gs.writer.GSRecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+
+import java.io.IOException;
+
+/** Provides recoverable-writer functionality for the standard 
GoogleHadoopFileSystem. */
+class GSFileSystem extends HadoopFileSystem {
+
+    private final GSFileSystemOptions options;
+
+    GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, 
GSFileSystemOptions options) {
+        super(Preconditions.checkNotNull(googleHadoopFileSystem));
+        this.options = Preconditions.checkNotNull(options);
+    }
+
+    @Override
+    public RecoverableWriter createRecoverableWriter() throws IOException {

Review comment:
       Done in 
[9e8472b](https://github.com/apache/flink/pull/15599/commits/9e8472bd88cbfa2cb7b606cf9c52f089aecabe6d).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorage.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** Abstract blob storage, used to simplify interface to Google storage and 
make it mockable. */
+public interface GSBlobStorage {
+
+    /**
+     * Creates a write channel.
+     *
+     * @param blobIdentifier The blob identifier to which to write
+     * @return The WriteChannel helper
+     */
+    WriteChannel writeBlob(GSBlobIdentifier blobIdentifier);
+
+    /**
+     * Gets blob metadata.
+     *
+     * @param blobIdentifier The blob identifier
+     * @return The blob metadata, if the blob exists. Empty if the blob 
doesn't exist.
+     */
+    Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier);
+
+    /**
+     * Lists all the blobs in a bucket matching a given prefix.
+     *
+     * @param bucketName The bucket name
+     * @param prefix The object prefix
+     * @return The found blob ids
+     */
+    List<GSBlobIdentifier> list(String bucketName, String prefix);
+
+    /**
+     * Copies from a source blob id to a target blob id. Does not delete the 
source blob.
+     *
+     * @param sourceBlobIdentifier The source blob identifier
+     * @param targetBlobIdentifier The target blob identifier
+     */
+    void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier 
targetBlobIdentifier);
+
+    /**
+     * Composes multiple blobs into one. Does not delete any of the source 
blobs.
+     *
+     * @param sourceBlobIdentifiers The source blob identifiers to combine, 
max of 32
+     * @param targetBlobIdentifier The target blob identifier
+     */
+    void compose(
+            List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier 
targetBlobIdentifier);
+
+    /**
+     * Deletes blobs. Note that this does not fail if blobs don't exist.
+     *
+     * @param blobIdentifiers The blob identifiers to delete
+     * @return The results of each delete operation.
+     */
+    List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers);
+
+    /** Abstract blob metadata. */
+    interface BlobMetadata {
+
+        /**
+         * The crc32 checksum for the blob.
+         *
+         * @return The checksum
+         */
+        String getChecksum();
+    }
+
+    /** Abstract blob write channel. */
+    interface WriteChannel {
+
+        /**
+         * Sets the chunk size for upload.
+         *
+         * @param chunkSize The chunk size
+         */
+        void setChunkSize(int chunkSize);

Review comment:
       That's a good question. I don't see anything either that would prohibit 
changing the chunk size after some data has already been written. However, we 
don't really *need* the ability to change it midstream like that, either. So 
I've changed it to set the chunk size in ```GSBlobStorageImpl#writeBlob``` and 
removed ```setChunkSize``` from the ```GSBlobStorage#WriteChannel``` interface, 
as you suggested.
   
   Done in 
[9e8472b](https://github.com/apache/flink/pull/15599/commits/9e8472bd88cbfa2cb7b606cf9c52f089aecabe6d).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriter.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+import org.apache.flink.fs.gs.storage.GSBlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/** The recoverable writer implementation for Google storage. */
+public class GSRecoverableWriter implements RecoverableWriter {
+
+    /** The underlying blob storage. */
+    private final GSBlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /**
+     * Construct a GS recoverable writer.
+     *
+     * @param storage The underlying blob storage instance
+     * @param options The GS file system options
+     */
+    public GSRecoverableWriter(GSBlobStorage storage, GSFileSystemOptions 
options) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+    }
+
+    @Override
+    public boolean requiresCleanupOfRecoverableState() {
+        // we can't clean up any state prior to commit
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        return false;
+    }
+
+    @Override
+    public boolean supportsResume() {
+        return true;
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream open(Path path) throws IOException {
+        Preconditions.checkNotNull(path);
+
+        GSBlobIdentifier finalBlobIdentifier = 
BlobUtils.parseUri(path.toUri());
+        return new GSRecoverableFsDataOutputStream(storage, options, 
finalBlobIdentifier);
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) 
throws IOException {

Review comment:
       Done in 
[9e8472b](https://github.com/apache/flink/pull/15599/commits/9e8472bd88cbfa2cb7b606cf9c52f089aecabe6d).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriter.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+import org.apache.flink.fs.gs.storage.GSBlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/** The recoverable writer implementation for Google storage. */
+public class GSRecoverableWriter implements RecoverableWriter {
+
+    /** The underlying blob storage. */
+    private final GSBlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /**
+     * Construct a GS recoverable writer.
+     *
+     * @param storage The underlying blob storage instance
+     * @param options The GS file system options
+     */
+    public GSRecoverableWriter(GSBlobStorage storage, GSFileSystemOptions 
options) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+    }
+
+    @Override
+    public boolean requiresCleanupOfRecoverableState() {
+        // we can't clean up any state prior to commit
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        return false;
+    }
+
+    @Override
+    public boolean supportsResume() {
+        return true;
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream open(Path path) throws IOException {
+        Preconditions.checkNotNull(path);
+
+        GSBlobIdentifier finalBlobIdentifier = 
BlobUtils.parseUri(path.toUri());
+        return new GSRecoverableFsDataOutputStream(storage, options, 
finalBlobIdentifier);
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) 
throws IOException {
+        Preconditions.checkNotNull(resumable);
+
+        GSResumeRecoverable recoverable = (GSResumeRecoverable) resumable;
+        return new GSRecoverableFsDataOutputStream(storage, options, 
recoverable);
+    }
+
+    @Override
+    public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws 
IOException {

Review comment:
       Done in 
[9e8472b](https://github.com/apache/flink/pull/15599/commits/9e8472bd88cbfa2cb7b606cf9c52f089aecabe6d).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriter.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+import org.apache.flink.fs.gs.storage.GSBlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/** The recoverable writer implementation for Google storage. */
+public class GSRecoverableWriter implements RecoverableWriter {
+
+    /** The underlying blob storage. */
+    private final GSBlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /**
+     * Construct a GS recoverable writer.
+     *
+     * @param storage The underlying blob storage instance
+     * @param options The GS file system options
+     */
+    public GSRecoverableWriter(GSBlobStorage storage, GSFileSystemOptions 
options) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+    }
+
+    @Override
+    public boolean requiresCleanupOfRecoverableState() {
+        // we can't clean up any state prior to commit
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        return false;
+    }
+
+    @Override
+    public boolean supportsResume() {
+        return true;
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream open(Path path) throws IOException {
+        Preconditions.checkNotNull(path);
+
+        GSBlobIdentifier finalBlobIdentifier = 
BlobUtils.parseUri(path.toUri());
+        return new GSRecoverableFsDataOutputStream(storage, options, 
finalBlobIdentifier);
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) 
throws IOException {
+        Preconditions.checkNotNull(resumable);
+
+        GSResumeRecoverable recoverable = (GSResumeRecoverable) resumable;
+        return new GSRecoverableFsDataOutputStream(storage, options, 
recoverable);
+    }
+
+    @Override
+    public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws 
IOException {
+        // we can't safely clean up any state prior to commit, so do nothing 
here
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        return true;
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream.Committer 
recoverForCommit(CommitRecoverable resumable)
+            throws IOException {

Review comment:
       Done in 
[9e8472b](https://github.com/apache/flink/pull/15599/commits/9e8472bd88cbfa2cb7b606cf9c52f089aecabe6d).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSResumeRecoverable.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** A resumable state for a recoverable output stream. */
+class GSResumeRecoverable implements RecoverableWriter.ResumeRecoverable {
+
+    /** The blob id to which the recoverable write operation is writing. */
+    public final GSBlobIdentifier finalBlobIdentifier;
+
+    /** The write position, i.e. number of bytes that have been written so 
far. */
+    public final long position;
+
+    /** Indicates if the write has been closed. */
+    public final boolean closed;
+
+    /** The object ids for the temporary objects that should be composed to 
form the final blob. */
+    public final UUID[] componentObjectIds;

Review comment:
       Done in 
[9e8472b](https://github.com/apache/flink/pull/15599/commits/9e8472bd88cbfa2cb7b606cf9c52f089aecabe6d).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+import org.apache.flink.fs.gs.storage.GSBlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements 
RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final GSBlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable for the commit operation. */
+    private final GSResumeRecoverable recoverable;
+
+    GSRecoverableWriterCommitter(
+            GSBlobStorage storage, GSFileSystemOptions options, 
GSResumeRecoverable recoverable) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.recoverable = Preconditions.checkNotNull(recoverable);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        // first, make sure the final blob doesn't already exist
+        Optional<GSBlobStorage.BlobMetadata> blobMetadata =
+                storage.getMetadata(recoverable.finalBlobIdentifier);
+        if (blobMetadata.isPresent()) {
+            throw new IOException(
+                    String.format(
+                            "Blob %s already exists during attempted commit",
+                            recoverable.finalBlobIdentifier));
+        }
+
+        // write the final blob
+        writeFinalBlob();
+
+        // clean up after successful commit
+        cleanupTemporaryBlobs();
+    }
+
+    @Override
+    public void commitAfterRecovery() throws IOException {
+
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        // only write the final blob if it doesn't already exist
+        Optional<GSBlobStorage.BlobMetadata> blobMetadata =
+                storage.getMetadata(recoverable.finalBlobIdentifier);
+        if (!blobMetadata.isPresent()) {
+            writeFinalBlob();
+        }
+
+        // clean up after successful commit
+        cleanupTemporaryBlobs();
+    }
+
+    @Override
+    public RecoverableWriter.CommitRecoverable getRecoverable() {
+        return recoverable;
+    }
+
+    /**
+     * Helper to compose an arbitrary number of blobs into a final blob, 
staying under the
+     * COMPOSE_MAX_BLOBS limit for any individual compose operation.
+     *
+     * @param sourceBlobIdentifiers The source blob ids to compose
+     * @param targetBlobIdentifier The target blob id for the composed result
+     */
+    private void composeBlobs(
+            List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier 
targetBlobIdentifier) {
+        Preconditions.checkNotNull(sourceBlobIdentifiers);
+        Preconditions.checkArgument(sourceBlobIdentifiers.size() > 0);
+        Preconditions.checkNotNull(targetBlobIdentifier);
+
+        // split the source list into two parts; first, the ones we can 
compose in this operation
+        // (up to COMPOSE_MAX_BLOBS), and, second, whichever blobs are left 
over
+        final int composeToIndex =
+                Math.min(BlobUtils.COMPOSE_MAX_BLOBS, 
sourceBlobIdentifiers.size());
+        List<GSBlobIdentifier> composeBlobIds = 
sourceBlobIdentifiers.subList(0, composeToIndex);
+        List<GSBlobIdentifier> remainingBlobIds =
+                sourceBlobIdentifiers.subList(composeToIndex, 
sourceBlobIdentifiers.size());
+
+        // determine the resulting blob id for this compose operation. if this 
is the last compose,
+        // i.e. if there are no remaining blob ids, then the composed blob id 
is the originally
+        // specified target blob id. otherwise, we must create an intermediate 
blob id to hold the
+        // result of this compose operation
+        GSBlobIdentifier composedBlobId =
+                remainingBlobIds.isEmpty()
+                        ? targetBlobIdentifier
+                        : BlobUtils.generateTemporaryBlobIdentifier(
+                                recoverable.finalBlobIdentifier, options);
+
+        // compose the blobs
+        storage.compose(composeBlobIds, composedBlobId);
+
+        // if we have remaining blobs, add the composed blob id to the 
beginning of the list
+        // of remaining blob ids, and recurse
+        if (!remainingBlobIds.isEmpty()) {
+            remainingBlobIds.add(0, composedBlobId);
+            composeBlobs(remainingBlobIds, targetBlobIdentifier);
+        }
+    }
+
+    /**
+     * Writes the final blob by composing the temporary blobs and copying, if 
necessary.
+     *
+     * @throws IOException On underlying failure.
+     */
+    private void writeFinalBlob() throws IOException {
+
+        // compose all the component blob ids into the final blob id. if the 
component blob ids are
+        // in the same bucket as the final blob id, this can be done directly. 
otherwise, we must
+        // compose to a new temporary blob id in the same bucket as the 
component blob ids and
+        // then copy that blob to the final blob location
+        String temporaryBucketName =
+                
BlobUtils.getTemporaryBucketName(recoverable.finalBlobIdentifier, options);
+        if 
(recoverable.finalBlobIdentifier.bucketName.equals(temporaryBucketName)) {
+
+            // compose directly to final blob
+            composeBlobs(recoverable.getComponentBlobIds(options), 
recoverable.finalBlobIdentifier);
+
+        } else {
+
+            // compose to the intermediate blob, then copy
+            GSBlobIdentifier intermediateBlobIdentifier =
+                    BlobUtils.generateTemporaryBlobIdentifier(
+                            recoverable.finalBlobIdentifier, options);
+            composeBlobs(recoverable.getComponentBlobIds(options), 
intermediateBlobIdentifier);
+            storage.copy(intermediateBlobIdentifier, 
recoverable.finalBlobIdentifier);
+        }
+    }
+
+    /**
+     * Clean up after a successful commit operation, by deleting any temporary 
blobs associated with
+     * the final blob.
+     *
+     * @throws IOException On underlying storage failure
+     */
+    private void cleanupTemporaryBlobs() throws IOException {

Review comment:
       Done in 
[9e8472b](https://github.com/apache/flink/pull/15599/commits/9e8472bd88cbfa2cb7b606cf9c52f089aecabe6d).
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.GSBlobIdentifier;
+import org.apache.flink.fs.gs.storage.GSBlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements 
RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final GSBlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable for the commit operation. */
+    private final GSResumeRecoverable recoverable;
+
+    GSRecoverableWriterCommitter(
+            GSBlobStorage storage, GSFileSystemOptions options, 
GSResumeRecoverable recoverable) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.recoverable = Preconditions.checkNotNull(recoverable);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        // first, make sure the final blob doesn't already exist
+        Optional<GSBlobStorage.BlobMetadata> blobMetadata =
+                storage.getMetadata(recoverable.finalBlobIdentifier);
+        if (blobMetadata.isPresent()) {
+            throw new IOException(
+                    String.format(
+                            "Blob %s already exists during attempted commit",
+                            recoverable.finalBlobIdentifier));
+        }
+
+        // write the final blob
+        writeFinalBlob();
+
+        // clean up after successful commit
+        cleanupTemporaryBlobs();
+    }
+
+    @Override
+    public void commitAfterRecovery() throws IOException {
+
+        // see discussion: 
https://github.com/apache/flink/pull/15599#discussion_r623127365
+        // only write the final blob if it doesn't already exist
+        Optional<GSBlobStorage.BlobMetadata> blobMetadata =
+                storage.getMetadata(recoverable.finalBlobIdentifier);
+        if (!blobMetadata.isPresent()) {
+            writeFinalBlob();
+        }
+
+        // clean up after successful commit
+        cleanupTemporaryBlobs();
+    }
+
+    @Override
+    public RecoverableWriter.CommitRecoverable getRecoverable() {
+        return recoverable;
+    }
+
+    /**
+     * Helper to compose an arbitrary number of blobs into a final blob, 
staying under the
+     * COMPOSE_MAX_BLOBS limit for any individual compose operation.
+     *
+     * @param sourceBlobIdentifiers The source blob ids to compose
+     * @param targetBlobIdentifier The target blob id for the composed result
+     */
+    private void composeBlobs(
+            List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier 
targetBlobIdentifier) {
+        Preconditions.checkNotNull(sourceBlobIdentifiers);
+        Preconditions.checkArgument(sourceBlobIdentifiers.size() > 0);
+        Preconditions.checkNotNull(targetBlobIdentifier);
+
+        // split the source list into two parts; first, the ones we can 
compose in this operation
+        // (up to COMPOSE_MAX_BLOBS), and, second, whichever blobs are left 
over
+        final int composeToIndex =
+                Math.min(BlobUtils.COMPOSE_MAX_BLOBS, 
sourceBlobIdentifiers.size());
+        List<GSBlobIdentifier> composeBlobIds = 
sourceBlobIdentifiers.subList(0, composeToIndex);
+        List<GSBlobIdentifier> remainingBlobIds =
+                sourceBlobIdentifiers.subList(composeToIndex, 
sourceBlobIdentifiers.size());
+
+        // determine the resulting blob id for this compose operation. if this 
is the last compose,
+        // i.e. if there are no remaining blob ids, then the composed blob id 
is the originally
+        // specified target blob id. otherwise, we must create an intermediate 
blob id to hold the
+        // result of this compose operation
+        GSBlobIdentifier composedBlobId =
+                remainingBlobIds.isEmpty()
+                        ? targetBlobIdentifier
+                        : BlobUtils.generateTemporaryBlobIdentifier(
+                                recoverable.finalBlobIdentifier, options);
+
+        // compose the blobs
+        storage.compose(composeBlobIds, composedBlobId);
+
+        // if we have remaining blobs, add the composed blob id to the 
beginning of the list
+        // of remaining blob ids, and recurse
+        if (!remainingBlobIds.isEmpty()) {
+            remainingBlobIds.add(0, composedBlobId);
+            composeBlobs(remainingBlobIds, targetBlobIdentifier);
+        }
+    }
+
+    /**
+     * Writes the final blob by composing the temporary blobs and copying, if 
necessary.
+     *
+     * @throws IOException On underlying failure.
+     */
+    private void writeFinalBlob() throws IOException {

Review comment:
       Done in 
[9e8472b](https://github.com/apache/flink/pull/15599/commits/9e8472bd88cbfa2cb7b606cf9c52f089aecabe6d).
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to