This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 21d9f6d Merge pull request #16347: fix: move connector to use v1 BigQuery Storage Write API 21d9f6d is described below commit 21d9f6d4ebf3e1c8b80c249fcb32e3e820eeb12b Author: Yiru Tang <y...@google.com> AuthorDate: Wed Jan 5 16:21:23 2022 -0800 Merge pull request #16347: fix: move connector to use v1 BigQuery Storage Write API --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 1 - sdks/java/io/google-cloud-platform/build.gradle | 1 - .../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 12 ++++---- .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 35 ++++++++++++---------- .../sdk/io/gcp/bigquery/SplittingIterable.java | 2 +- .../gcp/bigquery/StorageApiFinalizeWritesDoFn.java | 8 ++--- .../bigquery/StorageApiFlushAndFinalizeDoFn.java | 4 +-- .../bigquery/StorageApiWriteUnshardedRecords.java | 6 ++-- .../bigquery/StorageApiWritesShardedRecords.java | 6 ++-- .../sdk/io/gcp/testing/FakeDatasetService.java | 14 ++++----- .../apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java | 1 - 11 files changed, 45 insertions(+), 45 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 3479446..6538958 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -646,7 +646,6 @@ class BeamModulePlugin implements Plugin<Project> { protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version", proto_google_cloud_bigquery_storage_v1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_bigtable_admin_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-admin-v2", // google_cloud_platform_libraries_bom sets version - proto_google_cloud_bigquery_storage_v1beta2 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2", // google_cloud_platform_libraries_bom sets version proto_google_cloud_bigtable_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-v2", // google_cloud_platform_libraries_bom sets version proto_google_cloud_datacatalog_v1beta1 : "com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1", // google_cloud_platform_libraries_bom sets version diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 6d7d911..efe4fb9 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -111,7 +111,6 @@ dependencies { implementation library.java.netty_tcnative_boringssl_static permitUnusedDeclared library.java.netty_tcnative_boringssl_static // BEAM-11761 implementation library.java.proto_google_cloud_bigquery_storage_v1 - implementation library.java.proto_google_cloud_bigquery_storage_v1beta2 implementation library.java.proto_google_cloud_bigtable_admin_v2 implementation library.java.proto_google_cloud_bigtable_v2 implementation library.java.proto_google_cloud_datastore_v1 diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 29d5c00..927b7b3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -29,18 +29,18 @@ import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse; import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; +import com.google.cloud.bigquery.storage.v1.FlushRowsResponse; +import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; import com.google.cloud.bigquery.storage.v1.ReadSession; import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse; -import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; -import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse; -import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse; -import com.google.cloud.bigquery.storage.v1beta2.FlushRowsResponse; -import com.google.cloud.bigquery.storage.v1beta2.ProtoRows; -import com.google.cloud.bigquery.storage.v1beta2.WriteStream; +import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.protobuf.Descriptors.Descriptor; import java.io.IOException; import java.io.Serializable; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 5a270d4..f607598 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -58,28 +58,28 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.auth.Credentials; import com.google.auth.http.HttpCredentialsAdapter; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryReadClient; import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; +import com.google.cloud.bigquery.storage.v1.FlushRowsRequest; +import com.google.cloud.bigquery.storage.v1.FlushRowsResponse; +import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.cloud.bigquery.storage.v1.ProtoSchema; import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; import com.google.cloud.bigquery.storage.v1.ReadSession; import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse; -import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; -import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsRequest; -import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse; -import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient; -import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteSettings; -import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest; -import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamRequest; -import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse; -import com.google.cloud.bigquery.storage.v1beta2.FlushRowsRequest; -import com.google.cloud.bigquery.storage.v1beta2.FlushRowsResponse; -import com.google.cloud.bigquery.storage.v1beta2.ProtoRows; -import com.google.cloud.bigquery.storage.v1beta2.ProtoSchema; -import com.google.cloud.bigquery.storage.v1beta2.StreamWriterV2; -import com.google.cloud.bigquery.storage.v1beta2.WriteStream; +import com.google.cloud.bigquery.storage.v1.StreamWriter; +import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; import com.google.protobuf.Descriptors.Descriptor; @@ -1194,8 +1194,11 @@ class BigQueryServicesImpl implements BigQueryServices { throws Exception { ProtoSchema protoSchema = ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build(); - StreamWriterV2 streamWriter = - StreamWriterV2.newBuilder(streamName).setWriterSchema(protoSchema).build(); + StreamWriter streamWriter = + StreamWriter.newBuilder(streamName) + .setWriterSchema(protoSchema) + .setTraceId("Dataflow") + .build(); return new StreamAppendClient() { private int pins = 0; private boolean closed = false; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java index cf367f3..7152284 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import com.google.cloud.bigquery.storage.v1beta2.ProtoRows; +import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.protobuf.ByteString; import java.util.Iterator; import java.util.NoSuchElementException; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java index 7dd3485..2744603 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java @@ -17,10 +17,10 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse; -import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse; -import com.google.cloud.bigquery.storage.v1beta2.StorageError; -import com.google.cloud.bigquery.storage.v1beta2.StorageError.StorageErrorCode; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; +import com.google.cloud.bigquery.storage.v1.StorageError; +import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode; import java.io.IOException; import java.util.Collection; import java.util.Map; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java index e614ede..1c3686a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java @@ -19,8 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; -import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse; -import com.google.cloud.bigquery.storage.v1beta2.FlushRowsResponse; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; +import com.google.cloud.bigquery.storage.v1.FlushRowsResponse; import java.io.IOException; import java.io.Serializable; import java.time.Instant; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 21c7a46..a157236 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -20,9 +20,9 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import com.google.api.services.bigquery.model.TableSchema; -import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; -import com.google.cloud.bigquery.storage.v1beta2.ProtoRows; -import com.google.cloud.bigquery.storage.v1beta2.WriteStream.Type; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.cloud.bigquery.storage.v1.WriteStream.Type; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.List; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 3d6e33c..15982a3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -21,9 +21,9 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import com.google.api.core.ApiFuture; import com.google.api.services.bigquery.model.TableSchema; -import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; -import com.google.cloud.bigquery.storage.v1beta2.ProtoRows; -import com.google.cloud.bigquery.storage.v1beta2.WriteStream.Type; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.cloud.bigquery.storage.v1.WriteStream.Type; import com.google.protobuf.Descriptors.Descriptor; import io.grpc.Status; import io.grpc.Status.Code; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index 239c2ad..7701874 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -29,13 +29,13 @@ import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableDataInsertAllResponse; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; -import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse; -import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse; -import com.google.cloud.bigquery.storage.v1beta2.FlushRowsResponse; -import com.google.cloud.bigquery.storage.v1beta2.ProtoRows; -import com.google.cloud.bigquery.storage.v1beta2.WriteStream; -import com.google.cloud.bigquery.storage.v1beta2.WriteStream.Type; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; +import com.google.cloud.bigquery.storage.v1.FlushRowsResponse; +import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.cloud.bigquery.storage.v1.WriteStream.Type; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; import com.google.protobuf.ByteString; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java index 118210e..6e89115 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java @@ -69,7 +69,6 @@ public class GcpApiSurfaceTest { classesInPackage("com.google.cloud"), classesInPackage("com.google.common.collect"), classesInPackage("com.google.cloud.bigquery.storage.v1"), - classesInPackage("com.google.cloud.bigquery.storage.v1beta2"), classesInPackage("com.google.cloud.bigtable.config"), classesInPackage("com.google.iam.v1"), classesInPackage("com.google.spanner.v1"),