reuvenlax commented on code in PR #17550:
URL: https://github.com/apache/beam/pull/17550#discussion_r865447469
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java:
##########
@@ -93,7 +93,10 @@
void setUseStorageWriteApiAtLeastOnce(Boolean value);
@Description(
- "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.")
+ "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams. "
+ + "The number of streams indicated will be allocated at a per worker and destination basis, "
+ + "a high number can cause a large pipeline to go over the BigQuery connection quota quickly. "
+ + "With low-mid volume pipelines using the default configuration should be enough.")
@Default.Integer(0)
Review Comment:
We should create a new option. This option refers to physical stream creation (i.e. createStream), not the number of gRPC streams.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -129,6 +135,9 @@ public StorageApiWriteUnshardedRecords(
public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) {
String operationName = input.getName() + "/" + getName();
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+ // default value from options is 0, so we set at least one client
+ Integer numStreams =
+ options.getNumStorageWriteApiStreams() == 0 ? 1 : options.getNumStorageWriteApiStreams();
Review Comment:
create a new option that defaults to 1
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -86,15 +89,18 @@
// (any access of the cache could trigger element expiration). Therefore most used of
// APPEND_CLIENTS should
// synchronize.
- private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
+ private static final Cache<String, List<StreamAppendClient>> APPEND_CLIENTS =
Review Comment:
what did you find about the cost of synchronization?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -213,22 +229,38 @@ String createStreamIfNeeded() {
return this.streamName;
}
+ List<StreamAppendClient> generateClients() {
+ return IntStream.range(0, streamAppendClientCount)
+ .mapToObj(
+ i -> {
+ try {
+ StreamAppendClient client =
+ datasetService.getStreamAppendClient(
+ streamName, descriptorWrapper.descriptor);
+ return client;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
StreamAppendClient getStreamAppendClient(boolean lookupCache) {
try {
if (streamAppendClient == null) {
createStreamIfNeeded();
synchronized (APPEND_CLIENTS) {
if (lookupCache) {
this.streamAppendClient =
- APPEND_CLIENTS.get(
- streamName,
- () ->
- datasetService.getStreamAppendClient(
- streamName, descriptorWrapper.descriptor));
+ APPEND_CLIENTS.get(streamName, () -> generateClients()).get(clientNumber);
Review Comment:
Instead of generating all clients eagerly, let's do it lazily. Initialize a List with count copies of Optional.empty(). Then do
this.streamAppendClient = APPEND_CLIENTS.get(streamName, this.generateClients).get(clientNumber).get().orElseGet(this.getStreamAppendClient).
FYI, you could also do this with null if you don't care to use Optional here.
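A minimal, JDK-only sketch of the lazy scheme described above. It assumes a plain HashMap guarded by synchronized in place of the Guava Cache (so there is no expiration), and a hypothetical FakeAppendClient standing in for StreamAppendClient. Each stream name maps to clientCount slots that start as Optional.empty(); only the requested slot is filled, and the new client is written back into its slot so later lookups reuse it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Lazily creates one append client per (stream, slot); all names are illustrative. */
class LazyAppendClients {
  /** Hypothetical stand-in for StreamAppendClient. */
  static final class FakeAppendClient {
    final String streamName;
    final int clientNumber;

    FakeAppendClient(String streamName, int clientNumber) {
      this.streamName = streamName;
      this.clientNumber = clientNumber;
    }
  }

  private final int clientCount;
  // Stand-in for APPEND_CLIENTS: a plain map, so nothing expires as it would in a Guava Cache.
  private final Map<String, List<Optional<FakeAppendClient>>> cache = new HashMap<>();
  private int createdCount = 0; // how many clients were actually constructed

  LazyAppendClients(int clientCount) {
    this.clientCount = clientCount;
  }

  /** Returns the client in the given slot, creating only that client on first use. */
  synchronized FakeAppendClient get(String streamName, int clientNumber) {
    // First access for a stream allocates empty slots only; no clients are built here.
    List<Optional<FakeAppendClient>> slots =
        cache.computeIfAbsent(streamName, k -> emptySlots());
    FakeAppendClient client =
        slots.get(clientNumber).orElseGet(() -> create(streamName, clientNumber));
    // Write the client back so the next lookup for this slot reuses it.
    slots.set(clientNumber, Optional.of(client));
    return client;
  }

  int createdCount() {
    return createdCount;
  }

  private List<Optional<FakeAppendClient>> emptySlots() {
    List<Optional<FakeAppendClient>> slots = new ArrayList<>(clientCount);
    for (int i = 0; i < clientCount; i++) {
      slots.add(Optional.empty());
    }
    return slots;
  }

  private FakeAppendClient create(String streamName, int clientNumber) {
    createdCount++;
    return new FakeAppendClient(streamName, clientNumber);
  }
}
```

With four slots, asking for slot 2 constructs exactly one client, and asking again returns the same instance without constructing another.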
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -357,28 +389,33 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryMa
private transient @Nullable DatasetService datasetService;
private int numPendingRecords = 0;
private int numPendingRecordBytes = 0;
- private static final int FLUSH_THRESHOLD_RECORDS = 150000;
private final int flushThresholdBytes;
+ private final int flushThresholdCount;
private final StorageApiDynamicDestinations<ElementT, DestinationT>
dynamicDestinations;
private final BigQueryServices bqServices;
private final boolean useDefaultStream;
+ // default append client count to 1
+ private Integer streamAppendClientCount = 1;
Review Comment:
why not private int?
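For context on the question: the field always holds a count (initialized to 1 in the diff), so a primitive int states that it can never be null and avoids boxing, whereas an Integer field that is left unset is null and throws NullPointerException when auto-unboxed. A small illustration with made-up class and field names:

```java
/** Illustrative only: a boxed Integer field versus a primitive int field. */
class BoxedVsPrimitiveCount {
  Integer boxedCount; // an unset Integer field is null
  int primitiveCount; // an unset int field is 0 and can never be null

  /** Returns true if auto-unboxing the unset Integer field throws NullPointerException. */
  boolean unboxingThrows() {
    try {
      int value = boxedCount; // compiles to boxedCount.intValue()
      return value < 0; // not reached while boxedCount is null
    } catch (NullPointerException e) {
      return true;
    }
  }
}
```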
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -263,9 +295,9 @@ void invalidateWriteStream() {
// thread has already invalidated
// and recreated the stream).
@Nullable
- StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(streamName);
- if (cachedAppendClient != null
- && System.identityHashCode(cachedAppendClient)
+ List<StreamAppendClient> cachedAppendClients = APPEND_CLIENTS.getIfPresent(streamName);
+ if (cachedAppendClients != null
+ && System.identityHashCode(cachedAppendClients.get(clientNumber))
== System.identityHashCode(streamAppendClient)) {
Review Comment:
This isn't quite right: we're now invalidating all of the StreamWriters when any one of them fails. I think instead you want to just null out the one that failed and allow it to be recreated on the next get.
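A JDK-only sketch of that per-slot invalidation. It assumes the same list-per-stream layout with null marking an empty slot, a HashMap guarded by synchronized in place of the Guava Cache, and a hypothetical FakeAppendClient standing in for StreamAppendClient. invalidate clears only the failed slot, and only if that slot still holds the failed instance, so a client that another thread already recreated is left alone; the == reference check plays the role of the System.identityHashCode comparison in the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Per-stream client slots where a failed client is cleared on its own; names are illustrative. */
class PerSlotInvalidation {
  /** Hypothetical stand-in for StreamAppendClient. */
  static final class FakeAppendClient {}

  private final int clientCount;
  private final Map<String, List<FakeAppendClient>> cache = new HashMap<>();

  PerSlotInvalidation(int clientCount) {
    this.clientCount = clientCount;
  }

  /** Returns the client in the slot, recreating it if the slot is empty (null). */
  synchronized FakeAppendClient get(String streamName, int clientNumber) {
    List<FakeAppendClient> slots = cache.computeIfAbsent(streamName, k -> emptySlots());
    FakeAppendClient client = slots.get(clientNumber);
    if (client == null) {
      client = new FakeAppendClient();
      slots.set(clientNumber, client);
    }
    return client;
  }

  /** Nulls out only this slot, and only if it still holds the client that failed. */
  synchronized void invalidate(String streamName, int clientNumber, FakeAppendClient failed) {
    List<FakeAppendClient> slots = cache.get(streamName);
    if (slots != null && slots.get(clientNumber) == failed) {
      slots.set(clientNumber, null); // other slots keep their healthy clients
    }
  }

  private List<FakeAppendClient> emptySlots() {
    List<FakeAppendClient> slots = new ArrayList<>(clientCount);
    for (int i = 0; i < clientCount; i++) {
      slots.add(null);
    }
    return slots;
  }
}
```

A stale call, made with a client that has already been replaced, is a no-op, which mirrors the "another thread has already invalidated and recreated the stream" case noted in the diff.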
--
This is an automated message from the Apache Git Service.