This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 52c3f07aa56  Merge pull request #17417: [BEAM-14388] Address some performance problems with the storage API
52c3f07aa56 is described below

commit 52c3f07aa56de738db3130283716d38206de42b7
Author: Reuven Lax <re...@google.com>
AuthorDate: Wed May 4 13:14:07 2022 -0700

    Merge pull request #17417: [BEAM-14388] Address some performance problems with the storage API
---
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  |  6 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |  8 +++
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 15 ++++
 .../StorageApiWriteRecordsInconsistent.java        |  7 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 84 ++++++++++++++++++----
 5 files changed, 106 insertions(+), 14 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 85437f267d0..a9beb5cbd7c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -123,4 +123,10 @@ public interface BigQueryOptions
   Integer getSchemaUpdateRetries();

   void setSchemaUpdateRetries(Integer value);
+
+  @Description("Maximum (best effort) size of a single append to the storage API.")
+  @Default.Integer(2 * 1024 * 1024)
+  Integer getStorageApiAppendThresholdBytes();
+
+  void setStorageApiAppendThresholdBytes(Integer value);
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 8a7eff378c6..23fce8ba6ff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -206,6 +206,14 @@ public interface BigQueryServices extends Serializable {
     /** Append rows to a Storage API write stream at the given offset. */
     ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows) throws Exception;

+    /**
+     * If the previous call to appendRows blocked due to flow control, returns how long the call
+     * blocked for.
+     */
+    default long getInflightWaitSeconds() {
+      return 0;
+    }
+
     /**
      * Pin this object. If close() is called before all pins are removed, the underlying resources
      * will not be freed until all pins are removed.
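For context on the new option: a minimal sketch of how a pipeline could override storageApiAppendThresholdBytes (the class name and the 1 MB value here are illustrative assumptions, not part of this commit). Per Beam's standard option naming, passing --storageApiAppendThresholdBytes=1048576 on the command line has the same effect.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class AppendThresholdExample {
      public static void main(String[] args) {
        // View the parsed pipeline options through the BigQueryOptions interface.
        BigQueryOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
        // Lower the best-effort append size from the 2 MB default to 1 MB
        // (illustrative value, not a recommendation from this commit).
        options.setStorageApiAppendThresholdBytes(1024 * 1024);
        Pipeline pipeline = Pipeline.create(options);
        // ... construct transforms and run the pipeline ...
      }
    }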
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 85a53487f88..2949150c9ee 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -33,6 +33,7 @@ import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.FixedHeaderProvider;
 import com.google.api.gax.rpc.HeaderProvider;
 import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.TransportChannelProvider;
 import com.google.api.gax.rpc.UnaryCallSettings;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.Bigquery.Tables;
@@ -1223,9 +1224,18 @@ class BigQueryServicesImpl implements BigQueryServices {
       ProtoSchema protoSchema =
           ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build();

+      TransportChannelProvider transportChannelProvider =
+          BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
+              .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
+              .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
+              .setKeepAliveWithoutCalls(true)
+              .setChannelsPerCpu(2)
+              .build();
+
       StreamWriter streamWriter =
           StreamWriter.newBuilder(streamName)
               .setWriterSchema(protoSchema)
+              .setChannelProvider(transportChannelProvider)
               .setTraceId(
                   "Dataflow:"
                       + (bqIOMetadata.getBeamJobId() != null
@@ -1275,6 +1285,11 @@ class BigQueryServicesImpl implements BigQueryServices {
           throws Exception {
         return streamWriter.append(rows, offset);
       }
+
+      @Override
+      public long getInflightWaitSeconds() {
+        return streamWriter.getInflightWaitSeconds();
+      }
     };
   }
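As a usage illustration (not part of the commit): a minimal sketch of how a caller might consume the new getInflightWaitSeconds() signal exposed above; the method name, LOG, and the StreamAppendClient parameter type are assumptions for the sketch.

    ApiFuture<AppendRowsResponse> appendWithBackpressureLogging(
        StreamAppendClient client, long offset, ProtoRows rows) throws Exception {
      ApiFuture<AppendRowsResponse> result = client.appendRows(offset, rows);
      // A nonzero value means the append above blocked on flow control, i.e. the
      // underlying stream writer was throttled by its in-flight request limits.
      long stalledSeconds = client.getInflightWaitSeconds();
      if (stalledSeconds > 0) {
        LOG.info("appendRows blocked for {} seconds due to flow control", stalledSeconds);
      }
      return result;
    }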
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
index b88f4d4b920..e433925e5b3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
@@ -47,12 +47,17 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
   @Override
   public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) {
     String operationName = input.getName() + "/" + getName();
+    BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
     // Append records to the Storage API streams.
     input.apply(
         "Write Records",
         ParDo.of(
                 new StorageApiWriteUnshardedRecords.WriteRecordsDoFn<>(
-                    operationName, dynamicDestinations, bqServices, true))
+                    operationName,
+                    dynamicDestinations,
+                    bqServices,
+                    true,
+                    bigQueryOptions.getStorageApiAppendThresholdBytes()))
             .withSideInputs(dynamicDestinations.getSideInputs()));
     return input.getPipeline().apply("voids", Create.empty(VoidCoder.of()));
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 1751799dd51..f033f423466 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -19,12 +19,14 @@ package org.apache.beam.sdk.io.gcp.bigquery;

 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

+import com.google.api.core.ApiFuture;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
 import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DynamicMessage;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -40,6 +42,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
 import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.DescriptorWrapper;
 import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
 import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -76,11 +79,19 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
   private final BigQueryServices bqServices;
   private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();

+  // The Guava cache object is thread-safe. However, our protocol requires that clients pin the
+  // StreamAppendClient after looking it up in the cache, and we must ensure that the cache is
+  // not accessed between the lookup and the pin (any access of the cache could trigger element
+  // expiration). Therefore most uses of APPEND_CLIENTS should synchronize.
   private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
       CacheBuilder.newBuilder()
-          .expireAfterAccess(5, TimeUnit.MINUTES)
+          .expireAfterAccess(15, TimeUnit.MINUTES)
          .removalListener(
               (RemovalNotification<String, StreamAppendClient> removal) -> {
+                LOG.info("Expiring append client for " + removal.getKey());
                 @Nullable final StreamAppendClient streamAppendClient = removal.getValue();
                 // Close the writer in a different thread so as not to block the main one.
                 runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close);
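To make the pinning protocol described in the comment above concrete, a minimal sketch under assumed names (createAppendClient is a hypothetical factory; this is not the commit's code): the lookup and the pin happen under one lock so no other cache access can expire the entry in between.

    StreamAppendClient getPinnedClient(String streamName) throws Exception {
      synchronized (APPEND_CLIENTS) {
        // Look up (or create) and pin under the same lock: an unsynchronized cache
        // access between these two steps could expire the entry and close the client.
        StreamAppendClient client =
            APPEND_CLIENTS.get(streamName, () -> createAppendClient(streamName));
        client.pin();
        return client;
      }
    }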
@@ -117,10 +128,17 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
   @Override
   public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) {
     String operationName = input.getName() + "/" + getName();
+    BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
     return input
         .apply(
             "Write Records",
-            ParDo.of(new WriteRecordsDoFn<>(operationName, dynamicDestinations, bqServices, false))
+            ParDo.of(
+                new WriteRecordsDoFn<>(
+                    operationName,
+                    dynamicDestinations,
+                    bqServices,
+                    false,
+                    options.getStorageApiAppendThresholdBytes()))
                 .withSideInputs(dynamicDestinations.getSideInputs()))
         .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
         // Calling Reshuffle makes the output stable - once this completes, the append operations
@@ -132,6 +150,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
   static class WriteRecordsDoFn<DestinationT, ElementT>
       extends DoFn<KV<DestinationT, StorageApiWritePayload>, KV<String, String>> {
+    private final Counter forcedFlushes = Metrics.counter(WriteRecordsDoFn.class, "forcedFlushes");
+
     class DestinationState {
       private final String tableUrn;
       private final MessageConverter<ElementT> messageConverter;
@@ -144,8 +164,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
           Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
       private final Counter appendFailures =
           Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
+      private final Counter schemaMismatches =
+          Metrics.counter(WriteRecordsDoFn.class, "schemaMismatches");
+      private final Distribution inflightWaitSecondsDistribution =
+          Metrics.distribution(WriteRecordsDoFn.class, "streamWriterWaitSeconds");
       private final boolean useDefaultStream;
       private DescriptorWrapper descriptorWrapper;
+      private Instant nextCacheTickle;

       public DestinationState(
           String tableUrn,
@@ -161,8 +186,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       }

       void teardown() {
+        maybeTickleCache();
         if (streamAppendClient != null) {
           runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::unpin);
+          streamAppendClient = null;
         }
       }

@@ -206,6 +233,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
             this.streamAppendClient.pin();
           }
           this.currentOffset = 0;
+          nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
         }
         return streamAppendClient;
       } catch (Exception e) {
@@ -213,21 +241,43 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         }
       }

+      void maybeTickleCache() {
+        if (streamAppendClient != null && Instant.now().isAfter(nextCacheTickle)) {
+          synchronized (APPEND_CLIENTS) {
+            APPEND_CLIENTS.getIfPresent(streamName);
+          }
+          nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
+        }
+      }
+
       void invalidateWriteStream() {
         if (streamAppendClient != null) {
           synchronized (APPEND_CLIENTS) {
             // Unpin in a different thread, as it may execute a blocking close.
             runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::unpin);
+            // The default stream is cached across multiple different DoFns. If they all try to
+            // invalidate, then we can get races between threads invalidating and recreating
+            // streams. For this reason, we check that the cache still contains the object we
+            // created before invalidating (in case another thread has already invalidated and
+            // recreated the stream).
+            @Nullable
+            StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(streamName);
+            if (cachedAppendClient != null
+                && System.identityHashCode(cachedAppendClient)
+                    == System.identityHashCode(streamAppendClient)) {
+              APPEND_CLIENTS.invalidate(streamName);
+            }
           }
           streamAppendClient = null;
-          APPEND_CLIENTS.invalidate(streamName);
-        } else if (useDefaultStream) {
-          APPEND_CLIENTS.invalidate(getDefaultStreamName());
         }
       }

       void addMessage(StorageApiWritePayload payload) throws Exception {
+        maybeTickleCache();
         if (payload.getSchemaHash() != descriptorWrapper.hash) {
+          schemaMismatches.inc();
           // The descriptor on the payload doesn't match the descriptor we know about. This
           // means that the table has been updated, but that this transform hasn't found out
           // about that yet. Refresh the schema and force a new StreamAppendClient to be
@@ -259,9 +309,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
           return;
         }
         final ProtoRows.Builder inserts = ProtoRows.newBuilder();
-        for (ByteString m : pendingMessages) {
-          inserts.addSerializedRows(m);
-        }
+        inserts.addAllSerializedRows(pendingMessages);
         ProtoRows protoRows = inserts.build();
         pendingMessages.clear();

@@ -275,7 +323,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                 offset = this.currentOffset;
                 this.currentOffset += inserts.getSerializedRowsCount();
               }
-              return writeStream.appendRows(offset, protoRows);
+              ApiFuture<AppendRowsResponse> response = writeStream.appendRows(offset, protoRows);
+              inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
+              if (writeStream.getInflightWaitSeconds() > 5) {
+                LOG.warn(
+                    "Storage Api write delay more than " + writeStream.getInflightWaitSeconds());
+              }
+              return response;
             } catch (Exception e) {
               throw new RuntimeException(e);
             }
@@ -294,6 +348,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
               recordsAppended.inc(protoRows.getSerializedRowsCount());
             },
             new Context<>());
+        maybeTickleCache();
       }
     }

@@ -302,8 +357,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
     private transient @Nullable DatasetService datasetService;
     private int numPendingRecords = 0;
     private int numPendingRecordBytes = 0;
-    private static final int FLUSH_THRESHOLD_RECORDS = 100;
-    private static final int FLUSH_THRESHOLD_RECORD_BYTES = 2 * 1024 * 1024;
+    private static final int FLUSH_THRESHOLD_RECORDS = 150000;
+    private final int flushThresholdBytes;
     private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
     private final BigQueryServices bqServices;
     private final boolean useDefaultStream;
@@ -312,20 +367,23 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         String operationName,
         StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
         BigQueryServices bqServices,
-        boolean useDefaultStream) {
+        boolean useDefaultStream,
+        int flushThresholdBytes) {
       this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
       this.dynamicDestinations = dynamicDestinations;
       this.bqServices = bqServices;
       this.useDefaultStream = useDefaultStream;
+      this.flushThresholdBytes = flushThresholdBytes;
     }

     boolean shouldFlush() {
       return numPendingRecords > FLUSH_THRESHOLD_RECORDS
-          || numPendingRecordBytes > FLUSH_THRESHOLD_RECORD_BYTES;
+          || numPendingRecordBytes > flushThresholdBytes;
     }

     void flushIfNecessary() throws Exception {
       if (shouldFlush()) {
+        forcedFlushes.inc();
         // Too much memory being used. Flush the state and wait for it to drain out.
        // TODO(reuvenlax): Consider waiting for memory usage to drop instead of waiting for all the
        // appends to finish.
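For clarity on the race-avoidance logic in invalidateWriteStream above, a minimal sketch under assumed context (not the commit's code): invalidate a shared cache entry only when it still holds the caller's instance. The commit compares identity hash codes; a direct reference comparison is shown here for brevity.

    synchronized (APPEND_CLIENTS) {
      StreamAppendClient cached = APPEND_CLIENTS.getIfPresent(streamName);
      // Invalidate only if the cache still holds our instance. If another thread
      // already invalidated and recreated the entry, leave the new client alone.
      if (cached == streamAppendClient) {
        APPEND_CLIENTS.invalidate(streamName);
      }
      streamAppendClient = null;
    }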