[ https://issues.apache.org/jira/browse/BEAM-7742?focusedWorklogId=298869&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298869 ]

ASF GitHub Bot logged work on BEAM-7742:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Aug/19 17:15
            Start Date: 21/Aug/19 17:15
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on pull request #9242: [BEAM-7742] Partition files in BQFL to cater to quotas & limits
URL: https://github.com/apache/beam/pull/9242#discussion_r316291469
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
 ##########
 @@ -380,44 +389,93 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):
     else:
       additional_parameters = self.additional_bq_parameters
 
-    batch_of_files = list(itertools.islice(files, _MAXIMUM_SOURCE_URIS))
-    while batch_of_files:
-
-      table_reference = bigquery_tools.parse_table_reference(destination)
-      if table_reference.projectId is None:
-        table_reference.projectId = vp.RuntimeValueProvider.get_value(
-            'project', str, '')
-      # Load jobs for a single destination are always triggered from the same
-      # worker. This means that we can generate a deterministic numbered job id,
-      # and not need to worry.
-      destination_hash = _bq_uuid('%s:%s.%s' % (table_reference.projectId,
-                                                table_reference.datasetId,
-                                                table_reference.tableId))
-      timestamp = int(time.time())
-      job_name = '%s_%s_%s' % (
-          load_job_name_prefix, destination_hash, timestamp)
-      logging.debug('Batch of files has %s files. Job name is %s.',
-                    len(batch_of_files), job_name)
-
-      if self.temporary_tables:
-        # For temporary tables, we create a new table with the name with JobId.
-        table_reference.tableId = job_name
-        yield pvalue.TaggedOutput(TriggerLoadJobs.TEMP_TABLES, table_reference)
-
-      logging.info('Triggering job %s to load data to BigQuery table %s.'
-                   'Schema: %s. Additional parameters: %s',
-                   job_name, table_reference,
-                   schema, additional_parameters)
-      job_reference = self.bq_wrapper.perform_load_job(
-          table_reference, batch_of_files, job_name,
-          schema=schema,
-          write_disposition=self.write_disposition,
-          create_disposition=self.create_disposition,
-          additional_load_parameters=additional_parameters)
-      yield (destination, job_reference)
-
-      # Prepare to trigger the next job
-      batch_of_files = list(itertools.islice(files, _MAXIMUM_SOURCE_URIS))
+    table_reference = bigquery_tools.parse_table_reference(destination)
+    if table_reference.projectId is None:
+      table_reference.projectId = vp.RuntimeValueProvider.get_value(
+          'project', str, '')
+    # Load jobs for a single destination are always triggered from the same
+    # worker. This means that we can generate a deterministic numbered job id,
+    # and not need to worry.
+    destination_hash = _bq_uuid('%s:%s.%s' % (table_reference.projectId,
+                                              table_reference.datasetId,
+                                              table_reference.tableId))
+    uid = _bq_uuid()
+    job_name = '%s_%s_%s' % (
+        load_job_name_prefix, destination_hash, uid)
+    logging.debug('Load job has %s files. Job name is %s.',
+                  len(files), job_name)
+
+    if self.temporary_tables:
+      # For temporary tables, we create a new table with the name with JobId.
+      table_reference.tableId = job_name
+      yield pvalue.TaggedOutput(TriggerLoadJobs.TEMP_TABLES, table_reference)
+
+    logging.info('Triggering job %s to load data to BigQuery table %s.'
+                 'Schema: %s. Additional parameters: %s',
+                 job_name, table_reference,
+                 schema, additional_parameters)
+    job_reference = self.bq_wrapper.perform_load_job(
+        table_reference, files, job_name,
+        schema=schema,
+        write_disposition=self.write_disposition,
+        create_disposition=self.create_disposition,
+        additional_load_parameters=additional_parameters)
+    yield (destination, job_reference)
+
+
+class PartitionFiles(beam.DoFn):
+
+  MULTIPLE_PARTITIONS_TAG = 'MULTIPLE_PARTITIONS'
+  SINGLE_PARTITION_TAG = 'SINGLE_PARTITION'
+
+  class Partition(object):
+
+    def __init__(self, max_size, max_files, files=None, size=0):
+      self.max_size = max_size
+      self.max_files = max_files
+      self.files = files if files is not None else []
+      self.size = size
+
+    def can_accept(self, file_size, no_of_files=1):
+      if (((self.size + file_size) <= self.max_size)
+          and ((len(self.files) + no_of_files) <= self.max_files)):
+        return True
+      else:
+        return False
+
+    def add(self, file_path, file_size):
+      self.files.append(file_path)
+      self.size += file_size
+
+  def __init__(self, max_partition_size, max_files_per_partition):
+    self.max_partition_size = max_partition_size
+    self.max_files_per_partition = max_files_per_partition
+
+  def process(self, element):
+    destination = element[0]
+    files = element[1]
+    partitions = []
+
+    latest_partition = PartitionFiles.Partition(self.max_partition_size,
+                                                self.max_files_per_partition)
+
+    for file_path, file_size in files:
+      if latest_partition.can_accept(file_size):
+        latest_partition.add(file_path, file_size)
+      else:
+        partitions.append(latest_partition.files)
+        latest_partition = PartitionFiles.\
+          Partition(self.max_partition_size, self.max_files_per_partition)
 
 Review comment:
   Nit: for consistency with Beam code style
   ```
           latest_partition = PartitionFiles.Partition(
             self.max_partition_size, 
             self.max_files_per_partition)
   ```
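
 The diff context above stops partway through `PartitionFiles.process`, right at the lines this nit refers to. For readers following the thread, here is a minimal sketch of how the tail of the method presumably behaves, inferred only from the `MULTIPLE_PARTITIONS_TAG` and `SINGLE_PARTITION_TAG` constants declared earlier in the hunk; the helper name `emit_partitions` is hypothetical and this is not the PR's actual code:
 ```python
 from apache_beam import pvalue

 MULTIPLE_PARTITIONS_TAG = 'MULTIPLE_PARTITIONS'  # same values as in the hunk
 SINGLE_PARTITION_TAG = 'SINGLE_PARTITION'


 def emit_partitions(destination, partitions):
   # Tag the output so downstream logic can tell destinations that fit in one
   # load job apart from destinations whose files were split across several.
   output_tag = (MULTIPLE_PARTITIONS_TAG if len(partitions) > 1
                 else SINGLE_PARTITION_TAG)
   for partition in partitions:
     yield pvalue.TaggedOutput(output_tag, (destination, partition))
 ```
 Destinations that end up with more than one partition would then presumably be loaded through temporary tables and copied into the final table, which is the temp_tables point raised in the linked issue.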
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 298869)

> BigQuery File Loads to work well with load job size limits
> ----------------------------------------------------------
>
>                 Key: BEAM-7742
>                 URL: https://issues.apache.org/jira/browse/BEAM-7742
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-gcp
>            Reporter: Pablo Estrada
>            Assignee: Tanay Tummalapalli
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Load jobs into BigQuery have a number of limitations: 
> [https://cloud.google.com/bigquery/quotas#load_jobs]
>  
> Currently, the python BQ sink implemented in `bigquery_file_loads.py` does
> not handle these limitations well. Improvements need to be made to the
> implementation, to:
>  * Decide dynamically, at pipeline execution time, whether to use temp_tables
>  * Add code to determine when a load job to a single destination needs to be
> partitioned into multiple jobs (a minimal sketch follows this description).
>  * When this happens, we definitely need to use temp_tables, in case one of
> the load jobs fails and the pipeline is rerun.
> Tanay, would you be able to look at this?
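
To make the partitioning bullet above concrete, here is a small standalone sketch. The function name `min_load_jobs` and the example numbers are illustrative assumptions; the authoritative limits are the ones documented at the quotas URL in the description:

```python
import math


def min_load_jobs(total_bytes, num_files, max_bytes_per_job, max_files_per_job):
  # Lower bound on how many load jobs a single destination needs, given
  # per-job budgets for total bytes and for the number of source URIs.
  return max(int(math.ceil(float(total_bytes) / max_bytes_per_job)),
             int(math.ceil(float(num_files) / max_files_per_job)))


# Example: 12000 files totalling 24 TiB against a hypothetical budget of
# 15 TiB and 10000 source URIs per job -> at least 2 load jobs, so this
# destination has to be split across multiple jobs.
print(min_load_jobs(24 * 2**40, 12000, 15 * 2**40, 10000))  # prints 2
```

A destination that needs more than one such job is exactly the case covered by the last bullet: those loads should go to temporary tables first, so that a rerun after a partial failure does not duplicate data.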



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
