This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 72af70d2198 Add a test to read and write GCS with gRPC enabled. (#30352) 72af70d2198 is described below commit 72af70d219892d635b39fd5da65f4c2522624c35 Author: Shunping Huang <shunp...@google.com> AuthorDate: Thu Feb 22 10:17:19 2024 -0500 Add a test to read and write GCS with gRPC enabled. (#30352) * Add a test to read and write GCS with gRPC enabled. * Add an empty file to trigger post commit java test * Make the buckets used in the test configurable. --- .../google-cloud-platform-core/build.gradle | 2 + .../beam/sdk/extensions/gcp/util/GcsUtilIT.java | 87 ++++++++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/sdks/java/extensions/google-cloud-platform-core/build.gradle b/sdks/java/extensions/google-cloud-platform-core/build.gradle index 542c641c855..d4dfd46f745 100644 --- a/sdks/java/extensions/google-cloud-platform-core/build.gradle +++ b/sdks/java/extensions/google-cloud-platform-core/build.gradle @@ -66,10 +66,12 @@ task integrationTestKms(type: Test) { group = "Verification" def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' def gcpTempRoot = project.findProperty('gcpTempRootKms') ?: 'gs://temp-storage-for-end-to-end-tests-cmek' + def gcpGrpcTempRoot = project.findProperty('gcpGrpcTempRoot') ?: 'gs://gcs-grpc-team-apache-beam-testing' def dataflowKmsKey = project.findProperty('dataflowKmsKey') ?: "projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test" systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ "--project=${gcpProject}", "--tempRoot=${gcpTempRoot}", + "--grpcTempRoot=${gcpGrpcTempRoot}", "--dataflowKmsKey=${dataflowKmsKey}", ]) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java index bc9853beca4..6f1e0e985c2 
100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilIT.java @@ -19,17 +19,30 @@ package org.apache.beam.sdk.extensions.gcp.util; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import com.google.protobuf.ByteString; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Collections; import java.util.Date; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.CreateOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.testing.UsesKms; +import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -76,4 +89,78 @@ public class GcsUtilIT { gcsUtil.remove(Lists.newArrayList(dstFilename)); } + + // TODO: once the gRPC feature is in public GA, we will have to refactor this test. + // As gRPC will be automatically enabled in each bucket by then, we will no longer need to check + // the failed case. 
The interface of GcsGrpcOptions can also be removed. + @Test + public void testWriteAndReadGcsWithGrpc() throws IOException { + final String outputPattern = + "%s/GcsUtilIT-%tF-%<tH-%<tM-%<tS-%<tL.testWriteAndReadGcsWithGrpc.txt"; + final String testContent = "This is a test string."; + + PipelineOptionsFactory.register(GcsGrpcOptions.class); + + TestPipelineOptions options = + TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + + // set the experimental flag to enable grpc + ExperimentalOptions experimental = options.as(ExperimentalOptions.class); + experimental.setExperiments(Collections.singletonList("use_grpc_for_gcs")); + + GcsOptions gcsOptions = options.as(GcsOptions.class); + GcsUtil gcsUtil = gcsOptions.getGcsUtil(); + assertNotNull(gcsUtil); + + // Write a test file in a bucket without gRPC enabled. + // This assumes that GCS gRPC feature is not enabled in every bucket by default. + assertNotNull(options.getTempRoot()); + String tempLocationWithoutGrpc = options.getTempRoot() + "/temp"; + String wrongFilename = String.format(outputPattern, tempLocationWithoutGrpc, new Date()); + assertThrows(IOException.class, () -> writeGcsTextFile(gcsUtil, wrongFilename, testContent)); + + // Write a test file in a bucket with gRPC enabled. + GcsGrpcOptions grpcOptions = options.as(GcsGrpcOptions.class); + assertNotNull(grpcOptions.getGrpcTempRoot()); + String tempLocationWithGrpc = grpcOptions.getGrpcTempRoot() + "/temp"; + String filename = String.format(outputPattern, tempLocationWithGrpc, new Date()); + writeGcsTextFile(gcsUtil, filename, testContent); + + // Read the test file back and verify + assertEquals(testContent, readGcsTextFile(gcsUtil, filename)); + + gcsUtil.remove(Collections.singletonList(filename)); + } + + public interface GcsGrpcOptions extends PipelineOptions { + /** Get tempRoot in a gRPC-enabled bucket. 
*/ + @Description("TempRoot in a gRPC-enabled bucket") + String getGrpcTempRoot(); + + /** Set the tempRoot in a gRPC-enabled bucket. */ + void setGrpcTempRoot(String grpcTempRoot); + } + + void writeGcsTextFile(GcsUtil gcsUtil, String filename, String content) throws IOException { + GcsPath gcsPath = GcsPath.fromUri(filename); + try (WritableByteChannel channel = + gcsUtil.create( + gcsPath, CreateOptions.builder().setContentType("text/plain;charset=utf-8").build())) { + channel.write(ByteString.copyFromUtf8(content).asReadOnlyByteBuffer()); + } + } + + String readGcsTextFile(GcsUtil gcsUtil, String filename) throws IOException { + GcsPath gcsPath = GcsPath.fromUri(filename); + try (ByteStringOutputStream output = new ByteStringOutputStream()) { + try (ReadableByteChannel channel = gcsUtil.open(gcsPath)) { + ByteBuffer bb = ByteBuffer.allocate(16); + while (channel.read(bb) != -1) { + output.write(bb.array(), 0, bb.capacity() - bb.remaining()); + bb.clear(); + } + } + return output.toByteString().toStringUtf8(); + } + } }