xintongsong commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r618038288



##########
File path: flink-filesystems/flink-gs-fs-hadoop/pom.xml
##########
@@ -0,0 +1,208 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-filesystems</artifactId>
+               <version>1.13-SNAPSHOT</version>

Review comment:
       The version in the master branch has come to `1.14-SNAPSHOT`. Please 
rebase the PR and update this version.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the 
recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in 
the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the 
recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name 
for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to 
files written by the recoverable writer.");

Review comment:
       Same here. Would it be good enough to leave the content type unset?

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the 
recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in 
the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the 
recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name 
for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to 
files written by the recoverable writer.");
+
+    public static final ConfigOption<Integer> WRITER_CHUNK_SIZE =
+            ConfigOptions.key("gs.writer.chunk.size")

Review comment:
       It would be nice to make the unit (in this case bytes) part of the 
config key and variable names.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/BlobStorage.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** Abstract blob storage, used to simplify interface to Google storage and 
make it mockable. */
+public interface BlobStorage {
+
+    /** Abstract blob metadata. */
+    interface BlobMetadata {

Review comment:
       nit: It's not a strict rule, but in Flink we usually put the definition 
of inner classes/interfaces at the bottom.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/BlobStorage.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** Abstract blob storage, used to simplify interface to Google storage and 
make it mockable. */
+public interface BlobStorage {
+
+    /** Abstract blob metadata. */
+    interface BlobMetadata {
+
+        /**
+         * The crc32 checksum for the blob.
+         *
+         * @return The checksum
+         */
+        String getChecksum();
+    }
+
+    /** Abstract blob write channel. */
+    interface WriteChannel {
+
+        /**
+         * Sets the chunk size for upload.
+         *
+         * @param chunkSize The chunk size
+         */
+        void setChunkSize(int chunkSize);
+
+        /**
+         * Writes data to the channel.
+         *
+         * @param content The data buffer
+         * @param start Start offset in the data buffer
+         * @param length Number of bytes to write
+         * @return The number of bytes written
+         * @throws IOException On underlying failure
+         */
+        int write(byte[] content, int start, int length) throws IOException;
+
+        /**
+         * Closes the channel.
+         *
+         * @throws IOException On underlying failure
+         */
+        void close() throws IOException;
+    }
+
+    /**
+     * Creates a write channel.
+     *
+     * @param blobId The blob id to write
+     * @param uploadContentType The content type for the upload
+     * @return The WriteChannel helper
+     */
+    WriteChannel write(BlobId blobId, String uploadContentType);

Review comment:
       Maybe name this `writeBlob`? It's a bit confusing that `BlobStorage` and 
`WriteChannel` both have the method `write`, which may lead to something like 
`blobStorage.write().write()`.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.storage.GSBlobStorage;
+import org.apache.flink.fs.gs.writer.GSRecoverableWriter;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+
+import java.io.IOException;
+
+/** Provides recoverable-writer functionality for the standard 
GoogleHadoopFileSystem. */
+class GSFileSystem extends HadoopFileSystem {

Review comment:
       To make `GSFileSystem` extend `HadoopFileSystem`, it seems we need to 
update some of the base class implementations.
   - `HadoopFileSystem#getKindForScheme`
   - `FileSystem#DIRECTLY_SUPPORTED_FILESYSTEM`
   
   It's not a good practice that the base class has to understand some 
sub-class specifications. This is mainly for historical reasons. Sorry for that.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the 
recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in 
the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the 
recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name 
for temporary files.");

Review comment:
       I wonder how many cases does a user need to specify a custom bucket and 
prefix. Maybe it's good enough to keep them non-configurable. That's how 
`HadoopRecoverableWriter` deal with the temp file paths.
   
   I'd suggest to provide as less knobs as possible to the users, unless 
there's a concrete demand.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** The state of a recoverable write. */
+class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, 
Cloneable {
+
+    /** The blob id to which the recoverable write operation is writing. */
+    public final BlobId finalBlobId;
+
+    /** The number of bytes that have been written so far. */
+    public long bytesWritten;
+
+    /** Indicates if the write has been closed. */
+    public boolean closed;
+
+    /** The object ids for the temporary objects that should be composed to 
form the final blob. */
+    public final List<UUID> componentObjectIds;
+
+    GSRecoverableWriterState(

Review comment:
       This constructor can be annotated with `@VisibleForTesting`. It could be 
private if not accessed from the tests.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** The state of a recoverable write. */
+class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, 
Cloneable {
+
+    /** The blob id to which the recoverable write operation is writing. */
+    public final BlobId finalBlobId;
+
+    /** The number of bytes that have been written so far. */
+    public long bytesWritten;
+
+    /** Indicates if the write has been closed. */
+    public boolean closed;
+
+    /** The object ids for the temporary objects that should be composed to 
form the final blob. */
+    public final List<UUID> componentObjectIds;

Review comment:
       I'm not sure about exposing these public, mutable fields. I think this 
is a violation of the single responsibility principle. 
`GSRecoverableWriterState` is currently serving as:
   - Maintainable internal states of `GSRecoverableFsDataOutputStream`
   - Snapshot for persistent and recovery
   
   Consequently, there are risks that
   - Potential leaking of `GSRecoverableFsDataOutputStream` internal states.
       - There's no guarantee, at least contract wise, that the recoverable 
passed in from `GSRecoverableWriter#recover` will not be used otherwhere.
       - I see `GSRecoverableFsDataOutputStream#persist` is guarded by cloning 
the state. However, this is still fragile and can be easily broken in future.
   - A snapshot is expected to be immutable once created. This is not 
guaranteed by `GSRecoverableWriterState` itself, and will require cautions from 
the callers in future changes.
   
   I'd suggest to keep the maintainable states internal to 
`GSRecoverableFsDataOutputStream`, and to have an immutable 
`ResumeRecoverable`, which is created on each `persist` call.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriter.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+
+/** The recoverable writer implementation for Google storage. */
+public class GSRecoverableWriter implements RecoverableWriter {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /**
+     * Construct a GS recoverable writer.
+     *
+     * @param storage The underlying blob storage instance
+     * @param options The GS file system options
+     */
+    public GSRecoverableWriter(BlobStorage storage, GSFileSystemOptions 
options) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+    }
+
+    @Override
+    public boolean requiresCleanupOfRecoverableState() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsResume() {
+        return true;
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream open(Path path) throws IOException {
+        Preconditions.checkNotNull(path);
+
+        BlobId finalBlobId = BlobUtils.parseUri(path.toUri());
+        GSRecoverableWriterState state = new 
GSRecoverableWriterState(finalBlobId);
+        return new GSRecoverableFsDataOutputStream(storage, options, this, 
state);
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) 
throws IOException {
+        Preconditions.checkNotNull(resumable);
+
+        GSRecoverableWriterState state = (GSRecoverableWriterState) resumable;
+        return new GSRecoverableFsDataOutputStream(storage, options, this, 
state);
+    }
+
+    @Override
+    public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws 
IOException {
+        Preconditions.checkNotNull(resumable);
+
+        // determine the partial name for the temporary objects to be deleted
+        GSRecoverableWriterState state = (GSRecoverableWriterState) resumable;
+        String temporaryBucketName = state.getTemporaryBucketName(options);
+        String temporaryObjectPartialName = 
state.getTemporaryObjectPartialName(options);
+
+        // this will hold the set of blob ids that were actually deleted
+        HashSet<BlobId> deletedBlobIds = new HashSet<>();
+
+        // find all the temp blobs by looking for anything that starts with 
the temporary
+        // object partial name. doing it this way finds any orphaned temp 
blobs that might
+        // have come about when resuming
+        List<BlobId> foundTempBlobIds =
+                storage.list(temporaryBucketName, temporaryObjectPartialName);
+        if (!foundTempBlobIds.isEmpty()) {
+
+            // delete all the temp blobs, and populate the set with ones that 
were actually deleted
+            // normalize in case the blob came back with a generation populated
+            List<Boolean> deleteResults = storage.delete(foundTempBlobIds);
+            for (int i = 0; i < deleteResults.size(); i++) {
+                if (deleteResults.get(i)) {
+                    
deletedBlobIds.add(BlobUtils.normalizeBlobId(foundTempBlobIds.get(i)));
+                }
+            }
+        }

Review comment:
       This seems not right.
   
   For each file being written, there could be a series of snapshots (`s1`, 
`s2`, `s3`, ...) taken before it's committed. Calling 
`cleanupRecoverableState(s2)` means we no longer need to recover the file 
writing from `s2`, probably also not likely to recover from `s1` which is taken 
before `s2`. However, we can still recover from `s3`. Removing all the 
component blobs of `s2` means some of the component blobs of `s3` are also 
removed.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)

Review comment:
       Might be better to use `noDefaultValue()` here and use 
`Configuration#getOptional` for reading the config option, to enforce an 
explicit `Optional#isPresent` check wherever the config option is used.
   
   The contract that empty string means using the final bucket is a bit 
implicit and can easily get overlooked where this option is used.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the 
recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in 
the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the 
recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name 
for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to 
files written by the recoverable writer.");
+
+    public static final ConfigOption<Integer> WRITER_CHUNK_SIZE =
+            ConfigOptions.key("gs.writer.chunk.size")
+                    .intType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CHUNK_SIZE)

Review comment:
       Same here about using `0` for default.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the 
recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in 
the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the 
recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name 
for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to 
files written by the recoverable writer.");
+
+    public static final ConfigOption<Integer> WRITER_CHUNK_SIZE =
+            ConfigOptions.key("gs.writer.chunk.size")
+                    .intType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CHUNK_SIZE)
+                    .withDescription(
+                            "This option sets the chunk size for writes by the 
recoverable writer. This value is passed through to the underlying "
+                                    + "Google WriteChannel; if zero, the 
default WriteChannel value is used.");

Review comment:
       We might also want to mention in the description that:
   - the unit is bytes
   - the configured value should be multiple of 256KB (per the [GCS 
doc](https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload))

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemOptions.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.util.Preconditions;
+
+/** The GS file system options. */
+public class GSFileSystemOptions {
+
+    /**
+     * The default value for the temporary bucket, i.e. empty string which 
means use same bucket as
+     * the final blob being written
+     */
+    public static final String DEFAULT_WRITER_TEMPORARY_BUCKET_NAME = "";
+
+    /** The default value for the temporary object prefix. */
+    public static final String DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX = 
".inprogress/";
+
+    /** The default value for the content type of blobs written by the 
recoverable writer. */
+    public static final String DEFAULT_WRITER_CONTENT_TYPE = 
"application/octet-stream";
+
+    /** The default value for the writer chunk size, i.e. zero which means use 
Google default * */
+    public static final int DEFAULT_WRITER_CHUNK_SIZE = 0;
+
+    /**
+     * The temporary bucket name to use for recoverable writes. If empty, use 
the same bucket as the
+     * final blob to write.
+     */
+    public final String writerTemporaryBucketName;
+
+    /** The prefix to be applied to the final object name when generating 
temporary object names. */
+    public final String writerTemporaryObjectPrefix;
+
+    /** The content type used for files written by the recoverable writer. */
+    public final String writerContentType;
+
+    /**
+     * The chunk size to use for writes on the underlying Google WriteChannel. 
If zero, then the
+     * chunk size is not set on the underlying channel, and the default value 
is used.
+     */
+    public final int writerChunkSize;
+
+    /**
+     * Constructs an options instance.
+     *
+     * @param writerTemporaryBucketName The temporary bucket name, if empty 
use same bucket as final
+     *     blob
+     * @param writerTemporaryObjectPrefix The temporary object prefix
+     * @param writerContentType The content type
+     * @param writerChunkSize The chunk size, if zero this means use Google 
default
+     */
+    public GSFileSystemOptions(
+            String writerTemporaryBucketName,
+            String writerTemporaryObjectPrefix,
+            String writerContentType,
+            int writerChunkSize) {
+        this.writerTemporaryBucketName = 
Preconditions.checkNotNull(writerTemporaryBucketName);
+        this.writerTemporaryObjectPrefix = 
Preconditions.checkNotNull(writerTemporaryObjectPrefix);
+        this.writerContentType = Preconditions.checkNotNull(writerContentType);
+        Preconditions.checkArgument(writerChunkSize >= 0);

Review comment:
       Shall we check chunk size being multiple of 256KB?

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the 
recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in 
the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the 
recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name 
for temporary files.");
+
+    public static final ConfigOption<String> WRITER_CONTENT_TYPE =
+            ConfigOptions.key("gs.writer.content.type")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_CONTENT_TYPE)
+                    .withDescription(
+                            "This option sets the content type applied to 
files written by the recoverable writer.");
+
+    public static final ConfigOption<Integer> WRITER_CHUNK_SIZE =
+            ConfigOptions.key("gs.writer.chunk.size")
+                    .intType()

Review comment:
       Consider using `memoryType` to improve the usability. It's a bit 
inconvenient to configure a multiple of 256KB in bytes.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorage.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** BlobStorage implementation for Google storage. */
+public class GSBlobStorage implements BlobStorage {
+
+    /** Blob metadata, wraps Google storage Blob. */
+    static class BlobMetadata implements BlobStorage.BlobMetadata {
+
+        private final Blob blob;
+
+        private BlobMetadata(Blob blob) {
+            this.blob = Preconditions.checkNotNull(blob);
+        }
+
+        @Override
+        public String getChecksum() {
+            return blob.getCrc32c();
+        }
+    }
+
+    /** Blob write channel, wraps Google storage WriteChannel. */
+    static class WriteChannel implements BlobStorage.WriteChannel {
+
+        final com.google.cloud.WriteChannel writeChannel;
+
+        private WriteChannel(com.google.cloud.WriteChannel writeChannel) {
+            this.writeChannel = Preconditions.checkNotNull(writeChannel);
+        }
+
+        @Override
+        public void setChunkSize(int chunkSize) {
+            Preconditions.checkArgument(chunkSize > 0);
+
+            writeChannel.setChunkSize(chunkSize);
+        }
+
+        @Override
+        public int write(byte[] content, int start, int length) throws 
IOException {
+            Preconditions.checkNotNull(content);
+            Preconditions.checkArgument(start >= 0);
+            Preconditions.checkArgument(length >= 0);
+
+            ByteBuffer byteBuffer = ByteBuffer.wrap(content, start, length);
+            return writeChannel.write(byteBuffer);
+        }
+
+        @Override
+        public void close() throws IOException {
+            writeChannel.close();
+        }
+    }
+
+    /** The wrapped Google Storage instance. */
+    private final Storage storage;
+
+    /**
+     * Constructs a GSBlobStorage instance.
+     *
+     * @param storage The wrapped Google Storage instance.
+     */
+    public GSBlobStorage(Storage storage) {
+        this.storage = Preconditions.checkNotNull(storage);
+    }
+
+    @Override
+    public BlobStorage.WriteChannel write(BlobId blobId, String contentType) {
+        Preconditions.checkNotNull(blobId);
+        Preconditions.checkNotNull(contentType);
+
+        BlobInfo blobInfo = 
BlobInfo.newBuilder(blobId).setContentType(contentType).build();
+        com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
+        return new WriteChannel(writeChannel);
+    }
+
+    @Override
+    public Optional<BlobStorage.BlobMetadata> getMetadata(BlobId blobId) {
+        Preconditions.checkNotNull(blobId);
+
+        Blob blob = storage.get(blobId);
+        if (blob == null) {
+            return Optional.empty();
+        } else {
+            return Optional.of(new BlobMetadata(blob));
+        }

Review comment:
       ```suggestion
           return 
Optional.ofNullable(storage.get(blobId)).map(BlobMetadata::new);
   ```

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorage.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** BlobStorage implementation for Google storage. */
+public class GSBlobStorage implements BlobStorage {
+
+    /** Blob metadata, wraps Google storage Blob. */
+    static class BlobMetadata implements BlobStorage.BlobMetadata {
+
+        private final Blob blob;
+
+        private BlobMetadata(Blob blob) {
+            this.blob = Preconditions.checkNotNull(blob);
+        }
+
+        @Override
+        public String getChecksum() {
+            return blob.getCrc32c();
+        }
+    }
+
+    /** Blob write channel, wraps Google storage WriteChannel. */
+    static class WriteChannel implements BlobStorage.WriteChannel {
+
+        final com.google.cloud.WriteChannel writeChannel;
+
+        private WriteChannel(com.google.cloud.WriteChannel writeChannel) {
+            this.writeChannel = Preconditions.checkNotNull(writeChannel);
+        }
+
+        @Override
+        public void setChunkSize(int chunkSize) {
+            Preconditions.checkArgument(chunkSize > 0);
+
+            writeChannel.setChunkSize(chunkSize);
+        }
+
+        @Override
+        public int write(byte[] content, int start, int length) throws 
IOException {
+            Preconditions.checkNotNull(content);
+            Preconditions.checkArgument(start >= 0);
+            Preconditions.checkArgument(length >= 0);
+
+            ByteBuffer byteBuffer = ByteBuffer.wrap(content, start, length);
+            return writeChannel.write(byteBuffer);
+        }
+
+        @Override
+        public void close() throws IOException {
+            writeChannel.close();
+        }
+    }
+
+    /** The wrapped Google Storage instance. */
+    private final Storage storage;
+
+    /**
+     * Constructs a GSBlobStorage instance.
+     *
+     * @param storage The wrapped Google Storage instance.
+     */
+    public GSBlobStorage(Storage storage) {
+        this.storage = Preconditions.checkNotNull(storage);
+    }
+
+    @Override
+    public BlobStorage.WriteChannel write(BlobId blobId, String contentType) {
+        Preconditions.checkNotNull(blobId);
+        Preconditions.checkNotNull(contentType);
+
+        BlobInfo blobInfo = 
BlobInfo.newBuilder(blobId).setContentType(contentType).build();
+        com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
+        return new WriteChannel(writeChannel);
+    }
+
+    @Override
+    public Optional<BlobStorage.BlobMetadata> getMetadata(BlobId blobId) {
+        Preconditions.checkNotNull(blobId);
+
+        Blob blob = storage.get(blobId);
+        if (blob == null) {
+            return Optional.empty();
+        } else {
+            return Optional.of(new BlobMetadata(blob));
+        }
+    }
+
+    @Override
+    public List<BlobId> list(String bucketName, String objectPrefix) {
+        Preconditions.checkNotNull(bucketName);
+        Preconditions.checkNotNull(objectPrefix);
+
+        Page<Blob> blobs = storage.list(bucketName, 
Storage.BlobListOption.prefix(objectPrefix));
+        return StreamSupport.stream(blobs.iterateAll().spliterator(), false)
+                .map(BlobInfo::getBlobId)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public void copy(BlobId sourceBlobId, BlobId targetBlobId) {
+        Preconditions.checkNotNull(sourceBlobId);
+        Preconditions.checkNotNull(targetBlobId);
+
+        storage.get(sourceBlobId).copyTo(targetBlobId).getResult();
+    }
+
+    @Override
+    public void compose(List<BlobId> sourceBlobIds, BlobId targetBlobId, 
String contentType) {
+        Preconditions.checkNotNull(sourceBlobIds);
+        Preconditions.checkArgument(sourceBlobIds.size() > 0);

Review comment:
       Maybe also check `size <= 32`.
   I'm aware this is guaranteed in `GSRecoverableWriterCommitter#composeBlobs`. 
However, it would be nicer to make `GSBlobStorage` self-contained, rather than 
depending its correctness on the caller's behaviors.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.utils;
+
+import com.google.cloud.storage.BlobId;
+
+import java.net.URI;
+
+/** Utility functions related to blobs. */
+public class BlobUtils {
+
+    /** The maximum number of blobs that can be composed in a single 
operation. */
+    public static final int COMPOSE_MAX_BLOBS = 32;
+
+    /**
+     * Normalizes a blob id, ensuring that the generation is null.
+     *
+     * @param blobId The blob id
+     * @return The blob id with the generation set to null
+     */
+    public static BlobId normalizeBlobId(BlobId blobId) {
+        return BlobId.of(blobId.getBucket(), blobId.getName());
+    }

Review comment:
       Could you explain why do we need this normalization? 

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.utils;
+
+import com.google.cloud.storage.BlobId;
+
+import java.net.URI;
+
+/** Utility functions related to blobs. */
+public class BlobUtils {
+
+    /** The maximum number of blobs that can be composed in a single 
operation. */
+    public static final int COMPOSE_MAX_BLOBS = 32;
+
+    /**
+     * Normalizes a blob id, ensuring that the generation is null.
+     *
+     * @param blobId The blob id
+     * @return The blob id with the generation set to null
+     */
+    public static BlobId normalizeBlobId(BlobId blobId) {
+        return BlobId.of(blobId.getBucket(), blobId.getName());
+    }
+
+    /**
+     * Parses a blob id from a Google storage uri, i.e. gs://bucket/foo/bar 
yields a blob with
+     * bucket name "bucket" and object name "foo/bar".
+     *
+     * @param uri The gs uri
+     * @return The blob id
+     */
+    public static BlobId parseUri(URI uri) {
+        String finalBucketName = uri.getAuthority();
+        if (finalBucketName == null) {

Review comment:
       `StringUtils#isNullOrWhitespaceOnly` might be helpful here and below.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorage.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** BlobStorage implementation for Google storage. */
+public class GSBlobStorage implements BlobStorage {
+
+    /** Blob metadata, wraps Google storage Blob. */
+    static class BlobMetadata implements BlobStorage.BlobMetadata {

Review comment:
       Same here for the inner classes.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/BlobStorage.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import com.google.cloud.storage.BlobId;

Review comment:
       Currently, the `BlobId` is the only google storage specific type in this 
interface. Depending on how we position this interface, I would suggest the 
following.
   - If this interface is dedicated to google storage, and the abstraction is 
purely for testability, I would suggest to rename the interface to 
`GSBlobStorage` and the implementation to `GSBlobStorageImpl`.
   - If we want to make this interface independent from google storage, we 
might also consider to wrap `BlobId` with some sort of abstraction.
   
   In general, I don't see much benefit making this interface independent from 
google storage. However, it might make sense to wrap `BlobId` anyway, given 
that there're many cases we want to ignore `BlobId#generation`. I'm not 
entirely sure about this though. WDYT?

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.utils;
+
+import com.google.cloud.storage.BlobId;
+
+import java.net.URI;
+
+/** Utility functions related to blobs. */
+public class BlobUtils {
+
+    /** The maximum number of blobs that can be composed in a single 
operation. */
+    public static final int COMPOSE_MAX_BLOBS = 32;
+
+    /**
+     * Normalizes a blob id, ensuring that the generation is null.
+     *
+     * @param blobId The blob id
+     * @return The blob id with the generation set to null
+     */
+    public static BlobId normalizeBlobId(BlobId blobId) {
+        return BlobId.of(blobId.getBucket(), blobId.getName());
+    }
+
+    /**
+     * Parses a blob id from a Google storage uri, i.e. gs://bucket/foo/bar 
yields a blob with
+     * bucket name "bucket" and object name "foo/bar".
+     *
+     * @param uri The gs uri
+     * @return The blob id
+     */
+    public static BlobId parseUri(URI uri) {
+        String finalBucketName = uri.getAuthority();

Review comment:
       We could add an assertion that the scheme is `gs`. 

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** The state of a recoverable write. */
+class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, 
Cloneable {
+
+    /** The blob id to which the recoverable write operation is writing. */
+    public final BlobId finalBlobId;
+
+    /** The number of bytes that have been written so far. */
+    public long bytesWritten;
+
+    /** Indicates if the write has been closed. */
+    public boolean closed;
+
+    /** The object ids for the temporary objects that should be composed to 
form the final blob. */
+    public final List<UUID> componentObjectIds;
+
+    GSRecoverableWriterState(
+            BlobId finalBlobId, long bytesWritten, boolean closed, List<UUID> 
componentObjectIds) {
+        this.finalBlobId = Preconditions.checkNotNull(finalBlobId);
+        Preconditions.checkArgument(bytesWritten >= 0);
+        this.bytesWritten = bytesWritten;
+        this.closed = closed;
+
+        // shallow copy the component object ids to ensure this state object 
exclusively
+        // manages the list of component object ids
+        this.componentObjectIds = new 
ArrayList<>(Preconditions.checkNotNull(componentObjectIds));
+    }
+
+    GSRecoverableWriterState(GSRecoverableWriterState state) {
+        this(state.finalBlobId, state.bytesWritten, state.closed, 
state.componentObjectIds);
+    }
+
+    GSRecoverableWriterState(BlobId finalBlobId) {
+        this(finalBlobId, 0, false, new ArrayList<>());
+    }
+
+    /**
+     * Returns the temporary bucket name. If options specifies a temporary 
bucket name, we use that
+     * one; otherwise, we use the bucket name of the final blob.
+     *
+     * @param options The GS file system options
+     * @return The temporary bucket name
+     */
+    String getTemporaryBucketName(GSFileSystemOptions options) {
+        return options.writerTemporaryBucketName.isEmpty()
+                ? finalBlobId.getBucket()
+                : options.writerTemporaryBucketName;
+    }

Review comment:
       It's a bit against intuition this being a method of the state. Shouldn't 
the temp bucket name the same for all the writers/states?

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements 
RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer instance. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state for the commit operation. */
+    private final GSRecoverableWriterState state;
+
+    GSRecoverableWriterCommitter(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // compose all the component blob ids into the final blob id. if the 
component blob ids are
+        // in the same bucket as the final blob id, this can be done directly. 
otherwise, we must
+        // compose to a new temporary blob id in the same bucket as the 
component blob ids and
+        // then copy that blob to the final blob location
+        if 
(state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) {
+
+            // compose directly to final blob
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    state.finalBlobId,
+                    options.writerContentType);
+
+        } else {
+
+            // compose to a temporary blob id, then copy to final blob id
+            BlobId intermediateBlobId = state.createTemporaryBlobId(options);
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    intermediateBlobId,
+                    options.writerContentType);
+            storage.copy(intermediateBlobId, state.finalBlobId);
+        }
+
+        // clean up after commit
+        writer.cleanupRecoverableState(state);

Review comment:
       I'm not entirely sure about cleaning the temp blobs here.
   
   Ideally, for each `Recoverable` that has been created, there should be a 
call to `cleanupRecoverableState` that cleans up the corresponding state.
   
   I think it serves as a good safe net that we do a cleanup on committing, if 
the cleaning doesn't cause any problem. However, as you have mentioned, it is 
possible that the job restarts from an early checkpoint. I think we should not 
overwrite an existing committed file from the storage. But what if the file is 
manually cleaned up and the job is intentionally restarted from an early 
checkpoint?
   
   Maybe `GSRecoverableWriterCommitter` should not try to be over smart. We 
might tolerant some un-cleaned temp files, and it should be 
`StreamingFileSink`'s responsibility to make sure `cleanupRecoverableState` is 
always called.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** The state of a recoverable write. */
+class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, 
Cloneable {
+
+    /** The blob id to which the recoverable write operation is writing. */
+    public final BlobId finalBlobId;
+
+    /** The number of bytes that have been written so far. */
+    public long bytesWritten;
+
+    /** Indicates if the write has been closed. */
+    public boolean closed;
+
+    /** The object ids for the temporary objects that should be composed to 
form the final blob. */
+    public final List<UUID> componentObjectIds;
+
+    GSRecoverableWriterState(
+            BlobId finalBlobId, long bytesWritten, boolean closed, List<UUID> 
componentObjectIds) {
+        this.finalBlobId = Preconditions.checkNotNull(finalBlobId);
+        Preconditions.checkArgument(bytesWritten >= 0);
+        this.bytesWritten = bytesWritten;
+        this.closed = closed;
+
+        // shallow copy the component object ids to ensure this state object 
exclusively
+        // manages the list of component object ids
+        this.componentObjectIds = new 
ArrayList<>(Preconditions.checkNotNull(componentObjectIds));
+    }
+
+    GSRecoverableWriterState(GSRecoverableWriterState state) {
+        this(state.finalBlobId, state.bytesWritten, state.closed, 
state.componentObjectIds);
+    }
+
+    GSRecoverableWriterState(BlobId finalBlobId) {
+        this(finalBlobId, 0, false, new ArrayList<>());
+    }
+
+    /**
+     * Returns the temporary bucket name. If options specifies a temporary 
bucket name, we use that
+     * one; otherwise, we use the bucket name of the final blob.
+     *
+     * @param options The GS file system options
+     * @return The temporary bucket name
+     */
+    String getTemporaryBucketName(GSFileSystemOptions options) {
+        return options.writerTemporaryBucketName.isEmpty()
+                ? finalBlobId.getBucket()
+                : options.writerTemporaryBucketName;
+    }
+
+    /**
+     * Returns a temporary object partial name, i.e. .inprogress/foo/bar/ for 
the final blob with
+     * object name "foo/bar". The included trailing slash is deliberate, so 
that we can be sure that
+     * object names that start with this partial name are, in fact, temporary 
files associated with
+     * the upload of the associated final blob.
+     *
+     * @param options The GS file system options
+     * @return The temporary object partial name
+     */
+    String getTemporaryObjectPartialName(GSFileSystemOptions options) {
+        String finalObjectName = finalBlobId.getName();
+        return String.format("%s%s/", options.writerTemporaryObjectPrefix, 
finalObjectName);
+    }
+
+    /**
+     * Returns a temporary object name, formed by appending the compact string 
version of the
+     * temporary object id to the temporary object partial name, i.e.
+     * .inprogress/foo/bar/EjgelvANQ525hLUW2S6DBA for the final blob with 
object name "foo/bar".
+     *
+     * @param temporaryObjectId The temporary object id
+     * @param options The GS file system options
+     * @return The temporary object name
+     */
+    String getTemporaryObjectName(UUID temporaryObjectId, GSFileSystemOptions 
options) {
+        return getTemporaryObjectPartialName(options) + 
temporaryObjectId.toString();
+    }
+
+    /**
+     * Creates a temporary blob id for a provided temporary object id.
+     *
+     * @param temporaryObjectId The temporary object id
+     * @param options The GS file system options
+     * @return
+     */
+    private BlobId createTemporaryBlobId(UUID temporaryObjectId, 
GSFileSystemOptions options) {
+        String temporaryBucketName = getTemporaryBucketName(options);
+        String temporaryObjectName = getTemporaryObjectName(temporaryObjectId, 
options);
+        return BlobId.of(temporaryBucketName, temporaryObjectName);
+    }
+
+    /**
+     * Creates a new temporary blob id.
+     *
+     * @param options The GS file system options
+     * @return The new temporary blob id.
+     */
+    BlobId createTemporaryBlobId(GSFileSystemOptions options) {
+        UUID temporaryObjectId = UUID.randomUUID();
+        return createTemporaryBlobId(temporaryObjectId, options);
+    }
+
+    /**
+     * Create a new temporary blob id and add to the list of components.
+     *
+     * @param options The GS file system options
+     * @return The new component blob id.
+     */
+    BlobId createComponentBlobId(GSFileSystemOptions options) {

Review comment:
       I'd suggest to name this something like `addNewComponentBlob`.
   - It's hard to tell the differences between `createTemporaryBlobId` and 
`createComponentBlobId` from their names. I know it's explained in the 
javadocs, but it would be nicer to indicate them directly via the method names.
   - I would consider the major task of this method is to add a new blob in the 
list, and the id is simply a handle to the new blob returned from the method.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableFsDataOutputStream.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** The data output stream implementation for the GS recoverable writer. */
+class GSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state. */
+    private final GSRecoverableWriterState state;
+
+    /**
+     * The current write channel, if one exists. A channel is created when one 
doesn't exist and
+     * bytes are written, and the channel is closed/destroyed when explicitly 
closed by the consumer
+     * (via close or closeForCommit) or when the data output stream is 
persisted (via persist).
+     * Calling persist does not close the data output stream, so it's possible 
that more bytes will
+     * be written, which will cause another channel to be created. So, 
multiple write channels may
+     * be created and destroyed during the lifetime of the data output stream.
+     */
+    @Nullable GSChecksumWriteChannel currentWriteChannel;
+
+    GSRecoverableFsDataOutputStream(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return state.bytesWritten;
+    }
+
+    @Override
+    public void write(int byteValue) throws IOException {
+        byte[] bytes = new byte[] {(byte) byteValue};
+        write(bytes);
+    }
+
+    @Override
+    public void write(@Nonnull byte[] content) throws IOException {
+        Preconditions.checkNotNull(content);
+
+        write(content, 0, content.length);
+    }
+
+    @Override
+    public void write(@Nonnull byte[] content, int start, int length) throws 
IOException {
+        Preconditions.checkNotNull(content);
+        Preconditions.checkArgument(start >= 0);
+        Preconditions.checkArgument(length >= 0);
+
+        // if the data stream is already closed, throw an exception
+        if (state.closed) {
+            throw new IOException("Illegal attempt to write to closed output 
stream");
+        }
+
+        // if necessary, create a write channel
+        if (currentWriteChannel == null) {
+            currentWriteChannel = createWriteChannel();
+        }
+
+        // write to the stream. the docs say that, in some circumstances, 
though an attempt will be
+        // made to write all of the requested bytes, there are some cases 
where only some bytes will
+        // be written. it's not clear whether this could ever happen with a 
Google storage
+        // WriteChannel; in any case, recoverable writers don't support 
partial writes, so if this
+        // ever happens, we must fail the write.:
+        // 
https://docs.oracle.com/javase/7/docs/api/java/nio/channels/WritableByteChannel.html#write(java.nio.ByteBuffer)
+        int bytesWritten = currentWriteChannel.write(content, start, length);
+        if (bytesWritten != length) {
+            throw new IOException(
+                    String.format(
+                            "WriteChannel.write wrote %d of %d requested 
bytes, failing.",
+                            bytesWritten, length));
+        }
+
+        // update count of total bytes written
+        state.bytesWritten += length;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        // not supported for GS, flushing frequency is controlled by the chunk 
size setting
+        // 
https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int-
+    }
+
+    @Override
+    public void sync() throws IOException {
+        // not supported for GS, flushing frequency is controlled by the chunk 
size setting
+        // 
https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int-
+    }

Review comment:
       I wonder if it makes sense to throw an `UnsupportedOperationException` 
here. I'm not entirely sure, but it seems these methods are currently never 
called on `RecoverableFsDataOutputStream`.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** The state of a recoverable write. */
+class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, 
Cloneable {
+
+    /** The blob id to which the recoverable write operation is writing. */
+    public final BlobId finalBlobId;
+
+    /** The number of bytes that have been written so far. */
+    public long bytesWritten;
+
+    /** Indicates if the write has been closed. */
+    public boolean closed;
+
+    /** The object ids for the temporary objects that should be composed to 
form the final blob. */
+    public final List<UUID> componentObjectIds;
+
+    GSRecoverableWriterState(
+            BlobId finalBlobId, long bytesWritten, boolean closed, List<UUID> 
componentObjectIds) {
+        this.finalBlobId = Preconditions.checkNotNull(finalBlobId);
+        Preconditions.checkArgument(bytesWritten >= 0);
+        this.bytesWritten = bytesWritten;
+        this.closed = closed;
+
+        // shallow copy the component object ids to ensure this state object 
exclusively
+        // manages the list of component object ids
+        this.componentObjectIds = new 
ArrayList<>(Preconditions.checkNotNull(componentObjectIds));
+    }
+
+    GSRecoverableWriterState(GSRecoverableWriterState state) {
+        this(state.finalBlobId, state.bytesWritten, state.closed, 
state.componentObjectIds);
+    }
+
+    GSRecoverableWriterState(BlobId finalBlobId) {
+        this(finalBlobId, 0, false, new ArrayList<>());
+    }
+
+    /**
+     * Returns the temporary bucket name. If options specifies a temporary 
bucket name, we use that
+     * one; otherwise, we use the bucket name of the final blob.
+     *
+     * @param options The GS file system options
+     * @return The temporary bucket name
+     */
+    String getTemporaryBucketName(GSFileSystemOptions options) {
+        return options.writerTemporaryBucketName.isEmpty()
+                ? finalBlobId.getBucket()
+                : options.writerTemporaryBucketName;
+    }
+
+    /**
+     * Returns a temporary object partial name, i.e. .inprogress/foo/bar/ for 
the final blob with
+     * object name "foo/bar". The included trailing slash is deliberate, so 
that we can be sure that
+     * object names that start with this partial name are, in fact, temporary 
files associated with
+     * the upload of the associated final blob.
+     *
+     * @param options The GS file system options
+     * @return The temporary object partial name
+     */
+    String getTemporaryObjectPartialName(GSFileSystemOptions options) {
+        String finalObjectName = finalBlobId.getName();
+        return String.format("%s%s/", options.writerTemporaryObjectPrefix, 
finalObjectName);
+    }
+
+    /**
+     * Returns a temporary object name, formed by appending the compact string 
version of the
+     * temporary object id to the temporary object partial name, i.e.
+     * .inprogress/foo/bar/EjgelvANQ525hLUW2S6DBA for the final blob with 
object name "foo/bar".
+     *
+     * @param temporaryObjectId The temporary object id
+     * @param options The GS file system options
+     * @return The temporary object name
+     */
+    String getTemporaryObjectName(UUID temporaryObjectId, GSFileSystemOptions 
options) {
+        return getTemporaryObjectPartialName(options) + 
temporaryObjectId.toString();
+    }
+
+    /**
+     * Creates a temporary blob id for a provided temporary object id.
+     *
+     * @param temporaryObjectId The temporary object id
+     * @param options The GS file system options
+     * @return
+     */
+    private BlobId createTemporaryBlobId(UUID temporaryObjectId, 
GSFileSystemOptions options) {
+        String temporaryBucketName = getTemporaryBucketName(options);
+        String temporaryObjectName = getTemporaryObjectName(temporaryObjectId, 
options);
+        return BlobId.of(temporaryBucketName, temporaryObjectName);
+    }
+
+    /**
+     * Creates a new temporary blob id.
+     *
+     * @param options The GS file system options
+     * @return The new temporary blob id.
+     */
+    BlobId createTemporaryBlobId(GSFileSystemOptions options) {
+        UUID temporaryObjectId = UUID.randomUUID();
+        return createTemporaryBlobId(temporaryObjectId, options);
+    }

Review comment:
       We might consider making these methods utils. They have nothing to do 
with the responsibility of the state, apart from taking `finalBlobId` as an 
input.
   
   E.g., in `GSRecoverableWriterCommitter#commit`, it's a bit against intuition 
that the committer calls the state to generate an id for the intermediate blob.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to