chamikaramj commented on a change in pull request #15355:
URL: https://github.com/apache/beam/pull/15355#discussion_r707449144



##########
File path: 
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
##########
@@ -458,38 +458,89 @@ SeekableByteChannel open(GcsPath path, 
GoogleCloudStorageReadOptions readOptions
         new StorageResourceId(path.getBucket(), path.getObject()), 
readOptions);
   }
 
-  /**
-   * Creates an object in GCS.
-   *
-   * <p>Returns a WritableByteChannel that can be used to write data to the 
object.
-   *
-   * @param path the GCS file to write to
-   * @param type the type of object, eg "text/plain".
-   * @return a Callable object that encloses the operation.
-   */
+  /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
+  @Deprecated
   public WritableByteChannel create(GcsPath path, String type) throws 
IOException {
-    return create(path, type, uploadBufferSizeBytes);
+    CreateOptions.Builder builder = 
CreateOptions.builder().setContentType(type);
+    return create(path, builder.build());
   }
 
-  /**
-   * Same as {@link GcsUtil#create(GcsPath, String)} but allows overriding 
{code
-   * uploadBufferSizeBytes}.
-   */
+  /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
+  @Deprecated
   public WritableByteChannel create(GcsPath path, String type, Integer 
uploadBufferSizeBytes)
       throws IOException {
+    CreateOptions.Builder builder =
+        CreateOptions.builder()
+            .setContentType(type)
+            .setUploadBufferSizeBytes(uploadBufferSizeBytes);
+    return create(path, builder.build());
+  }
+
+  @AutoValue
+  public abstract static class CreateOptions {
+    /**
+     * If true, the created file is expected to not exist. Instead of checking 
for file presence
+     * before writing a write exception may occur if the file does exist.
+     */
+    public abstract boolean getExpectFileToNotExist();
+
+    /**
+     * If non-null, the upload buffer size to be used. If null, the buffer 
size corresponds to {code
+     * GCSUtil.getUploadBufferSizeBytes}
+     */
+    public abstract @Nullable Integer getUploadBufferSizeBytes();
+
+    /** The content type for the created file, eg "text/plain". */
+    public abstract @Nullable String getContentType();
+
+    public static Builder builder() {
+      return new 
AutoValue_GcsUtil_CreateOptions.Builder().setExpectFileToNotExist(false);
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setContentType(String value);
+
+      public abstract Builder setUploadBufferSizeBytes(int value);
+
+      public abstract Builder setExpectFileToNotExist(boolean value);
+
+      public abstract CreateOptions build();
+    }
+  }
+  /**
+   * Creates an object in GCS and prepares for uploading its contents.
+   *
+   * @param path the GCS file to write to
+   * @param options to be used for creating and configuring file upload
+   * @return a WritableByteChannel that can be used to write data to the 
object.
+   */
+  public WritableByteChannel create(GcsPath path, CreateOptions options) 
throws IOException {
     AsyncWriteChannelOptions wcOptions = 
googleCloudStorageOptions.getWriteChannelOptions();
-    int uploadChunkSize =
-        (uploadBufferSizeBytes == null) ? wcOptions.getUploadChunkSize() : 
uploadBufferSizeBytes;
-    AsyncWriteChannelOptions newOptions =
-        wcOptions.toBuilder().setUploadChunkSize(uploadChunkSize).build();
+    @Nullable
+    Integer uploadBufferSizeBytes =
+        options.getUploadBufferSizeBytes() != null
+            ? options.getUploadBufferSizeBytes()
+            : getUploadBufferSizeBytes();
+    if (uploadBufferSizeBytes != null) {
+      wcOptions = 
wcOptions.toBuilder().setUploadChunkSize(uploadBufferSizeBytes).build();
+    }
     GoogleCloudStorageOptions newGoogleCloudStorageOptions =
-        
googleCloudStorageOptions.toBuilder().setWriteChannelOptions(newOptions).build();
+        
googleCloudStorageOptions.toBuilder().setWriteChannelOptions(wcOptions).build();
     GoogleCloudStorage gcpStorage =
         new GoogleCloudStorageImpl(
             newGoogleCloudStorageOptions, this.storageClient, 
this.credentials);
-    return gcpStorage.create(
-        new StorageResourceId(path.getBucket(), path.getObject()),
-        
CreateObjectOptions.builder().setOverwriteExisting(true).setContentType(type).build());
+    StorageResourceId resourceId =
+        new StorageResourceId(
+            path.getBucket(),
+            path.getObject(),
+            options.getExpectFileToNotExist() ? 0L : 
StorageResourceId.UNKNOWN_GENERATION_ID);

Review comment:
       It's not clear to me from the documentation how changing the generation 
ID to 0L helps.
   
https://www.javadoc.io/doc/com.google.cloud.bigdataoss/gcsio/1.9.1/com/google/cloud/hadoop/gcsio/StorageResourceId.html
   
   Could you clarify here with a comment ?

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
##########
@@ -948,7 +950,12 @@ public final void open(String uId) throws Exception {
           getWriteOperation().getSink().writableByteChannelFactory;
       // The factory may force a MIME type or it may return null, indicating 
to use the sink's MIME.
       String channelMimeType = firstNonNull(factory.getMimeType(), mimeType);
-      WritableByteChannel tempChannel = FileSystems.create(outputFile, 
channelMimeType);
+      CreateOptions createOptions =
+          StandardCreateOptions.builder()
+              .setMimeType(channelMimeType)
+              .setExpectFileToNotExist(true)

Review comment:
       Please explain with a comment why this would be true for all file 
systems.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
##########
@@ -948,7 +950,12 @@ public final void open(String uId) throws Exception {
           getWriteOperation().getSink().writableByteChannelFactory;
       // The factory may force a MIME type or it may return null, indicating 
to use the sink's MIME.
       String channelMimeType = firstNonNull(factory.getMimeType(), mimeType);
-      WritableByteChannel tempChannel = FileSystems.create(outputFile, 
channelMimeType);
+      CreateOptions createOptions =
+          StandardCreateOptions.builder()
+              .setMimeType(channelMimeType)
+              .setExpectFileToNotExist(true)
+              .build();
+      WritableByteChannel tempChannel = FileSystems.create(outputFile, 
createOptions);

Review comment:
       Will this still be safe for bundle retries ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to