This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 52c3f07aa56 Merge pull request #17417: [BEAM-14388] Address some performance problems with the storage API
52c3f07aa56 is described below

commit 52c3f07aa56de738db3130283716d38206de42b7
Author: Reuven Lax <re...@google.com>
AuthorDate: Wed May 4 13:14:07 2022 -0700

    Merge pull request #17417: [BEAM-14388] Address some performance problems with the storage API
---
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  |  6 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |  8 +++
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 15 ++++
 .../StorageApiWriteRecordsInconsistent.java        |  7 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 84 ++++++++++++++++++----
 5 files changed, 106 insertions(+), 14 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 85437f267d0..a9beb5cbd7c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -123,4 +123,10 @@ public interface BigQueryOptions
   Integer getSchemaUpdateRetries();
 
   void setSchemaUpdateRetries(Integer value);
+
+  @Description("Maximum (best effort) size of a single append to the storage 
API.")
+  @Default.Integer(2 * 1024 * 1024)
+  Integer getStorageApiAppendThresholdBytes();
+
+  void setStorageApiAppendThresholdBytes(Integer value);
 }
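
A minimal usage sketch for the new option (not part of this commit; the class name and the
4 MB value below are illustrative only):

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class AppendThresholdExample {
      public static void main(String[] args) {
        // Override the default 2 MB best-effort append size from the command line.
        BigQueryOptions options =
            PipelineOptionsFactory.fromArgs("--storageApiAppendThresholdBytes=4194304")
                .as(BigQueryOptions.class);
        System.out.println(options.getStorageApiAppendThresholdBytes()); // prints 4194304
      }
    }
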
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 8a7eff378c6..23fce8ba6ff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -206,6 +206,14 @@ public interface BigQueryServices extends Serializable {
     /** Append rows to a Storage API write stream at the given offset. */
     ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows) throws Exception;
 
+    /**
+     * If the previous call to appendRows blocked due to flow control, returns how long the call
+     * blocked for.
+     */
+    default long getInflightWaitSeconds() {
+      return 0;
+    }
+
     /**
      * Pin this object. If close() is called before all pins are removed, the underlying resources
      * will not be freed until all pins are removed.
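
For illustration only (not code from this commit), a caller of the interface could surface the
new flow-control wait like this; the class and method names are hypothetical:

    import com.google.api.core.ApiFuture;
    import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
    import com.google.cloud.bigquery.storage.v1.ProtoRows;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient;

    class AppendWaitReporter {
      /** Appends a batch, then reports how long the previous append was blocked by flow control. */
      static ApiFuture<AppendRowsResponse> appendAndReportWait(
          StreamAppendClient client, long offset, ProtoRows rows) throws Exception {
        ApiFuture<AppendRowsResponse> result = client.appendRows(offset, rows);
        long waitSeconds = client.getInflightWaitSeconds(); // 0 unless the previous call blocked
        if (waitSeconds > 0) {
          System.out.println("Previous Storage API append blocked for " + waitSeconds + "s");
        }
        return result;
      }
    }
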
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 85a53487f88..2949150c9ee 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -33,6 +33,7 @@ import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.FixedHeaderProvider;
 import com.google.api.gax.rpc.HeaderProvider;
 import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.TransportChannelProvider;
 import com.google.api.gax.rpc.UnaryCallSettings;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.Bigquery.Tables;
@@ -1223,9 +1224,18 @@ class BigQueryServicesImpl implements BigQueryServices {
       ProtoSchema protoSchema =
           ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build();
 
+      TransportChannelProvider transportChannelProvider =
+          BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
+              .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
+              .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
+              .setKeepAliveWithoutCalls(true)
+              .setChannelsPerCpu(2)
+              .build();
+
       StreamWriter streamWriter =
           StreamWriter.newBuilder(streamName)
               .setWriterSchema(protoSchema)
+              .setChannelProvider(transportChannelProvider)
               .setTraceId(
                   "Dataflow:"
                       + (bqIOMetadata.getBeamJobId() != null
@@ -1275,6 +1285,11 @@ class BigQueryServicesImpl implements BigQueryServices {
             throws Exception {
           return streamWriter.append(rows, offset);
         }
+
+        @Override
+        public long getInflightWaitSeconds() {
+          return streamWriter.getInflightWaitSeconds();
+        }
       };
     }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
index b88f4d4b920..e433925e5b3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
@@ -47,12 +47,17 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
   @Override
   public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) {
     String operationName = input.getName() + "/" + getName();
+    BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
     // Append records to the Storage API streams.
     input.apply(
         "Write Records",
         ParDo.of(
                 new StorageApiWriteUnshardedRecords.WriteRecordsDoFn<>(
-                    operationName, dynamicDestinations, bqServices, true))
+                    operationName,
+                    dynamicDestinations,
+                    bqServices,
+                    true,
+                    bigQueryOptions.getStorageApiAppendThresholdBytes()))
             .withSideInputs(dynamicDestinations.getSideInputs()));
     return input.getPipeline().apply("voids", Create.empty(VoidCoder.of()));
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 1751799dd51..f033f423466 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -19,12 +19,14 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
+import com.google.api.core.ApiFuture;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
 import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DynamicMessage;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -40,6 +42,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
 import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.DescriptorWrapper;
 import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
 import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -76,11 +79,19 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
   private final BigQueryServices bqServices;
   private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
 
+  // The Guava cache object is threadsafe. However our protocol requires that clients pin the
+  // StreamAppendClient
+  // after looking up the cache, and we must ensure that the cache is not accessed in between the
+  // lookup and the pin
+  // (any access of the cache could trigger element expiration). Therefore most uses of
+  // APPEND_CLIENTS should
+  // synchronize.
   private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
       CacheBuilder.newBuilder()
-          .expireAfterAccess(5, TimeUnit.MINUTES)
+          .expireAfterAccess(15, TimeUnit.MINUTES)
           .removalListener(
               (RemovalNotification<String, StreamAppendClient> removal) -> {
+                LOG.info("Expiring append client for " + removal.getKey());
                 @Nullable final StreamAppendClient streamAppendClient = removal.getValue();
                 // Close the writer in a different thread so as not to block the main one.
                 runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close);
@@ -117,10 +128,17 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
   @Override
   public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) {
     String operationName = input.getName() + "/" + getName();
+    BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
     return input
         .apply(
             "Write Records",
-            ParDo.of(new WriteRecordsDoFn<>(operationName, dynamicDestinations, bqServices, false))
+            ParDo.of(
+                    new WriteRecordsDoFn<>(
+                        operationName,
+                        dynamicDestinations,
+                        bqServices,
+                        false,
+                        options.getStorageApiAppendThresholdBytes()))
                 .withSideInputs(dynamicDestinations.getSideInputs()))
         .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
         // Calling Reshuffle makes the output stable - once this completes, the append operations
@@ -132,6 +150,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
 
   static class WriteRecordsDoFn<DestinationT, ElementT>
       extends DoFn<KV<DestinationT, StorageApiWritePayload>, KV<String, String>> {
+    private final Counter forcedFlushes = Metrics.counter(WriteRecordsDoFn.class, "forcedFlushes");
+
     class DestinationState {
       private final String tableUrn;
       private final MessageConverter<ElementT> messageConverter;
@@ -144,8 +164,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
           Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
       private final Counter appendFailures =
           Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
+      private final Counter schemaMismatches =
+          Metrics.counter(WriteRecordsDoFn.class, "schemaMismatches");
+      private final Distribution inflightWaitSecondsDistribution =
+          Metrics.distribution(WriteRecordsDoFn.class, "streamWriterWaitSeconds");
       private final boolean useDefaultStream;
       private DescriptorWrapper descriptorWrapper;
+      private Instant nextCacheTickle;
 
       public DestinationState(
           String tableUrn,
@@ -161,8 +186,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       }
 
       void teardown() {
+        maybeTickleCache();
         if (streamAppendClient != null) {
           runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::unpin);
+          streamAppendClient = null;
         }
       }
 
@@ -206,6 +233,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
               this.streamAppendClient.pin();
             }
             this.currentOffset = 0;
+            nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
           }
           return streamAppendClient;
         } catch (Exception e) {
@@ -213,21 +241,43 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         }
       }
 
+      void maybeTickleCache() {
+        if (streamAppendClient != null && Instant.now().isAfter(nextCacheTickle)) {
+          synchronized (APPEND_CLIENTS) {
+            APPEND_CLIENTS.getIfPresent(streamName);
+          }
+          nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
+        }
+      }
+
       void invalidateWriteStream() {
         if (streamAppendClient != null) {
           synchronized (APPEND_CLIENTS) {
             // Unpin in a different thread, as it may execute a blocking close.
             runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::unpin);
+            // The default stream is cached across multiple different DoFns. If they all try and
+            // invalidate, then we can
+            // get races between threads invalidating and recreating streams. For this reason, we
+            // check to see that the
+            // cache still contains the object we created before invalidating (in case another
+            // thread has already invalidated
+            // and recreated the stream).
+            @Nullable
+            StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(streamName);
+            if (cachedAppendClient != null
+                && System.identityHashCode(cachedAppendClient)
+                    == System.identityHashCode(streamAppendClient)) {
+              APPEND_CLIENTS.invalidate(streamName);
+            }
           }
           streamAppendClient = null;
-          APPEND_CLIENTS.invalidate(streamName);
-        } else if (useDefaultStream) {
-          APPEND_CLIENTS.invalidate(getDefaultStreamName());
         }
       }
 
       void addMessage(StorageApiWritePayload payload) throws Exception {
+        maybeTickleCache();
         if (payload.getSchemaHash() != descriptorWrapper.hash) {
+          schemaMismatches.inc();
           // The descriptor on the payload doesn't match the descriptor we know about. This
           // means that the table has been updated, but that this transform hasn't found out
           // about that yet. Refresh the schema and force a new StreamAppendClient to be
@@ -259,9 +309,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
           return;
         }
         final ProtoRows.Builder inserts = ProtoRows.newBuilder();
-        for (ByteString m : pendingMessages) {
-          inserts.addSerializedRows(m);
-        }
+        inserts.addAllSerializedRows(pendingMessages);
 
         ProtoRows protoRows = inserts.build();
         pendingMessages.clear();
@@ -275,7 +323,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                   offset = this.currentOffset;
                   this.currentOffset += inserts.getSerializedRowsCount();
                 }
-                return writeStream.appendRows(offset, protoRows);
+                ApiFuture<AppendRowsResponse> response = writeStream.appendRows(offset, protoRows);
+                inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
+                if (writeStream.getInflightWaitSeconds() > 5) {
+                  LOG.warn(
+                      "Storage API write delay of " + writeStream.getInflightWaitSeconds() + " seconds");
+                }
+                return response;
               } catch (Exception e) {
                 throw new RuntimeException(e);
               }
@@ -294,6 +348,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
               recordsAppended.inc(protoRows.getSerializedRowsCount());
             },
             new Context<>());
+        maybeTickleCache();
       }
     }
 
@@ -302,8 +357,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
     private transient @Nullable DatasetService datasetService;
     private int numPendingRecords = 0;
     private int numPendingRecordBytes = 0;
-    private static final int FLUSH_THRESHOLD_RECORDS = 100;
-    private static final int FLUSH_THRESHOLD_RECORD_BYTES = 2 * 1024 * 1024;
+    private static final int FLUSH_THRESHOLD_RECORDS = 150000;
+    private final int flushThresholdBytes;
     private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
     private final BigQueryServices bqServices;
     private final boolean useDefaultStream;
@@ -312,20 +367,23 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         String operationName,
         StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
         BigQueryServices bqServices,
-        boolean useDefaultStream) {
+        boolean useDefaultStream,
+        int flushThresholdBytes) {
       this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
       this.dynamicDestinations = dynamicDestinations;
       this.bqServices = bqServices;
       this.useDefaultStream = useDefaultStream;
+      this.flushThresholdBytes = flushThresholdBytes;
     }
 
     boolean shouldFlush() {
       return numPendingRecords > FLUSH_THRESHOLD_RECORDS
-          || numPendingRecordBytes > FLUSH_THRESHOLD_RECORD_BYTES;
+          || numPendingRecordBytes > flushThresholdBytes;
     }
 
     void flushIfNecessary() throws Exception {
       if (shouldFlush()) {
+        forcedFlushes.inc();
         // Too much memory being used. Flush the state and wait for it to drain out.
         // TODO(reuvenlax): Consider waiting for memory usage to drop instead of waiting for all the
         // appends to finish.

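A minimal sketch of the lookup-and-pin protocol described by the new APPEND_CLIENTS comment,
assumed to live inside WriteRecordsDoFn where APPEND_CLIENTS and StreamAppendClient are in
scope; createAppendClient() is a hypothetical loader, not code from this commit:

    // Look up (or create) the cached client and pin it under the same lock, so no other cache
    // access can expire the entry between the lookup and the pin.
    StreamAppendClient lookupAndPin(String streamName) throws Exception {
      synchronized (APPEND_CLIENTS) {
        StreamAppendClient client =
            APPEND_CLIENTS.get(streamName, () -> createAppendClient(streamName));
        client.pin();
        return client;
      }
    }

Unpinning then goes through closeWriterExecutor, as the diff does, so a blocking close() never
stalls the processing thread.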