This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7235088c490 GCS client library migration in Java SDK - part 3 (#37900)
7235088c490 is described below

commit 7235088c490112f3bf1f38da770cc7338450b8e9
Author: Shunping Huang <[email protected]>
AuthorDate: Wed May 6 12:33:52 2026 -0400

    GCS client library migration in Java SDK - part 3 (#37900)
    
    * Implement open method for gcsutilv2. Add an integration test.
    
    * Implement create method and add an integration test.
    
    * Store the gcs path into GcsWritableByteChannel.
    
    * Rename the new create method to createV2.
    
    * Revise according to reviewer comments.
---
 .../beam/sdk/extensions/gcp/util/GcsUtil.java      |  23 +++
 .../beam/sdk/extensions/gcp/util/GcsUtilV2.java    | 163 ++++++++++++++++++++-
 .../gcp/util/GcsUtilParameterizedIT.java           | 116 +++++++++++++--
 3 files changed, 291 insertions(+), 11 deletions(-)

diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index e3f01dd8529..ed727d495cf 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -25,6 +25,8 @@ import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BucketInfo;
 import com.google.cloud.storage.Storage.BlobGetOption;
 import com.google.cloud.storage.Storage.BlobListOption;
+import com.google.cloud.storage.Storage.BlobSourceOption;
+import com.google.cloud.storage.Storage.BlobWriteOption;
 import com.google.cloud.storage.Storage.BucketGetOption;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
@@ -186,9 +188,19 @@ public class GcsUtil {
   }
 
   public SeekableByteChannel open(GcsPath path) throws IOException {
+    // Route to the GcsUtilV2 delegate when it has been initialized (non-null); otherwise keep
+    // using the original `delegate`.
+    if (delegateV2 != null) {
+      return delegateV2.open(path);
+    }
     return delegate.open(path);
   }
 
+  /**
+   * Opens {@code path} for reading through the GcsUtilV2 delegate, forwarding {@code options}.
+   *
+   * @throws IOException if the V2 delegate has not been initialized, or if opening fails
+   */
+  public SeekableByteChannel openV2(GcsPath path, BlobSourceOption... options) throws IOException {
+    if (delegateV2 == null) {
+      throw new IOException("GcsUtil V2 not initialized.");
+    }
+    return delegateV2.open(path, options);
+  }
+
   /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
   @Deprecated
   public WritableByteChannel create(GcsPath path, String type) throws 
IOException {
@@ -254,9 +266,20 @@ public class GcsUtil {
   }
 
   public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException {
+    // Route to the GcsUtilV2 delegate when it is initialized, mirroring open(). The V2 channel
+    // must be returned: discarding it would leave an unclosed V2 WriteChannel behind (after any
+    // metadata lookup it performed) and then create the object a second time through the V1
+    // delegate.
+    if (delegateV2 != null) {
+      return delegateV2.create(path, options.delegate);
+    }
     return delegate.create(path, options.delegate);
   }
 
+  /**
+   * Creates {@code path} through the GcsUtilV2 delegate. {@code writeOptions} are forwarded to
+   * the writer together with the preconditions the delegate derives from {@code options}.
+   *
+   * @throws IOException if the V2 delegate has not been initialized, or if creation fails
+   */
+  public WritableByteChannel createV2(
+      GcsPath path, CreateOptions options, BlobWriteOption... writeOptions) throws IOException {
+    if (delegateV2 == null) {
+      throw new IOException("GcsUtil V2 not initialized.");
+    }
+    return delegateV2.create(path, options.delegate, writeOptions);
+  }
+
   public void verifyBucketAccessible(GcsPath path) throws IOException {
     if (delegateV2 != null) {
       delegateV2.verifyBucketAccessible(path);
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
index b00b7ce0d72..9119dd79652 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java
@@ -23,6 +23,8 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 
 import com.google.api.gax.paging.Page;
 import com.google.auto.value.AutoValue;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.WriteChannel;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
@@ -33,21 +35,29 @@ import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.Storage.BlobField;
 import com.google.cloud.storage.Storage.BlobGetOption;
 import com.google.cloud.storage.Storage.BlobListOption;
+import com.google.cloud.storage.Storage.BlobSourceOption;
+import com.google.cloud.storage.Storage.BlobWriteOption;
 import com.google.cloud.storage.Storage.BucketField;
 import com.google.cloud.storage.Storage.BucketGetOption;
 import com.google.cloud.storage.Storage.CopyRequest;
 import com.google.cloud.storage.StorageBatch;
 import com.google.cloud.storage.StorageBatchResult;
+import com.google.cloud.storage.StorageChannelUtils;
 import com.google.cloud.storage.StorageException;
 import com.google.cloud.storage.StorageOptions;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.FileAlreadyExistsException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -70,6 +80,8 @@ class GcsUtilV2 {
 
   private Storage storage;
 
+  // Default upload chunk size read from GcsOptions in the constructor. create() falls back to it
+  // when the per-call CreateOptions leave the buffer size unset; when both are null, no chunk
+  // size is set on the WriteChannel.
+  private final @Nullable Integer uploadBufferSizeBytes;
+
   /** Maximum number of items to retrieve per Objects.List request. */
   private static final long MAX_LIST_BLOBS_PER_CALL = 1024;
 
@@ -85,13 +97,14 @@ class GcsUtilV2 {
   GcsUtilV2(PipelineOptions options) {
+    // Build the google-cloud-storage client bound to the pipeline's GCP project.
     String projectId = options.as(GcpOptions.class).getProject();
     storage = 
StorageOptions.newBuilder().setProjectId(projectId).build().getService();
+    // Default upload chunk size for create(); may be null, in which case create() only applies a
+    // chunk size when the per-call CreateOptions provide one.
+    uploadBufferSizeBytes = 
options.as(GcsOptions.class).getGcsUploadBufferSizeBytes();
   }
 
   @SuppressWarnings({
     "nullness" // For Creating AccessDeniedException FileNotFoundException, and
     // FileAlreadyExistsException with null.
   })
-  private IOException translateStorageException(GcsPath gcsPath, 
StorageException e) {
+  private static IOException translateStorageException(GcsPath gcsPath, 
StorageException e) {
     switch (e.getCode()) {
       case 403:
         return new AccessDeniedException(gcsPath.toString(), null, 
e.getMessage());
@@ -481,4 +494,152 @@ class GcsUtilV2 {
       throw translateStorageException(bucketInfo.getName(), null, e);
     }
   }
+
+  /**
+   * Adapts a google-cloud-storage {@link ReadChannel} to a read-only {@link SeekableByteChannel}.
+   *
+   * <p>{@link #size()} reports the object size captured when the channel was opened, and {@link
+   * #position()} is tracked locally from reads and seeks. {@link StorageException}s thrown while
+   * reading are translated into {@link IOException}s for {@code gcsPath}, the same way {@code
+   * GcsWritableByteChannel} translates write failures.
+   */
+  private static class GcsSeekableByteChannel implements SeekableByteChannel {
+    private final ReadChannel reader;
+    private final GcsPath gcsPath;
+    private final long size;
+    private long position = 0;
+
+    GcsSeekableByteChannel(ReadChannel reader, GcsPath gcsPath, long size) {
+      this.reader = reader;
+      this.gcsPath = gcsPath;
+      this.size = size;
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+      int count;
+      try {
+        count = StorageChannelUtils.blockingFillFrom(dst, reader);
+      } catch (StorageException e) {
+        throw translateStorageException(gcsPath, e);
+      }
+      // EOF (-1) and empty reads leave the tracked position unchanged.
+      if (count > 0) {
+        this.position += count;
+      }
+      return count;
+    }
+
+    @Override
+    public SeekableByteChannel position(long newPosition) throws IOException {
+      checkArgument(newPosition >= 0, "Position must be non-negative: %s", newPosition);
+      reader.seek(newPosition);
+      this.position = newPosition;
+      return this;
+    }
+
+    @Override
+    public long position() throws IOException {
+      return this.position;
+    }
+
+    @Override
+    public long size() throws IOException {
+      return size;
+    }
+
+    @Override
+    public SeekableByteChannel truncate(long size) throws IOException {
+      throw new UnsupportedOperationException(
+          "GcsSeekableByteChannels are read-only and cannot be truncated.");
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      throw new UnsupportedOperationException(
+          "GcsSeekableByteChannel are read-only and does not support writing.");
+    }
+
+    @Override
+    public boolean isOpen() {
+      return reader.isOpen();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (isOpen()) {
+        reader.close();
+      }
+    }
+  }
+
+  /**
+   * Opens {@code path} for reading.
+   *
+   * <p>Only the SIZE field of the object metadata is fetched up front, so that {@link
+   * SeekableByteChannel#size()} can be answered locally.
+   *
+   * @param path the object to read
+   * @param sourceOptions options forwarded to {@link Storage#reader}
+   * @return a read-only seekable channel positioned at offset 0
+   * @throws IOException if the object metadata cannot be fetched; later read failures surface
+   *     from the returned channel as translated {@link IOException}s
+   */
+  public SeekableByteChannel open(GcsPath path, BlobSourceOption... sourceOptions)
+      throws IOException {
+    Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE));
+    ReadChannel reader = blob.getStorage().reader(blob.getBlobId(), sourceOptions);
+    // Disable the ReadChannel's internal buffering. NOTE(review): an earlier comment also said
+    // this makes the channel non-blocking; read() goes through blockingFillFrom, so confirm.
+    reader.setChunkSize(0);
+    return new GcsSeekableByteChannel(reader, path, blob.getSize());
+  }
+
+  /**
+   * Adapts a google-cloud-storage {@link WriteChannel} to a {@link WritableByteChannel}. Any
+   * {@link StorageException} from writing or closing is translated into an {@link IOException}
+   * for {@code gcsPath}.
+   */
+  private static class GcsWritableByteChannel implements WritableByteChannel {
+    private final WriteChannel writer;
+    private final GcsPath gcsPath;
+
+    GcsWritableByteChannel(WriteChannel writer, GcsPath gcsPath) {
+      this.writer = writer;
+      this.gcsPath = gcsPath;
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      try {
+        return writer.write(src);
+      } catch (StorageException e) {
+        throw translateStorageException(gcsPath, e);
+      }
+    }
+
+    @Override
+    public boolean isOpen() {
+      return writer.isOpen();
+    }
+
+    /**
+     * Closes the underlying writer, translating failures the same way as {@link #write}.
+     * NOTE(review): the upload is presumably finalized here, so service-side rejections (e.g.
+     * the doesNotExist / generationMatch preconditions added by create()) may surface from this
+     * call rather than from write().
+     */
+    @Override
+    public void close() throws IOException {
+      try {
+        writer.close();
+      } catch (StorageException e) {
+        throw translateStorageException(gcsPath, e);
+      }
+    }
+  }
+
+  /**
+   * Creates (or overwrites) {@code path} and returns a channel that uploads the bytes written to
+   * it.
+   *
+   * <p>Every upload carries a generation precondition. It is {@code doesNotExist()} when {@code
+   * options} expects the object to be absent or no object currently exists. Otherwise it is
+   * {@code generationMatch} on the current generation. NOTE(review): presumably this keeps the
+   * upload idempotent so the client may retry it; confirm.
+   *
+   * @param path destination object
+   * @param options content type, upload buffer size and existence expectation
+   * @param writeOptions extra options passed to {@link Storage#writer}, ahead of the precondition
+   * @return a writable channel for the new object
+   * @throws IOException translated from any {@link StorageException} raised while probing the
+   *     existing object or opening the writer
+   */
+  public WritableByteChannel create(
+      GcsPath path, GcsUtilV1.CreateOptions options, BlobWriteOption... writeOptions)
+      throws IOException {
+    try {
+      // Define the metadata for the new object.
+      BlobInfo.Builder builder = BlobInfo.newBuilder(path.getBucket(), path.getObject());
+      String type = options.getContentType();
+      if (type != null) {
+        builder.setContentType(type);
+      }
+      BlobInfo blobInfo = builder.build();
+
+      List<BlobWriteOption> writeOptionList = new ArrayList<>(Arrays.asList(writeOptions));
+      if (options.getExpectFileToNotExist()) {
+        writeOptionList.add(BlobWriteOption.doesNotExist());
+      } else {
+        // Kept separate from the branch above so the storage.get() RPC is only made when needed.
+        // Only the generation is requested, since it is the sole field the precondition uses.
+        Blob existing =
+            storage.get(
+                path.getBucket(), path.getObject(), BlobGetOption.fields(BlobField.GENERATION));
+        if (existing == null) {
+          writeOptionList.add(BlobWriteOption.doesNotExist());
+        } else {
+          writeOptionList.add(BlobWriteOption.generationMatch(existing.getGeneration()));
+        }
+      }
+
+      WriteChannel writer =
+          storage.writer(blobInfo, writeOptionList.toArray(new BlobWriteOption[0]));
+
+      // A per-call buffer size wins over the GcsOptions default captured in the constructor.
+      Integer chunkSize =
+          options.getUploadBufferSizeBytes() != null
+              ? options.getUploadBufferSizeBytes()
+              : this.uploadBufferSizeBytes;
+      if (chunkSize != null) {
+        writer.setChunkSize(chunkSize);
+      }
+
+      return new GcsWritableByteChannel(writer, path);
+    } catch (StorageException e) {
+      throw translateStorageException(path, e);
+    }
+  }
 }
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
index 80ffd72924f..5759bb10a65 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.gcp.util;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
@@ -28,15 +29,25 @@ import com.google.api.services.storage.model.Objects;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.StorageChannelUtils;
+import java.io.ByteArrayOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.FileAlreadyExistsException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.CreateOptions;
 import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy;
 import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
@@ -301,7 +312,8 @@ public class GcsUtilParameterizedIT {
     }
   }
 
-  private List<GcsPath> createTestBucketHelper(String bucketName) throws 
IOException {
+  private List<GcsPath> createTestBucketHelper(String bucketName, boolean 
copyData)
+      throws IOException {
     final List<GcsPath> originPaths =
         Arrays.asList(
             
GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"),
@@ -316,16 +328,24 @@ public class GcsUtilParameterizedIT {
     if (experiment.equals("use_gcsutil_v2")) {
       gcsUtil.createBucket(BucketInfo.of(bucketName));
 
-      gcsUtil.copyV2(originPaths, testPaths);
+      if (copyData) {
+        gcsUtil.copyV2(originPaths, testPaths);
+      } else {
+        return Collections.emptyList();
+      }
     } else {
       GcsOptions gcsOptions = options.as(GcsOptions.class);
       gcsUtil.createBucket(gcsOptions.getProject(), new 
Bucket().setName(bucketName));
 
-      final List<String> originList =
-          originPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
-      final List<String> testList =
-          testPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
-      gcsUtil.copy(originList, testList);
+      if (copyData) {
+        final List<String> originList =
+            originPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+        final List<String> testList =
+            testPaths.stream().map(o -> 
o.toString()).collect(Collectors.toList());
+        gcsUtil.copy(originList, testList);
+      } else {
+        return Collections.emptyList();
+      }
     }
 
     return testPaths;
@@ -355,7 +375,7 @@ public class GcsUtilParameterizedIT {
     final String nonExistentBucket = "my-random-test-bucket-12345";
 
     try {
-      final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+      final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket, 
true);
       final List<GcsPath> dstPaths =
           srcPaths.stream()
               .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() + 
".bak"))
@@ -423,7 +443,7 @@ public class GcsUtilParameterizedIT {
     final String nonExistentBucket = "my-random-test-bucket-12345";
 
     try {
-      final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+      final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket, 
true);
       final List<GcsPath> errPaths =
           srcPaths.stream()
               .map(o -> GcsPath.fromComponents(nonExistentBucket, 
o.getObject()))
@@ -485,7 +505,7 @@ public class GcsUtilParameterizedIT {
     final String nonExistentBucket = "my-random-test-bucket-12345";
 
     try {
-      final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket);
+      final List<GcsPath> srcPaths = createTestBucketHelper(existingBucket, 
true);
       final List<GcsPath> tmpPaths =
           srcPaths.stream()
               .map(o -> GcsPath.fromComponents(existingBucket, "tmp/" + 
o.getObject()))
@@ -587,4 +607,80 @@ public class GcsUtilParameterizedIT {
       assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path));
     }
   }
+
+  /** Returns the lowercase hex SHA-256 digest of the remaining bytes of {@code buffer}. */
+  String computeHash(ByteBuffer buffer) throws NoSuchAlgorithmException {
+    MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
+    // update() consumes the buffer: on return its position equals its limit.
+    sha256.update(buffer);
+
+    StringBuilder hex = new StringBuilder();
+    for (byte value : sha256.digest()) {
+      hex.append(Character.forDigit((value >> 4) & 0xF, 16));
+      hex.append(Character.forDigit(value & 0xF, 16));
+    }
+    return hex.toString();
+  }
+
+  @Test
+  public void testRead() throws IOException, NoSuchAlgorithmException {
+    final GcsPath kingLear = GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kinglear.txt");
+    final long expectedSize = 157283L;
+    final String expectedHash = "674a2725884307c96398440497c889ad8cecccedf5689df85e6b0faabe4e0fe8";
+
+    try (SeekableByteChannel channel = gcsUtil.open(kingLear)) {
+      // A freshly opened channel reports the object size and starts at offset 0.
+      assertEquals(expectedSize, channel.size());
+      assertEquals(0, channel.position());
+
+      // Over-allocate so the fill stops at EOF rather than at the end of the buffer.
+      ByteBuffer contents = ByteBuffer.allocate((int) expectedSize + 1024);
+      int bytesRead = StorageChannelUtils.blockingFillFrom(contents, channel);
+      assertEquals(expectedSize, bytesRead);
+      assertEquals(expectedSize, channel.position());
+
+      // flip() exposes exactly the bytes that were read (position 0 .. previous position).
+      contents.flip();
+      assertEquals("Content hash should match", expectedHash, computeHash(contents));
+    }
+  }
+
+  @Test
+  public void testWriteAndRead() throws IOException {
+    final String bucketName = "apache-beam-temp-bucket-12345";
+    final GcsPath targetPath =
+        GcsPath.fromComponents(bucketName, "test-object-" + java.util.UUID.randomUUID() + ".txt");
+    final byte[] content = "Hello, GCS!".getBytes(StandardCharsets.UTF_8);
+
+    try {
+      createTestBucketHelper(bucketName, false);
+
+      // Write content to a GCS file. WritableByteChannel.write may consume only part of the
+      // buffer, so keep writing until it is drained.
+      CreateOptions options = CreateOptions.builder().setExpectFileToNotExist(true).build();
+      try (WritableByteChannel writer = gcsUtil.create(targetPath, options)) {
+        ByteBuffer src = ByteBuffer.wrap(content);
+        while (src.hasRemaining()) {
+          writer.write(src);
+        }
+      }
+
+      // Read the object back until EOF.
+      ByteArrayOutputStream readContent = new ByteArrayOutputStream();
+      try (ReadableByteChannel reader = gcsUtil.open(targetPath)) {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        while (reader.read(buffer) != -1) {
+          buffer.flip();
+          readContent.write(buffer.array(), 0, buffer.limit());
+          buffer.clear();
+        }
+      }
+
+      // Verify content
+      assertArrayEquals(content, readContent.toByteArray());
+    } finally {
+      tearDownTestBucketHelper(bucketName);
+    }
+  }
 }

Reply via email to