xintongsong commented on a change in pull request #15599: URL: https://github.com/apache/flink/pull/15599#discussion_r641393793
########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.util.HadoopConfigLoader; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; + +/** + * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for + * Google Storage. + */ +public class GSFileSystemFactory implements FileSystemFactory { + + /** The scheme for the Google Storage file system. */ + public static final String SCHEME = "gs"; + + private static final String HADOOP_CONFIG_PREFIX = "fs.gs."; + + private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX}; + + private static final String[][] MIRRORED_CONFIG_KEYS = {}; + + private static final String FLINK_SHADING_PREFIX = ""; + + private final HadoopConfigLoader hadoopConfigLoader; + + private Configuration flinkConfig; Review comment: ```suggestion @Nullable private Configuration flinkConfig; ``` ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.fs.gs; + +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.storage.GSBlobStorageImpl; +import org.apache.flink.fs.gs.writer.GSRecoverableWriter; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; + +import java.io.IOException; + +/** Provides recoverable-writer functionality for the standard GoogleHadoopFileSystem. */ +class GSFileSystem extends HadoopFileSystem { + + private final GSFileSystemOptions options; + + GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) { + super(Preconditions.checkNotNull(googleHadoopFileSystem)); + this.options = Preconditions.checkNotNull(options); + } + + @Override + public RecoverableWriter createRecoverableWriter() throws IOException { Review comment: nit: ```suggestion public RecoverableWriter createRecoverableWriter() { ``` ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.util.HadoopConfigLoader; +import org.apache.flink.util.Preconditions; + +import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; + +/** + * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for + * Google Storage. + */ +public class GSFileSystemFactory implements FileSystemFactory { + + /** The scheme for the Google Storage file system. */ + public static final String SCHEME = "gs"; + + private static final String HADOOP_CONFIG_PREFIX = "fs.gs."; + + private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX}; + + private static final String[][] MIRRORED_CONFIG_KEYS = {}; + + private static final String FLINK_SHADING_PREFIX = ""; + + private final HadoopConfigLoader hadoopConfigLoader; + + private Configuration flinkConfig; + + /** Constructs the Google Storage file system factory. 
*/ + public GSFileSystemFactory() { + this.hadoopConfigLoader = + new HadoopConfigLoader( + FLINK_CONFIG_PREFIXES, + MIRRORED_CONFIG_KEYS, + HADOOP_CONFIG_PREFIX, + Collections.emptySet(), + Collections.emptySet(), + FLINK_SHADING_PREFIX); + } + + @Override + public void configure(Configuration flinkConfig) { + Preconditions.checkNotNull(flinkConfig); + + this.flinkConfig = flinkConfig; Review comment: nit: ```suggestion this.flinkConfig = Preconditions.checkNotNull(flinkConfig); ``` ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriter.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.GSBlobIdentifier; +import org.apache.flink.fs.gs.storage.GSBlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** The recoverable writer implementation for Google storage. */ +public class GSRecoverableWriter implements RecoverableWriter { + + /** The underlying blob storage. */ + private final GSBlobStorage storage; + + /** The GS file system options. */ + private final GSFileSystemOptions options; + + /** + * Construct a GS recoverable writer. 
+ * + * @param storage The underlying blob storage instance + * @param options The GS file system options + */ + public GSRecoverableWriter(GSBlobStorage storage, GSFileSystemOptions options) { + this.storage = Preconditions.checkNotNull(storage); + this.options = Preconditions.checkNotNull(options); + } + + @Override + public boolean requiresCleanupOfRecoverableState() { + // we can't clean up any state prior to commit + // see discussion: https://github.com/apache/flink/pull/15599#discussion_r623127365 + return false; + } + + @Override + public boolean supportsResume() { + return true; + } + + @Override + public RecoverableFsDataOutputStream open(Path path) throws IOException { + Preconditions.checkNotNull(path); + + GSBlobIdentifier finalBlobIdentifier = BlobUtils.parseUri(path.toUri()); + return new GSRecoverableFsDataOutputStream(storage, options, finalBlobIdentifier); + } + + @Override + public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException { + Preconditions.checkNotNull(resumable); + + GSResumeRecoverable recoverable = (GSResumeRecoverable) resumable; + return new GSRecoverableFsDataOutputStream(storage, options, recoverable); + } + + @Override + public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException { Review comment: nit: ```suggestion public boolean cleanupRecoverableState(ResumeRecoverable resumable) { ``` ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/GSBlobStorage.java ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.storage; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** Abstract blob storage, used to simplify interface to Google storage and make it mockable. */ +public interface GSBlobStorage { + + /** + * Creates a write channel. + * + * @param blobIdentifier The blob identifier to which to write + * @return The WriteChannel helper + */ + WriteChannel writeBlob(GSBlobIdentifier blobIdentifier); + + /** + * Gets blob metadata. + * + * @param blobIdentifier The blob identifier + * @return The blob metadata, if the blob exists. Empty if the blob doesn't exist. + */ + Optional<BlobMetadata> getMetadata(GSBlobIdentifier blobIdentifier); + + /** + * Lists all the blobs in a bucket matching a given prefix. + * + * @param bucketName The bucket name + * @param prefix The object prefix + * @return The found blobs ids + */ + List<GSBlobIdentifier> list(String bucketName, String prefix); + + /** + * Copies from a source blob id to a target blob id. Does not delete the source blob. 
+ * + * @param sourceBlobIdentifier The source blob identifier + * @param targetBlobIdentifier The target blob identifier + */ + void copy(GSBlobIdentifier sourceBlobIdentifier, GSBlobIdentifier targetBlobIdentifier); + + /** + * Composes multiple blobs into one. Does not delete any of the source blobs. + * + * @param sourceBlobIdentifiers The source blob identifiers to combine, max of 32 + * @param targetBlobIdentifier The target blob identifier + */ + void compose( + List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier); + + /** + * Deletes blobs. Note that this does not fail if blobs don't exist. + * + * @param blobIdentifiers The blob identifiers to delete + * @return The results of each delete operation. + */ + List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers); + + /** Abstract blob metadata. */ + interface BlobMetadata { + + /** + * The crc32 checksum for the blob. + * + * @return The checksum + */ + String getChecksum(); + } + + /** Abstract blob write channel. */ + interface WriteChannel { + + /** + * Sets the chunk size for upload. + * + * @param chunkSize The chunk size + */ + void setChunkSize(int chunkSize); Review comment: Is it safe to change the chunk size of a write channel when some data has already been written? I'm asking because, if the chunk size cannot be changed after writing has started, we may want to ensure that by setting the chunk size in `GSBlobStorageImpl#writeBlob` and not exposing this interface. I took a glance at the Google Cloud code and did not find anything that forbids changing the chunk size afterwards, so there's probably no problem with the current implementation. Just want to confirm this with you. ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriter.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.GSBlobIdentifier; +import org.apache.flink.fs.gs.storage.GSBlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** The recoverable writer implementation for Google storage. */ +public class GSRecoverableWriter implements RecoverableWriter { + + /** The underlying blob storage. */ + private final GSBlobStorage storage; + + /** The GS file system options. 
*/ + private final GSFileSystemOptions options; + + /** + * Construct a GS recoverable writer. + * + * @param storage The underlying blob storage instance + * @param options The GS file system options + */ + public GSRecoverableWriter(GSBlobStorage storage, GSFileSystemOptions options) { + this.storage = Preconditions.checkNotNull(storage); + this.options = Preconditions.checkNotNull(options); + } + + @Override + public boolean requiresCleanupOfRecoverableState() { + // we can't clean up any state prior to commit + // see discussion: https://github.com/apache/flink/pull/15599#discussion_r623127365 + return false; + } + + @Override + public boolean supportsResume() { + return true; + } + + @Override + public RecoverableFsDataOutputStream open(Path path) throws IOException { + Preconditions.checkNotNull(path); + + GSBlobIdentifier finalBlobIdentifier = BlobUtils.parseUri(path.toUri()); + return new GSRecoverableFsDataOutputStream(storage, options, finalBlobIdentifier); + } + + @Override + public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException { Review comment: nit: ```suggestion public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) { ``` ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.GSBlobIdentifier; +import org.apache.flink.fs.gs.storage.GSBlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** The committer for the GS recoverable writer. */ +class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer { + + /** The underlying blob storage. */ + private final GSBlobStorage storage; + + /** The GS file system options. */ + private final GSFileSystemOptions options; + + /** The recoverable for the commit operation. 
*/ + private final GSResumeRecoverable recoverable; + + GSRecoverableWriterCommitter( + GSBlobStorage storage, GSFileSystemOptions options, GSResumeRecoverable recoverable) { + this.storage = Preconditions.checkNotNull(storage); + this.options = Preconditions.checkNotNull(options); + this.recoverable = Preconditions.checkNotNull(recoverable); + } + + @Override + public void commit() throws IOException { + + // see discussion: https://github.com/apache/flink/pull/15599#discussion_r623127365 + // first, make sure the final blob doesn't already exist + Optional<GSBlobStorage.BlobMetadata> blobMetadata = + storage.getMetadata(recoverable.finalBlobIdentifier); + if (blobMetadata.isPresent()) { + throw new IOException( + String.format( + "Blob %s already exists during attempted commit", + recoverable.finalBlobIdentifier)); + } + + // write the final blob + writeFinalBlob(); + + // clean up after successful commit + cleanupTemporaryBlobs(); + } + + @Override + public void commitAfterRecovery() throws IOException { + + // see discussion: https://github.com/apache/flink/pull/15599#discussion_r623127365 + // only write the final blob if it doesn't already exist + Optional<GSBlobStorage.BlobMetadata> blobMetadata = + storage.getMetadata(recoverable.finalBlobIdentifier); + if (!blobMetadata.isPresent()) { + writeFinalBlob(); + } + + // clean up after successful commit + cleanupTemporaryBlobs(); + } + + @Override + public RecoverableWriter.CommitRecoverable getRecoverable() { + return recoverable; + } + + /** + * Helper to compose an arbitrary number of blobs into a final blob, staying under the + * COMPOSE_MAX_BLOBS limit for any individual compose operation. + * + * @param sourceBlobIdentifiers The source blob ids to compose + * @param targetBlobIdentifier The target blob id for the composed result + */ + private void composeBlobs( + List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier) { + Preconditions.checkNotNull(sourceBlobIdentifiers); + Preconditions.checkArgument(sourceBlobIdentifiers.size() > 0); + Preconditions.checkNotNull(targetBlobIdentifier); + + // split the source list into two parts; first, the ones we can compose in this operation + // (up to COMPOSE_MAX_BLOBS), and, second, whichever blobs are left over + final int composeToIndex = + Math.min(BlobUtils.COMPOSE_MAX_BLOBS, sourceBlobIdentifiers.size()); + List<GSBlobIdentifier> composeBlobIds = sourceBlobIdentifiers.subList(0, composeToIndex); + List<GSBlobIdentifier> remainingBlobIds = + sourceBlobIdentifiers.subList(composeToIndex, sourceBlobIdentifiers.size()); + + // determine the resulting blob id for this compose operation. if this is the last compose, + // i.e. if there are no remaining blob ids, then the composed blob id is the originally + // specified target blob id. otherwise, we must create an intermediate blob id to hold the + // result of this compose operation + GSBlobIdentifier composedBlobId = + remainingBlobIds.isEmpty() + ? 
targetBlobIdentifier + : BlobUtils.generateTemporaryBlobIdentifier( + recoverable.finalBlobIdentifier, options); + + // compose the blobs + storage.compose(composeBlobIds, composedBlobId); + + // if we have remaining blobs, add the composed blob id to the beginning of the list + // of remaining blob ids, and recurse + if (!remainingBlobIds.isEmpty()) { + remainingBlobIds.add(0, composedBlobId); + composeBlobs(remainingBlobIds, targetBlobIdentifier); + } + } Review comment: The major part of this method deals with `COMPOSE_MAX_BLOBS`, which is a limitation that comes from the GCS blob storage. Therefore, I'd suggest moving this logic to `GSBlobStorageImpl`. We can have a public `GSBlobStorageImpl#compose` that accepts any number of source blobs and internally calls a private `GSBlobStorageImpl#composeInternal` that takes no more source blobs than the limit allows. (See the sketch at the end of this message.) ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.GSBlobIdentifier; +import org.apache.flink.fs.gs.storage.GSBlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** The committer for the GS recoverable writer. */ +class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer { + + /** The underlying blob storage. */ + private final GSBlobStorage storage; + + /** The GS file system options. */ + private final GSFileSystemOptions options; + + /** The recoverable for the commit operation. 
*/ + private final GSResumeRecoverable recoverable; + + GSRecoverableWriterCommitter( + GSBlobStorage storage, GSFileSystemOptions options, GSResumeRecoverable recoverable) { + this.storage = Preconditions.checkNotNull(storage); + this.options = Preconditions.checkNotNull(options); + this.recoverable = Preconditions.checkNotNull(recoverable); + } + + @Override + public void commit() throws IOException { + + // see discussion: https://github.com/apache/flink/pull/15599#discussion_r623127365 + // first, make sure the final blob doesn't already exist + Optional<GSBlobStorage.BlobMetadata> blobMetadata = + storage.getMetadata(recoverable.finalBlobIdentifier); + if (blobMetadata.isPresent()) { + throw new IOException( + String.format( + "Blob %s already exists during attempted commit", + recoverable.finalBlobIdentifier)); + } + + // write the final blob + writeFinalBlob(); + + // clean up after successful commit + cleanupTemporaryBlobs(); + } + + @Override + public void commitAfterRecovery() throws IOException { + + // see discussion: https://github.com/apache/flink/pull/15599#discussion_r623127365 + // only write the final blob if it doesn't already exist + Optional<GSBlobStorage.BlobMetadata> blobMetadata = + storage.getMetadata(recoverable.finalBlobIdentifier); + if (!blobMetadata.isPresent()) { + writeFinalBlob(); + } + + // clean up after successful commit + cleanupTemporaryBlobs(); + } + + @Override + public RecoverableWriter.CommitRecoverable getRecoverable() { + return recoverable; + } + + /** + * Helper to compose an arbitrary number of blobs into a final blob, staying under the + * COMPOSE_MAX_BLOBS limit for any individual compose operation. + * + * @param sourceBlobIdentifiers The source blob ids to compose + * @param targetBlobIdentifier The target blob id for the composed result + */ + private void composeBlobs( + List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier) { + Preconditions.checkNotNull(sourceBlobIdentifiers); + Preconditions.checkArgument(sourceBlobIdentifiers.size() > 0); + Preconditions.checkNotNull(targetBlobIdentifier); + + // split the source list into two parts; first, the ones we can compose in this operation + // (up to COMPOSE_MAX_BLOBS), and, second, whichever blobs are left over + final int composeToIndex = + Math.min(BlobUtils.COMPOSE_MAX_BLOBS, sourceBlobIdentifiers.size()); + List<GSBlobIdentifier> composeBlobIds = sourceBlobIdentifiers.subList(0, composeToIndex); + List<GSBlobIdentifier> remainingBlobIds = + sourceBlobIdentifiers.subList(composeToIndex, sourceBlobIdentifiers.size()); + + // determine the resulting blob id for this compose operation. if this is the last compose, + // i.e. if there are no remaining blob ids, then the composed blob id is the originally + // specified target blob id. otherwise, we must create an intermediate blob id to hold the + // result of this compose operation + GSBlobIdentifier composedBlobId = + remainingBlobIds.isEmpty() + ? targetBlobIdentifier + : BlobUtils.generateTemporaryBlobIdentifier( + recoverable.finalBlobIdentifier, options); + + // compose the blobs + storage.compose(composeBlobIds, composedBlobId); + + // if we have remaining blobs, add the composed blob id to the beginning of the list + // of remaining blob ids, and recurse + if (!remainingBlobIds.isEmpty()) { + remainingBlobIds.add(0, composedBlobId); + composeBlobs(remainingBlobIds, targetBlobIdentifier); + } + } + + /** + * Writes the final blob by composing the temporary blobs and copying, if necessary. 
+ * + * @throws IOException On underlying failure. + */ + private void writeFinalBlob() throws IOException { + + // compose all the component blob ids into the final blob id. if the component blob ids are + // in the same bucket as the final blob id, this can be done directly. otherwise, we must + // compose to a new temporary blob id in the same bucket as the component blob ids and + // then copy that blob to the final blob location + String temporaryBucketName = + BlobUtils.getTemporaryBucketName(recoverable.finalBlobIdentifier, options); + if (recoverable.finalBlobIdentifier.bucketName.equals(temporaryBucketName)) { + + // compose directly to final blob + composeBlobs(recoverable.getComponentBlobIds(options), recoverable.finalBlobIdentifier); + + } else { + + // compose to the intermediate blob, then copy + GSBlobIdentifier intermediateBlobIdentifier = + BlobUtils.generateTemporaryBlobIdentifier( + recoverable.finalBlobIdentifier, options); + composeBlobs(recoverable.getComponentBlobIds(options), intermediateBlobIdentifier); + storage.copy(intermediateBlobIdentifier, recoverable.finalBlobIdentifier); + } + } + + /** + * Clean up after a successful commit operation, by deleting any temporary blobs associated with + * the final blob. + * + * @throws IOException On underlying storage failure + */ + private void cleanupTemporaryBlobs() throws IOException { Review comment: nit: ```suggestion private void cleanupTemporaryBlobs() { ``` ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSResumeRecoverable.java ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.GSBlobIdentifier; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +/** A resumable state for a recoverable output stream. */ +class GSResumeRecoverable implements RecoverableWriter.ResumeRecoverable { + + /** The blob id to which the recoverable write operation is writing. */ + public final GSBlobIdentifier finalBlobIdentifier; + + /** The write position, i.e. number of bytes that have been written so far. */ + public final long position; + + /** Indicates if the write has been closed. */ + public final boolean closed; + + /** The object ids for the temporary objects that should be composed to form the final blob. */ + public final UUID[] componentObjectIds; Review comment: This does not prevent modifications to the array elements. I'd suggest `UnmodifiableList`. 
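A minimal sketch of that suggestion (editor's illustration, not code from the PR; class and method names are hypothetical): instead of exposing a public `UUID[]`, the recoverable could hold an unmodifiable `List<UUID>`, so callers can neither reassign the field nor replace its elements.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

/** Hypothetical sketch of storing the component object ids immutably. */
class ComponentIdsSketch {

    /** Unmodifiable view over a defensive copy; set/add/remove all throw. */
    private final List<UUID> componentObjectIds;

    ComponentIdsSketch(List<UUID> componentObjectIds) {
        this.componentObjectIds =
                Collections.unmodifiableList(new ArrayList<>(componentObjectIds));
    }

    List<UUID> getComponentObjectIds() {
        // safe to expose directly: the wrapper rejects mutation attempts
        return componentObjectIds;
    }
}
```

Note the defensive copy in the constructor: `Collections.unmodifiableList` alone is only a view, so without the copy the caller's original list could still be mutated underneath it.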
########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriter.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.GSBlobIdentifier; +import org.apache.flink.fs.gs.storage.GSBlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** The recoverable writer implementation for Google storage. */ +public class GSRecoverableWriter implements RecoverableWriter { + + /** The underlying blob storage. */ + private final GSBlobStorage storage; + + /** The GS file system options. */ + private final GSFileSystemOptions options; + + /** + * Construct a GS recoverable writer. 
+ * + * @param storage The underlying blob storage instance + * @param options The GS file system options + */ + public GSRecoverableWriter(GSBlobStorage storage, GSFileSystemOptions options) { + this.storage = Preconditions.checkNotNull(storage); + this.options = Preconditions.checkNotNull(options); + } + + @Override + public boolean requiresCleanupOfRecoverableState() { + // we can't clean up any state prior to commit + // see discussion: https://github.com/apache/flink/pull/15599#discussion_r623127365 + return false; + } + + @Override + public boolean supportsResume() { + return true; + } + + @Override + public RecoverableFsDataOutputStream open(Path path) throws IOException { + Preconditions.checkNotNull(path); + + GSBlobIdentifier finalBlobIdentifier = BlobUtils.parseUri(path.toUri()); + return new GSRecoverableFsDataOutputStream(storage, options, finalBlobIdentifier); + } + + @Override + public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException { + Preconditions.checkNotNull(resumable); + + GSResumeRecoverable recoverable = (GSResumeRecoverable) resumable; + return new GSRecoverableFsDataOutputStream(storage, options, recoverable); + } + + @Override + public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException { + // we can't safely clean up any state prior to commit, so do nothing here + // see discussion: https://github.com/apache/flink/pull/15599#discussion_r623127365 + return true; + } + + @Override + public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable) + throws IOException { Review comment: nit: ```suggestion public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable) { ``` ########## File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.gs.writer; + +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.gs.GSFileSystemOptions; +import org.apache.flink.fs.gs.storage.GSBlobIdentifier; +import org.apache.flink.fs.gs.storage.GSBlobStorage; +import org.apache.flink.fs.gs.utils.BlobUtils; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** The committer for the GS recoverable writer. */ +class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer { + + /** The underlying blob storage. */ + private final GSBlobStorage storage; + + /** The GS file system options. 
*/ + private final GSFileSystemOptions options; + + /** The recoverable for the commit operation. */ + private final GSResumeRecoverable recoverable; + + GSRecoverableWriterCommitter( + GSBlobStorage storage, GSFileSystemOptions options, GSResumeRecoverable recoverable) { + this.storage = Preconditions.checkNotNull(storage); + this.options = Preconditions.checkNotNull(options); + this.recoverable = Preconditions.checkNotNull(recoverable); + } + + @Override + public void commit() throws IOException { + + // see discussion: https://github.com/apache/flink/pull/15599#discussion_r623127365 + // first, make sure the final blob doesn't already exist + Optional<GSBlobStorage.BlobMetadata> blobMetadata = + storage.getMetadata(recoverable.finalBlobIdentifier); + if (blobMetadata.isPresent()) { + throw new IOException( + String.format( + "Blob %s already exists during attempted commit", + recoverable.finalBlobIdentifier)); + } + + // write the final blob + writeFinalBlob(); + + // clean up after successful commit + cleanupTemporaryBlobs(); + } + + @Override + public void commitAfterRecovery() throws IOException { + + // see discussion: https://github.com/apache/flink/pull/15599#discussion_r623127365 + // only write the final blob if it doesn't already exist + Optional<GSBlobStorage.BlobMetadata> blobMetadata = + storage.getMetadata(recoverable.finalBlobIdentifier); + if (!blobMetadata.isPresent()) { + writeFinalBlob(); + } + + // clean up after successful commit + cleanupTemporaryBlobs(); + } + + @Override + public RecoverableWriter.CommitRecoverable getRecoverable() { + return recoverable; + } + + /** + * Helper to compose an arbitrary number of blobs into a final blob, staying under the + * COMPOSE_MAX_BLOBS limit for any individual compose operation. + * + * @param sourceBlobIdentifiers The source blob ids to compose + * @param targetBlobIdentifier The target blob id for the composed result + */ + private void composeBlobs( + List<GSBlobIdentifier> sourceBlobIdentifiers, GSBlobIdentifier targetBlobIdentifier) { + Preconditions.checkNotNull(sourceBlobIdentifiers); + Preconditions.checkArgument(sourceBlobIdentifiers.size() > 0); + Preconditions.checkNotNull(targetBlobIdentifier); + + // split the source list into two parts; first, the ones we can compose in this operation + // (up to COMPOSE_MAX_BLOBS), and, second, whichever blobs are left over + final int composeToIndex = + Math.min(BlobUtils.COMPOSE_MAX_BLOBS, sourceBlobIdentifiers.size()); + List<GSBlobIdentifier> composeBlobIds = sourceBlobIdentifiers.subList(0, composeToIndex); + List<GSBlobIdentifier> remainingBlobIds = + sourceBlobIdentifiers.subList(composeToIndex, sourceBlobIdentifiers.size()); + + // determine the resulting blob id for this compose operation. if this is the last compose, + // i.e. if there are no remaining blob ids, then the composed blob id is the originally + // specified target blob id. otherwise, we must create an intermediate blob id to hold the + // result of this compose operation + GSBlobIdentifier composedBlobId = + remainingBlobIds.isEmpty() + ? 
targetBlobIdentifier + : BlobUtils.generateTemporaryBlobIdentifier( + recoverable.finalBlobIdentifier, options); + + // compose the blobs + storage.compose(composeBlobIds, composedBlobId); + + // if we have remaining blobs, add the composed blob id to the beginning of the list + // of remaining blob ids, and recurse + if (!remainingBlobIds.isEmpty()) { + remainingBlobIds.add(0, composedBlobId); + composeBlobs(remainingBlobIds, targetBlobIdentifier); + } + } + + /** + * Writes the final blob by composing the temporary blobs and copying, if necessary. + * + * @throws IOException On underlying failure. + */ + private void writeFinalBlob() throws IOException { Review comment: nit: ```suggestion private void writeFinalBlob() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
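Editor's note on the compose-limit suggestion above: a rough, hypothetical sketch of the public `compose` / private `composeInternal` split. Plain strings stand in for `GSBlobIdentifier`, and the batching and intermediate-naming scheme is illustrative, not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of hiding the GCS compose limit inside the storage impl. */
class ComposeSketch {

    /** GCS allows at most 32 source blobs per compose call. */
    static final int COMPOSE_MAX_BLOBS = 32;

    /** Public entry point: accepts any number of source blobs. */
    void compose(List<String> sources, String target) {
        List<String> remaining = new ArrayList<>(sources);
        int round = 0;
        while (remaining.size() > COMPOSE_MAX_BLOBS) {
            // compose the first batch into an intermediate blob ...
            List<String> batch = new ArrayList<>(remaining.subList(0, COMPOSE_MAX_BLOBS));
            String intermediate = target + ".intermediate-" + round++;
            composeInternal(batch, intermediate);
            // ... then carry the intermediate blob into the next round
            List<String> rest =
                    new ArrayList<>(remaining.subList(COMPOSE_MAX_BLOBS, remaining.size()));
            rest.add(0, intermediate);
            remaining = rest;
        }
        composeInternal(remaining, target); // final compose, <= COMPOSE_MAX_BLOBS sources
    }

    /** Single compose call; callers guarantee sources.size() <= COMPOSE_MAX_BLOBS. */
    private void composeInternal(List<String> sources, String target) {
        // the real implementation would delegate to the GCS Storage compose API here
    }
}
```

With this split, `GSRecoverableWriterCommitter` could call `compose` once and stay unaware of the 32-blob limit.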
