yirutang commented on code in PR #24145:
URL: https://github.com/apache/beam/pull/24145#discussion_r1067489287


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -385,43 +399,68 @@ public void process(
 
       Supplier<String> getOrCreateStream =
           () -> getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService);
-      Function<Boolean, AppendClientInfo> getAppendClientInfo =
-          createAppendClient -> {
-            try {
-              @Nullable
-              TableSchema tableSchema =
-                  messageConverters
-                      .get(element.getKey().getKey(), dynamicDestinations, datasetService)
-                      .getTableSchema();
-              AppendClientInfo info =
-                  new AppendClientInfo(
-                      tableSchema,
-                      // Make sure that the client is always closed in a different thread to avoid
-                      // blocking.
-                      client ->
-                          runAsyncIgnoreFailure(
-                              closeWriterExecutor,
-                              () -> {
-                                // Remove the pin that is "owned" by the cache.
-                                client.unpin();
-                                client.close();
-                              }));
-              if (createAppendClient) {
-                info = info.createAppendClient(datasetService, getOrCreateStream, false);
-                // This pin is "owned" by the cache.
-                Preconditions.checkStateNotNull(info.streamAppendClient).pin();
-              }
-              return info;
-            } catch (Exception e) {
-              throw new RuntimeException(e);
-            }
-          };
-
       AtomicReference<AppendClientInfo> appendClientInfo =
           new AtomicReference<>(
-              APPEND_CLIENTS.get(element.getKey(), () -> getAppendClientInfo.apply(true)));
+              APPEND_CLIENTS.get(
+                  element.getKey(),
+                  () -> {
+                    @Nullable TableSchema tableSchema;
+                    if (autoUpdateSchema && updatedSchema.read() != null) {
+                      // We've seen an updated schema, so we use that.
+                      tableSchema = updatedSchema.read();
+                    } else {
+                      // Start off with the base schema. As we get notified of schema updates, we
+                      // will update the
+                      // descriptor.
+                      tableSchema =
+                          messageConverters
+                              .get(element.getKey().getKey(), dynamicDestinations, datasetService)
+                              .getTableSchema();
+                    }
+                    AppendClientInfo info =
+                        AppendClientInfo.of(
+                                Preconditions.checkStateNotNull(tableSchema),
+                                // Make sure that the client is always closed in a different thread
+                                // to
+                                // avoid blocking.
+                                client ->
+                                    runAsyncIgnoreFailure(
+                                        closeWriterExecutor,
+                                        () -> {
+                                          // Remove the pin that is "owned" by the cache.
+                                          client.unpin();
+                                          client.close();
+                                        }))
+                            .withAppendClient(datasetService, getOrCreateStream, false);
+                    // This pin is "owned" by the cache.
+                    Preconditions.checkStateNotNull(info.getStreamAppendClient()).pin();
+                    return info;
+                  }));
+      TableSchema updatedSchemaValue = updatedSchema.read();

Review Comment:
   updatedSchema can be null?
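
   For illustration, a minimal sketch (hypothetical names, not this PR's code) of
   the kind of null-safe fallback I mean; TableSchema here is just a type
   parameter standing in for whichever schema type the surrounding code uses:

       import org.checkerframework.checker.nullness.qual.Nullable;

       class UpdatedSchemaReadSketch<TableSchema> {
         // Stand-in for updatedSchema.read(), which may return null when no
         // schema update has been observed yet.
         @Nullable TableSchema readUpdatedSchema() {
           return null;
         }

         TableSchema resolveSchema(TableSchema baseSchema) {
           @Nullable TableSchema updated = readUpdatedSchema();
           // Fall back to the base schema until an update has actually arrived.
           return updated != null ? updated : baseSchema;
         }
       }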



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -512,6 +581,18 @@ String retrieveErrorDetails(Iterable<AppendRowsContext> failedContext) {
             .map(StackTraceElement::toString)
             .collect(Collectors.joining("\n"));
       }
+
+      void postFlush() {
+        // If we got a response indicating an updated schema, recreate the client.
+        if (updatedTableSchema != null
+            && this.appendClientInfo != null
+            && this.appendClientInfo.hasSchemaChanged(updatedTableSchema)) {
+          invalidateWriteStream();
+          appendClientInfo =
+              Preconditions.checkStateNotNull(getAppendClientInfo(false, updatedTableSchema));
+          updatedTableSchema = null;

Review Comment:
   I am wondering if there could be races on this updatedTableSchema?
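
   For illustration, one way the hand-off could be made explicit if the schema
   update and postFlush() can run on different threads; this is only a sketch
   with hypothetical names, not a proposed change to the PR:

       import java.util.concurrent.atomic.AtomicReference;

       class UpdatedSchemaHandoffSketch<TableSchema> {
         private final AtomicReference<TableSchema> updatedTableSchema =
             new AtomicReference<>();

         // Called from the append-response callback when the server reports a
         // new schema.
         void onSchemaUpdated(TableSchema newSchema) {
           updatedTableSchema.set(newSchema);
         }

         void postFlush() {
           // Atomically take the pending update so a concurrent set is not lost
           // and the same update is not applied twice.
           TableSchema pending = updatedTableSchema.getAndSet(null);
           if (pending != null) {
             recreateAppendClient(pending);
           }
         }

         // Stand-in for invalidateWriteStream() + getAppendClientInfo(false, schema).
         void recreateAppendClient(TableSchema schema) {}
       }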



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
