[ 
https://issues.apache.org/jira/browse/BEAM-1440?focusedWorklogId=353687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353687
 ]

ASF GitHub Bot logged work on BEAM-1440:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Dec/19 19:28
            Start Date: 04/Dec/19 19:28
    Worklog Time Spent: 10m 
      Work Description: kamilwu commented on pull request #9772: [BEAM-1440] Create a BigQuery source that implements iobase.BoundedSource for Python
URL: https://github.com/apache/beam/pull/9772#discussion_r353239750
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery.py
 ##########
 @@ -499,6 +509,189 @@ def reader(self, test_bigquery_client=None):
         kms_key=self.kms_key)
 
 
+FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')
+
+
+def _to_bool(value):
+  return value == 'true'
+
+
+def _to_decimal(value):
+  return decimal.Decimal(value)
+
+
+def _to_bytes(value):
+  """Converts value from str to bytes on Python 3.x. Does nothing on
+  Python 2.7."""
+  return value.encode('utf-8')
+
+
+class _JsonToDictCoder(coders.Coder):
+  """A coder for a JSON string to a Python dict."""
+
+  def __init__(self, table_schema):
+    self.fields = self._convert_to_tuple(table_schema.fields)
+    self._converters = {
+        'INTEGER': int,
+        'INT64': int,
+        'FLOAT': float,
+        'BOOLEAN': _to_bool,
+        'NUMERIC': _to_decimal,
+        'BYTES': _to_bytes,
+    }
+
+  @classmethod
+  def _convert_to_tuple(cls, table_field_schemas):
+    """Recursively converts the list of TableFieldSchema instances to the
+    list of tuples to prevent errors when pickling and unpickling
+    TableFieldSchema instances.
+    """
+    if not table_field_schemas:
+      return []
+
+    return [FieldSchema(cls._convert_to_tuple(x.fields), x.mode, x.name,
+                        x.type)
+            for x in table_field_schemas]
+
+  def decode(self, value):
+    value = json.loads(value)
+    return self._decode_with_schema(value, self.fields)
+
+  def _decode_with_schema(self, value, schema_fields):
+    for field in schema_fields:
+      if field.name not in value:
+        # The field exists in the schema, but it doesn't exist in this row.
+        # It probably means its value was null, as the extract to JSON job
+        # doesn't preserve null fields
+        value[field.name] = None
+        continue
+
+      if field.type == 'RECORD':
+        value[field.name] = self._decode_with_schema(value[field.name],
+                                                     field.fields)
+      else:
+        try:
+          converter = self._converters[field.type]
+          value[field.name] = converter(value[field.name])
+        except KeyError:
+          # No need to do any conversion
+          pass
+    return value
+
+  def is_deterministic(self):
+    return True
+
+  def to_type_hint(self):
+    return dict
+
+
+class _BigQuerySource(BoundedSource):
+  def __init__(self, gcs_location=None, table=None, dataset=None,
+               project=None, query=None, validate=False, coder=None,
+               use_standard_sql=False, flatten_results=True, kms_key=None):
+    if table is not None and query is not None:
+      raise ValueError('Both a BigQuery table and a query were specified.'
+                       ' Please specify only one of these.')
+    elif table is None and query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+    elif table is not None:
+      self.table_reference = bigquery_tools.parse_table_reference(
+          table, dataset, project)
+      self.query = None
+      self.use_legacy_sql = True
+    else:
+      self.query = query
+      # TODO(BEAM-1082): Change the internal flag to be standard_sql
+      self.use_legacy_sql = not use_standard_sql
+      self.table_reference = None
+
+    self.gcs_location = gcs_location
+    self.project = project
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+
+  def estimate_size(self):
+    bq = bigquery_tools.BigQueryWrapper()
+    if self.table_reference is not None:
+      table = bq.get_table(self.table_reference.projectId,
+                           self.table_reference.datasetId,
+                           self.table_reference.tableId)
+      return int(table.numBytes)
+    else:
+      self._setup_temporary_dataset(bq)
+      job = bq._start_query_job(self.project, self.query,
+                                self.use_legacy_sql, self.flatten_results,
+                                job_id=uuid.uuid4().hex, dry_run=True,
+                                kms_key=self.kms_key)
+      size = int(job.statistics.totalBytesProcessed)
+
+      bq.clean_up_temporary_dataset(self.project)
+
+      return size
+
+  def split(self, desired_bundle_size, start_position=None, stop_position=None):
+    if self.split_result is None:
+      bq = bigquery_tools.BigQueryWrapper()
+
+      if self.query is not None:
+        self._setup_temporary_dataset(bq)
+        self.table_reference = self._execute_query(bq)
+
+      schema, metadata_list = self._export_files(bq)
+      self.split_result = [TextSource(metadata.path, 0,
+                                      CompressionTypes.UNCOMPRESSED, True,
+                                      self.coder(schema))
+                           for metadata in metadata_list]
+
+      if self.query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+    for source in self.split_result:
+      yield SourceBundle(0, source, None, None)
+
+  def get_range_tracker(self, start_position, stop_position):
+    raise NotImplementedError('BigQuery source must be split before being read')
+
+  def read(self, range_tracker):
+    raise NotImplementedError('BigQuery source must be split before being read')
+
+  def _setup_temporary_dataset(self, bq):
+    location = bq.get_query_location(self.project, self.query,
+                                     self.use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq):
+    job = bq._start_query_job(self.project, self.query,
+                              self.use_legacy_sql, self.flatten_results,
+                              job_id=uuid.uuid4().hex, kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq):
+    """Runs a BigQuery export job.
+
+    Returns:
+      bigquery.TableSchema instance, a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([self.gcs_location], job_id,
 
 Review comment:
   I found something in the docs [1] that may answer your question:
   ```
   Calling jobs.insert on a given job ID is idempotent; in other words, you can
   retry as many times as you like on the same job ID, and at most one of those
   operations will succeed.
   ```
   When the call is retried, the job ID stays the same, so I think we are safe.
   
   [1] https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_stored_in
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 353687)
    Time Spent: 13.5h  (was: 13h 20m)

> Create a BigQuery source (that implements iobase.BoundedSource) for Python SDK
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-1440
>                 URL: https://issues.apache.org/jira/browse/BEAM-1440
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Chamikara Madhusanka Jayalath
>            Assignee: Kamil Wasilewski
>            Priority: Major
>          Time Spent: 13.5h
>  Remaining Estimate: 0h
>
> Currently we have a BigQuery native source for Python SDK [1].
> This can only be used by Dataflow runner.
> We should implement a Beam BigQuery source that implements the
> iobase.BoundedSource [2] interface so that other runners that try to use the
> Python SDK can read from BigQuery as well. The Java SDK already has a Beam
> BigQuery source [3].
> [1] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py
> [2] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L70
> [3] 
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1189



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to