gemini-code-assist[bot] commented on code in PR #35576:
URL: https://github.com/apache/beam/pull/35576#discussion_r2237864019
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -635,38 +638,44 @@ public void process(
// Clear the stream name, forcing a new one to be created.
streamName.write("");
}
- appendClientInfo.set(
- appendClientInfo
- .get()
- .withAppendClient(
- writeStreamService,
- getOrCreateStream,
- false,
- defaultMissingValueInterpretation));
- StreamAppendClient streamAppendClient =
- Preconditions.checkArgumentNotNull(
- appendClientInfo.get().getStreamAppendClient());
- String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read());
- long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read());
- for (AppendRowsContext context : contexts) {
- context.streamName = streamNameRead;
- streamAppendClient.pin();
- context.client = appendClientInfo.get().getStreamAppendClient();
- context.offset = currentOffset;
- ++context.tryIteration;
- currentOffset = context.offset + context.protoRows.getSerializedRowsCount();
+ // Synchronize to prevent race condition with clearClients
+ synchronized (APPEND_CLIENTS) {
+ appendClientInfo.set(
+ appendClientInfo
+ .get()
+ .withAppendClient(
+ writeStreamService,
+ getOrCreateStream,
+ false,
+ defaultMissingValueInterpretation));
+ StreamAppendClient streamAppendClient =
+ Preconditions.checkArgumentNotNull(
+ appendClientInfo.get().getStreamAppendClient());
+ String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read());
+ long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read());
+ for (AppendRowsContext context : contexts) {
+ context.streamName = streamNameRead;
+ streamAppendClient.pin();
+ context.client = appendClientInfo.get().getStreamAppendClient();
+ context.offset = currentOffset;
+ ++context.tryIteration;
+ currentOffset = context.offset + context.protoRows.getSerializedRowsCount();
+ }
+ streamOffset.write(currentOffset);
}
Review Comment:

This `synchronized` block holds the lock on the static `APPEND_CLIENTS` object
while `withAppendClient` may perform a blocking I/O operation (when it has to
create a new client). Because the lock is static, every thread that needs an
append client has to wait for it, regardless of shard, so this can become a
significant performance bottleneck.

To avoid this contention, consider refactoring so that the I/O-bound client
creation happens outside the `synchronized` block. The critical section should
ideally protect only the modification of shared state such as `appendClientInfo`.
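
As a rough illustration of the suggested shape (not the actual Beam code), the sketch below builds the client with no lock held and takes the lock only to look up or publish the result. `ShardClientCache`, `getOrCreate`, `slowFactory`, and `closeUnused` are hypothetical names introduced for this example; the private `lock` field merely stands in for `APPEND_CLIENTS`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a per-shard client cache; none of these names come from Beam.
final class ShardClientCache<C> {
  // Plays the role of the shared lock (APPEND_CLIENTS in the diff).
  private final Object lock = new Object();
  private final Map<String, C> clients = new HashMap<>();

  /**
   * Returns the client cached for the shard. If none exists, slowFactory runs with
   * no lock held, and the lock is taken only to publish the result.
   */
  C getOrCreate(String shard, Supplier<C> slowFactory, Consumer<C> closeUnused) {
    synchronized (lock) {
      C existing = clients.get(shard);
      if (existing != null) {
        return existing; // fast path: short critical section, no I/O
      }
    }

    // Potentially blocking I/O happens here, outside the critical section.
    C created = slowFactory.get();

    C winner;
    synchronized (lock) {
      // Another thread may have published a client in the meantime; keep the first one.
      C raced = clients.putIfAbsent(shard, created);
      winner = (raced != null) ? raced : created;
    }
    if (winner != created) {
      // This thread lost the race, so release the client it built (also outside the lock).
      closeUnused.accept(created);
    }
    return winner;
  }
}
```

The trade-off is that two threads racing on the same shard may both create a client, with one of them discarded afterwards. Whether that is acceptable here, and how pinning and offset bookkeeping would interact with `clearClients`, would still need to be checked against the real code.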