This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 72af70d2198 Add a test to read and write GCS with gRPC enabled. (#30352)
72af70d2198 is described below

commit 72af70d219892d635b39fd5da65f4c2522624c35
Author: Shunping Huang <shunp...@google.com>
AuthorDate: Thu Feb 22 10:17:19 2024 -0500

    Add a test to read and write GCS with gRPC enabled. (#30352)
    
    * Add a test to read and write GCS with gRPC enabled.
    
    * Add an empty file to trigger post commit java test
    
    * Make the buckets used in the test configurable.
---
 .../google-cloud-platform-core/build.gradle        |  2 +
 .../beam/sdk/extensions/gcp/util/GcsUtilIT.java    | 87 ++++++++++++++++++++++
 2 files changed, 89 insertions(+)

diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle 
b/sdks/java/extensions/google-cloud-platform-core/build.gradle
index 542c641c855..d4dfd46f745 100644
--- a/sdks/java/extensions/google-cloud-platform-core/build.gradle
+++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle
@@ -66,10 +66,12 @@ task integrationTestKms(type: Test) {
   group = "Verification"
   def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
   def gcpTempRoot = project.findProperty('gcpTempRootKms') ?: 
'gs://temp-storage-for-end-to-end-tests-cmek'
+  def gcpGrpcTempRoot = project.findProperty('gcpGrpcTempRoot') ?: 
'gs://gcs-grpc-team-apache-beam-testing'
   def dataflowKmsKey = project.findProperty('dataflowKmsKey') ?: 
"projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test"
   systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
           "--project=${gcpProject}",
           "--tempRoot=${gcpTempRoot}",
+          "--grpcTempRoot=${gcpGrpcTempRoot}",
           "--dataflowKmsKey=${dataflowKmsKey}",
   ])
 
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java
index bc9853beca4..6f1e0e985c2 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java
@@ -19,17 +19,30 @@ package org.apache.beam.sdk.extensions.gcp.util;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 
+import com.google.protobuf.ByteString;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collections;
 import java.util.Date;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.CreateOptions;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.testing.UsesKms;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -76,4 +89,78 @@ public class GcsUtilIT {
 
     gcsUtil.remove(Lists.newArrayList(dstFilename));
   }
+
+  // TODO: once the gRPC feature is in public GA, we will have to refactor this test.
+  // As gRPC will be automatically enabled in each bucket by then, we will no longer need to check
+  // the failed case. The interface of GcsGrpcOptions can also be removed.
+  /**
+   * Writes a small text object through {@link GcsUtil} with the {@code use_grpc_for_gcs}
+   * experiment set, reads it back, and checks that the content round-trips unchanged.
+   *
+   * <p>The write is first attempted under {@code tempRoot} and is expected to fail with an {@link
+   * IOException}; it is then repeated under {@code grpcTempRoot}, where it is expected to succeed.
+   */
+  @Test
+  public void testWriteAndReadGcsWithGrpc() throws IOException {
+    final String outputPattern =
+        "%s/GcsUtilIT-%tF-%<tH-%<tM-%<tS-%<tL.testWriteAndReadGcsWithGrpc.txt";
+    final String testContent = "This is a test string.";
+
+    // Register the test-only options interface before the testing options are created; the build
+    // passes --grpcTempRoot through beamTestPipelineOptions.
+    PipelineOptionsFactory.register(GcsGrpcOptions.class);
+
+    TestPipelineOptions options =
+        TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+
+    // set the experimental flag to enable grpc
+    ExperimentalOptions experimental = options.as(ExperimentalOptions.class);
+    experimental.setExperiments(Collections.singletonList("use_grpc_for_gcs"));
+
+    GcsOptions gcsOptions = options.as(GcsOptions.class);
+    GcsUtil gcsUtil = gcsOptions.getGcsUtil();
+    assertNotNull(gcsUtil);
+
+    // Write a test file in a bucket without gRPC enabled.
+    // This assumes that GCS gRPC feature is not enabled in every bucket by default.
+    assertNotNull(options.getTempRoot());
+    String tempLocationWithoutGrpc = options.getTempRoot() + "/temp";
+    String wrongFilename = String.format(outputPattern, tempLocationWithoutGrpc, new Date());
+    assertThrows(IOException.class, () -> writeGcsTextFile(gcsUtil, wrongFilename, testContent));
+
+    // Write a test file in a bucket with gRPC enabled.
+    GcsGrpcOptions grpcOptions = options.as(GcsGrpcOptions.class);
+    assertNotNull(grpcOptions.getGrpcTempRoot());
+    String tempLocationWithGrpc = grpcOptions.getGrpcTempRoot() + "/temp";
+    String filename = String.format(outputPattern, tempLocationWithGrpc, new Date());
+    writeGcsTextFile(gcsUtil, filename, testContent);
+
+    try {
+      // Read the test file back and verify
+      assertEquals(testContent, readGcsTextFile(gcsUtil, filename));
+    } finally {
+      // Delete the object even when the read or the assertion fails, so a failing run does not
+      // leave test files behind in the bucket.
+      gcsUtil.remove(Collections.singletonList(filename));
+    }
+  }
+
+  /**
+   * Pipeline options used only by this integration test to carry the location of a bucket in
+   * which the GCS gRPC feature is enabled.
+   */
+  public interface GcsGrpcOptions extends PipelineOptions {
+    /** Returns the temp root located in a gRPC-enabled bucket. */
+    @Description("TempRoot in a gRPC-enabled bucket")
+    String getGrpcTempRoot();
+
+    /** Sets the temp root located in a gRPC-enabled bucket. */
+    void setGrpcTempRoot(String grpcTempRoot);
+  }
+
+  /**
+   * Creates the GCS object named by {@code filename} with content type {@code
+   * text/plain;charset=utf-8} and writes {@code content} to it encoded as UTF-8.
+   *
+   * @param gcsUtil the utility used to create the object
+   * @param filename the {@code gs://} URI of the object to create
+   * @param content the text to store
+   * @throws IOException if the object cannot be created, written, or closed
+   */
+  void writeGcsTextFile(GcsUtil gcsUtil, String filename, String content) throws IOException {
+    GcsPath gcsPath = GcsPath.fromUri(filename);
+    ByteBuffer buffer = ByteString.copyFromUtf8(content).asReadOnlyByteBuffer();
+    try (WritableByteChannel channel =
+        gcsUtil.create(
+            gcsPath, CreateOptions.builder().setContentType("text/plain;charset=utf-8").build())) {
+      // A single write() call may consume only part of the buffer, so keep writing until the
+      // whole payload has been handed to the channel.
+      while (buffer.hasRemaining()) {
+        channel.write(buffer);
+      }
+    }
+  }
+
+  /**
+   * Reads the whole GCS object named by {@code filename} and decodes its bytes as UTF-8 text.
+   *
+   * @param gcsUtil the utility used to open the object
+   * @param filename the {@code gs://} URI of the object to read
+   * @return the object's content as a string
+   * @throws IOException if the object cannot be opened, read, or closed
+   */
+  String readGcsTextFile(GcsUtil gcsUtil, String filename) throws IOException {
+    GcsPath objectPath = GcsPath.fromUri(filename);
+    try (ByteStringOutputStream collected = new ByteStringOutputStream()) {
+      // The channel is closed before the collected bytes are converted to a string.
+      try (ReadableByteChannel source = gcsUtil.open(objectPath)) {
+        ByteBuffer chunk = ByteBuffer.allocate(16);
+        while (source.read(chunk) != -1) {
+          // The buffer is cleared after every pass, so its position is the number of bytes that
+          // the latest read placed at the start of the backing array.
+          collected.write(chunk.array(), 0, chunk.position());
+          chunk.clear();
+        }
+      }
+      return collected.toByteString().toStringUtf8();
+    }
+  }
 }

Reply via email to