This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 171d3e3d9d7 Merge pull request #23510: Vortex multiplexing streams 171d3e3d9d7 is described below commit 171d3e3d9d700aa90c498719a0a925916c1cb56e Author: Reuven Lax <re...@google.com> AuthorDate: Tue Oct 18 21:12:15 2022 -0700 Merge pull request #23510: Vortex multiplexing streams --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 6 ++-- .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 5 +++ .../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 4 +-- .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 5 +-- .../bigquery/StorageApiWriteUnshardedRecords.java | 36 ++++++++++++++++------ .../bigquery/StorageApiWritesShardedRecords.java | 4 +-- .../sdk/io/gcp/testing/FakeDatasetService.java | 3 +- 7 files changed, 43 insertions(+), 20 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index a81dd6b9055..26a14e86012 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -576,12 +576,12 @@ class BeamModulePlugin implements Plugin<Project> { google_api_client_jackson2 : "com.google.api-client:google-api-client-jackson2:$google_clients_version", google_api_client_java6 : "com.google.api-client:google-api-client-java6:$google_clients_version", google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version - google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20220827-$google_clients_version", + google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20220924-$google_clients_version", google_api_services_clouddebugger : "com.google.apis:google-api-services-clouddebugger:v2-rev20220318-$google_clients_version", google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20220828-$google_clients_version", - google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20220812-$google_clients_version", + google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20220920-$google_clients_version", google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20220818-$google_clients_version", - google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220829-$google_clients_version", + google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version", google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20220705-$google_clients_version", google_auth_library_credentials : "com.google.auth:google-auth-library-credentials", // google_cloud_platform_libraries_bom sets version google_auth_library_oauth2_http : "com.google.auth:google-auth-library-oauth2-http", // google_cloud_platform_libraries_bom sets version diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index cfebb754216..953d1237d9c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -109,6 +109,11 @@ public interface BigQueryOptions void setNumStorageWriteApiStreamAppendClients(Integer value); + @Default.Boolean(false) + Boolean getUseStorageApiConnectionPool(); + + void setUseStorageApiConnectionPool(Boolean value); + @Description( "If set, then BigQueryIO.Write will default to triggering the Storage Write API writes this often.") Integer getStorageWriteApiTriggeringFrequencySec(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index a0484ccf972..adf02ed31a6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -208,8 +208,8 @@ public interface BigQueryServices extends Serializable { * Create an append client for a given Storage API write stream. The stream must be created * first. */ - StreamAppendClient getStreamAppendClient(String streamName, Descriptor descriptor) - throws Exception; + StreamAppendClient getStreamAppendClient( + String streamName, Descriptor descriptor, boolean useConnectionPool) throws Exception; /** Flush a given stream up to the given offset. The stream must have type BUFFERED. */ ApiFuture<FlushRowsResponse> flush(String streamName, long flushOffset) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 3cb4e5cce8c..9df75a5be94 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -1305,8 +1305,8 @@ class BigQueryServicesImpl implements BigQueryServices { } @Override - public StreamAppendClient getStreamAppendClient(String streamName, Descriptor descriptor) - throws Exception { + public StreamAppendClient getStreamAppendClient( + String streamName, Descriptor descriptor, boolean useConnectionPool) throws Exception { ProtoSchema protoSchema = ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build(); @@ -1322,6 +1322,7 @@ class BigQueryServicesImpl implements BigQueryServices { StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setChannelProvider(transportChannelProvider) + .setEnableConnectionPool(useConnectionPool) .setTraceId( "Dataflow:" + (bqIOMetadata.getBeamJobId() != null diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 607f06c4e00..871fc73698a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -131,6 +131,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) { String operationName = input.getName() + "/" + getName(); BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); + org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument( + !options.getUseStorageApiConnectionPool(), + "useStorageApiConnectionPool only supported " + "when using STORAGE_API_AT_LEAST_ONCE"); return input .apply( "Write Records", @@ -176,13 +179,15 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> private DescriptorWrapper descriptorWrapper; private Instant nextCacheTickle = Instant.MAX; private final int clientNumber; + private final boolean usingMultiplexing; public DestinationState( String tableUrn, MessageConverter<ElementT> messageConverter, DatasetService datasetService, boolean useDefaultStream, - int streamAppendClientCount) { + int streamAppendClientCount, + BigQueryOptions bigQueryOptions) { this.tableUrn = tableUrn; this.messageConverter = messageConverter; this.pendingMessages = Lists.newArrayList(); @@ -190,6 +195,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> this.useDefaultStream = useDefaultStream; this.descriptorWrapper = messageConverter.getSchemaDescriptor(); this.clientNumber = new Random().nextInt(streamAppendClientCount); + this.usingMultiplexing = bigQueryOptions.getUseStorageApiConnectionPool(); } void teardown() { @@ -229,7 +235,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> StreamAppendClient generateClient() throws Exception { Preconditions.checkStateNotNull(maybeDatasetService); - return maybeDatasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor); + return maybeDatasetService.getStreamAppendClient( + streamName, descriptorWrapper.descriptor, usingMultiplexing); } StreamAppendClient getStreamAppendClient(boolean lookupCache) { @@ -342,11 +349,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> this.currentOffset += inserts.getSerializedRowsCount(); } ApiFuture<AppendRowsResponse> response = writeStream.appendRows(offset, protoRows); - inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds()); - if (writeStream.getInflightWaitSeconds() > 5) { - LOG.warn( - "Storage Api write delay more than {} seconds.", - writeStream.getInflightWaitSeconds()); + if (!usingMultiplexing) { + inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds()); + if (writeStream.getInflightWaitSeconds() > 5) { + LOG.warn( + "Storage Api write delay more than {} seconds.", + writeStream.getInflightWaitSeconds()); + } } return response; } catch (Exception e) { @@ -456,7 +465,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> } DestinationState createDestinationState( - ProcessContext c, DestinationT destination, DatasetService datasetService) { + ProcessContext c, + DestinationT destination, + DatasetService datasetService, + BigQueryOptions bigQueryOptions) { TableDestination tableDestination1 = dynamicDestinations.getTable(destination); checkArgument( tableDestination1 != null, @@ -475,7 +487,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> messageConverter, datasetService, useDefaultStream, - streamAppendClientCount); + streamAppendClientCount, + bigQueryOptions); } @ProcessElement @@ -489,7 +502,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> Preconditions.checkStateNotNull(destinations); DestinationState state = destinations.computeIfAbsent( - element.getKey(), k -> createDestinationState(c, k, initializedDatasetService)); + element.getKey(), + k -> + createDestinationState( + c, k, initializedDatasetService, pipelineOptions.as(BigQueryOptions.class))); flushIfNecessary(); state.addMessage(element.getValue()); ++numPendingRecords; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index afecc966955..c8bb805b6e8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -389,7 +389,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object stream, () -> datasetService.getStreamAppendClient( - stream, descriptor.get().descriptor)); + stream, descriptor.get().descriptor, false)); for (AppendRowsContext context : contexts) { context.streamName = stream; appendClient.pin(); @@ -430,7 +430,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object context.streamName, () -> datasetService.getStreamAppendClient( - context.streamName, descriptor.get().descriptor)); + context.streamName, descriptor.get().descriptor, false)); return appendClient.appendRows(context.offset, protoRows); } catch (Exception e) { throw new RuntimeException(e); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index 213d6498a0e..44f73bd56cb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -492,7 +492,8 @@ public class FakeDatasetService implements DatasetService, Serializable { } @Override - public StreamAppendClient getStreamAppendClient(String streamName, Descriptor descriptor) { + public StreamAppendClient getStreamAppendClient( + String streamName, Descriptor descriptor, boolean useConnectionPool) { return new StreamAppendClient() { private Descriptor protoDescriptor;