ahmedabu98 commented on code in PR #23012:
URL: https://github.com/apache/beam/pull/23012#discussion_r971106105


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -430,20 +434,36 @@ def process(self, element, schema_mod_job_name_prefix):
         table_reference)
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
-    schema_update_job_reference = self._bq_wrapper.perform_load_job(
-        destination=table_reference,
-        source_stream=io.BytesIO(),  # file with zero rows
-        job_id=job_name,
-        schema=temp_table_schema,
-        write_disposition='WRITE_APPEND',
-        create_disposition='CREATE_NEVER',
-        additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        # JSON format is hardcoded because it permits loading zero rows (unlike
-        # AVRO) and a nested schema (unlike CSV, which only supports a flat one).
-        source_format="NEWLINE_DELIMITED_JSON",
-        load_job_project_id=self._load_job_project_id)
-    yield (destination, schema_update_job_reference)
+    schema_update_job_reference = self.bq_wrapper.perform_load_job(
+      destination=table_reference,
+      source_stream=io.BytesIO(),  # file with zero rows
+      job_id=job_name,
+      schema=temp_table_schema,
+      write_disposition='WRITE_APPEND',
+      create_disposition='CREATE_NEVER',
+      additional_load_parameters=additional_parameters,
+      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+      # JSON format is hardcoded because it permits loading zero rows (unlike
+      # AVRO) and a nested schema (unlike CSV, which only supports a flat one).
+      source_format="NEWLINE_DELIMITED_JSON",
+      load_job_project_id=self._load_job_project_id)
+    self.pending_jobs.append(
+        GlobalWindows.windowed_value(
+            (destination, schema_update_job_reference)))
+
+  def finish_bundle(self):
+    # Unlike the other steps, schema update is not always necessary.
+    # In that case, return a None value to avoid blocking in a streaming
+    # context; otherwise the streaming pipeline would get stuck waiting
+    # for the TriggerCopyJobs side-input.
+    if not self.pending_jobs:
+      return [GlobalWindows.windowed_value(None)]
+
+    for windowed_value in self.pending_jobs:
+      job_ref = windowed_value.value[1]

Review Comment:
   The benefit of the PendingJobManager's [wait 
implementation](https://github.com/apache/beam/blob/935eeadd45a6100c68243fc5b266b20d26164173/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java#L127)
 is that it waits for the jobs in parallel and checks on each of them. This 
way, one slow job does not block us from checking (and logging) the state of 
the other jobs.
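   
   For illustration, here is a minimal Python sketch of that parallel-wait idea (not Beam's actual API; `get_job_state` is a hypothetical stand-in for a jobs.get-style status check, and the polling interval is arbitrary):
   
   ```python
   import concurrent.futures
   import logging
   import time
   
   logging.basicConfig(level=logging.INFO)
   
   def get_job_state(job_ref):
     # Hypothetical placeholder; the real connector would query the
     # BigQuery API for the job's current state.
     return 'DONE'
   
   def wait_for_jobs_in_parallel(job_refs, poll_interval=10):
     pending = set(job_refs)
     with concurrent.futures.ThreadPoolExecutor() as executor:
       while pending:
         # Poll every pending job concurrently on each pass, so one slow
         # status call does not delay the checks (and logs) for the rest.
         states = list(zip(pending, executor.map(get_job_state, pending)))
         for job_ref, state in states:
           logging.info('Load job %s is in state %s', job_ref, state)
           if state == 'DONE':
             pending.discard(job_ref)
         if pending:
           time.sleep(poll_interval)
   
   wait_for_jobs_in_parallel(['job-a', 'job-b', 'job-c'])
   ```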
   
   This parallelism is lost in the Python connector's wait implementation 
because we check the state of the jobs sequentially. Ultimately, however, it 
doesn't change the big-picture behavior because file loads are atomic, i.e. if 
one job fails, all the jobs in the bundle fail. We don't release jobs to the 
next step until all of them have finished waiting.
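   
   For contrast, the sequential wait is roughly this shape (simplified; `wait_for_job` stands in for the wrapper's blocking wait): a slow job at the front of the list delays the status checks, and the logging, for every job behind it.
   
   ```python
   def wait_for_jobs_sequentially(job_refs, wait_for_job):
     for job_ref in job_refs:
       # Blocks until this job completes; jobs later in the list are not
       # polled (or logged) in the meantime. Because nothing is released
       # downstream until every job finishes, the final outcome matches
       # the parallel version.
       wait_for_job(job_ref)
   
   # Example with a no-op wait, just to show the call shape.
   wait_for_jobs_sequentially(['job-a', 'job-b'], lambda job_ref: None)
   ```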



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
