xintongsong commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r619777399



##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/BlobUtils.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.utils;
+
+import com.google.cloud.storage.BlobId;
+
+import java.net.URI;
+
+/** Utility functions related to blobs. */
+public class BlobUtils {
+
+    /** The maximum number of blobs that can be composed in a single 
operation. */
+    public static final int COMPOSE_MAX_BLOBS = 32;
+
+    /**
+     * Normalizes a blob id, ensuring that the generation is null.
+     *
+     * @param blobId The blob id
+     * @return The blob id with the generation set to null
+     */
+    public static BlobId normalizeBlobId(BlobId blobId) {
+        return BlobId.of(blobId.getBucket(), blobId.getName());
+    }

Review comment:
       Thanks for the explanation.
   
   I see the point that we want to compare the blob ids with generation 
ignored. This is why I mentioned in another comment, that it might make sense 
to introduce a class wrapping `BlobId`. Otherwise, we would have to carefully 
deal with when and where to call this util method.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/storage/BlobStorage.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.storage;
+
+import com.google.cloud.storage.BlobId;

Review comment:
       I don't think it's necessary to abstract away everything 
google-specific. TBH, I don't see much chance this interface gets reused by 
other blob storages. Having this interface purely for testability sounds good 
to me. That's why I listed the option renaming the interface to `GSBlobStorage`.
   
   The reason I suggested to wrap `BlobId` anyway, is not for making this 
interface general and reusable. It's because I see a gap between the 
information `BlobId` carries and what we need. The complexity of having to 
understand when and where to do the normalization is one of the consequences of 
such gap. There could be others, e.g., if more fields are introduced to 
`BlobId` in future gcs versions.

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", 
HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the 
recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in 
the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    
.defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the 
recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name 
for temporary files.");

Review comment:
       Thanks for the explanation. I agree with you that being able to specify 
different TTLs for temporal/final objects can be useful, which makes a good 
reason for making the temporary bucket configurable.
   
   What about the temporal object prefix? Is it necessary to make that 
configurable as well?

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableFsDataOutputStream.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** The data output stream implementation for the GS recoverable writer. */
+class GSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state. */
+    private final GSRecoverableWriterState state;
+
+    /**
+     * The current write channel, if one exists. A channel is created when one 
doesn't exist and
+     * bytes are written, and the channel is closed/destroyed when explicitly 
closed by the consumer
+     * (via close or closeForCommit) or when the data output stream is 
persisted (via persist).
+     * Calling persist does not close the data output stream, so it's possible 
that more bytes will
+     * be written, which will cause another channel to be created. So, 
multiple write channels may
+     * be created and destroyed during the lifetime of the data output stream.
+     */
+    @Nullable GSChecksumWriteChannel currentWriteChannel;
+
+    GSRecoverableFsDataOutputStream(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return state.bytesWritten;
+    }
+
+    @Override
+    public void write(int byteValue) throws IOException {
+        byte[] bytes = new byte[] {(byte) byteValue};
+        write(bytes);
+    }
+
+    @Override
+    public void write(@Nonnull byte[] content) throws IOException {
+        Preconditions.checkNotNull(content);
+
+        write(content, 0, content.length);
+    }
+
+    @Override
+    public void write(@Nonnull byte[] content, int start, int length) throws 
IOException {
+        Preconditions.checkNotNull(content);
+        Preconditions.checkArgument(start >= 0);
+        Preconditions.checkArgument(length >= 0);
+
+        // if the data stream is already closed, throw an exception
+        if (state.closed) {
+            throw new IOException("Illegal attempt to write to closed output 
stream");
+        }
+
+        // if necessary, create a write channel
+        if (currentWriteChannel == null) {
+            currentWriteChannel = createWriteChannel();
+        }
+
+        // write to the stream. the docs say that, in some circumstances, 
though an attempt will be
+        // made to write all of the requested bytes, there are some cases 
where only some bytes will
+        // be written. it's not clear whether this could ever happen with a 
Google storage
+        // WriteChannel; in any case, recoverable writers don't support 
partial writes, so if this
+        // ever happens, we must fail the write.:
+        // 
https://docs.oracle.com/javase/7/docs/api/java/nio/channels/WritableByteChannel.html#write(java.nio.ByteBuffer)
+        int bytesWritten = currentWriteChannel.write(content, start, length);
+        if (bytesWritten != length) {
+            throw new IOException(
+                    String.format(
+                            "WriteChannel.write wrote %d of %d requested 
bytes, failing.",
+                            bytesWritten, length));
+        }
+
+        // update count of total bytes written
+        state.bytesWritten += length;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        // not supported for GS, flushing frequency is controlled by the chunk 
size setting
+        // 
https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int-
+    }
+
+    @Override
+    public void sync() throws IOException {
+        // not supported for GS, flushing frequency is controlled by the chunk 
size setting
+        // 
https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#setChunkSize-int-
+    }

Review comment:
       I'd be ok with either way. It's up to you.
   - It would surely be nice if we can have these methods properly implemented, 
if that does not add too much workload for you.
   - I'd also be fine with leaving them unsupported for and implement them 
later when they are needed. In that case, we need to well document that the 
methods are not implemented only because they are not used, rather than any 
limitations from GCS.
   

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements 
RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer instance. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state for the commit operation. */
+    private final GSRecoverableWriterState state;
+
+    GSRecoverableWriterCommitter(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // compose all the component blob ids into the final blob id. if the 
component blob ids are
+        // in the same bucket as the final blob id, this can be done directly. 
otherwise, we must
+        // compose to a new temporary blob id in the same bucket as the 
component blob ids and
+        // then copy that blob to the final blob location
+        if 
(state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) {
+
+            // compose directly to final blob
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    state.finalBlobId,
+                    options.writerContentType);
+
+        } else {
+
+            // compose to a temporary blob id, then copy to final blob id
+            BlobId intermediateBlobId = state.createTemporaryBlobId(options);
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    intermediateBlobId,
+                    options.writerContentType);
+            storage.copy(intermediateBlobId, state.finalBlobId);
+        }
+
+        // clean up after commit
+        writer.cleanupRecoverableState(state);

Review comment:
       I think `cleanupRecoverableState` is not called immediately on blobs 
committed, but should rather be called on the next success checkpoint.
   I've talked to Guowei offline. He is also a Flink committer and one of the 
main contributor of streaming file sink. According to him, there are some known 
issues that `cleanupRecoverableState` may not always get called, e.g., the next 
success checkpoint never happen. Efforts for fixing those issues are still in 
progress.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to