[ 
https://issues.apache.org/jira/browse/BEAM-1440?focusedWorklogId=349936&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-349936
 ]

ASF GitHub Bot logged work on BEAM-1440:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Nov/19 18:45
            Start Date: 26/Nov/19 18:45
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on pull request #9772: [BEAM-1440] 
Create a BigQuery source that implements iobase.BoundedSource for Python
URL: https://github.com/apache/beam/pull/9772#discussion_r350918239
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery.py
 ##########
 @@ -499,6 +509,189 @@ def reader(self, test_bigquery_client=None):
         kms_key=self.kms_key)
 
 
+FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')
+
+
+def _to_bool(value):
+  return value == 'true'
+
+
+def _to_decimal(value):
+  return decimal.Decimal(value)
+
+
+def _to_bytes(value):
+  """Converts value from str to bytes on Python 3.x. Does nothing on
+  Python 2.7."""
+  return value.encode('utf-8')
+
+
+class _JsonToDictCoder(coders.Coder):
+  """A coder for a JSON string to a Python dict."""
+
+  def __init__(self, table_schema):
+    self.fields = self._convert_to_tuple(table_schema.fields)
+    self._converters = {
+        'INTEGER': int,
+        'INT64': int,
+        'FLOAT': float,
+        'BOOLEAN': _to_bool,
+        'NUMERIC': _to_decimal,
+        'BYTES': _to_bytes,
+    }
+
+  @classmethod
+  def _convert_to_tuple(cls, table_field_schemas):
+    """Recursively converts the list of TableFieldSchema instances to the
+    list of tuples to prevent errors when pickling and unpickling
+    TableFieldSchema instances.
+    """
+    if not table_field_schemas:
+      return []
+
+    return [FieldSchema(cls._convert_to_tuple(x.fields), x.mode, x.name,
+                        x.type)
+            for x in table_field_schemas]
+
+  def decode(self, value):
+    value = json.loads(value)
+    return self._decode_with_schema(value, self.fields)
+
+  def _decode_with_schema(self, value, schema_fields):
+    for field in schema_fields:
+      if field.name not in value:
+        # The field exists in the schema, but it doesn't exist in this row.
+        # It probably means its value was null, as the extract to JSON job
+        # doesn't preserve null fields
+        value[field.name] = None
+        continue
+
+      if field.type == 'RECORD':
+        value[field.name] = self._decode_with_schema(value[field.name],
+                                                     field.fields)
+      else:
+        try:
+          converter = self._converters[field.type]
+          value[field.name] = converter(value[field.name])
+        except KeyError:
 
 Review comment:
   Does this mean that values of other data types are passed through as they 
are, e.g. datetime data and the like?
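 
   To make the question concrete, here is a minimal sketch of the pass-through 
behaviour being asked about. It is not the code from the pull request: 
`CONVERTERS` and `convert_field` are hypothetical names, and the assumption 
that types without a converter are returned unchanged is exactly what the 
question asks the author to confirm.

```python
import decimal

# Hypothetical converter table, mirroring the mapping shown in the diff.
CONVERTERS = {
    'INTEGER': int,
    'INT64': int,
    'FLOAT': float,
    'BOOLEAN': lambda v: v == 'true',
    'NUMERIC': decimal.Decimal,
    'BYTES': lambda v: v.encode('utf-8'),
}


def convert_field(field_type, raw_value):
  """Converts raw_value if a converter exists, else returns it unchanged."""
  converter = CONVERTERS.get(field_type)
  if converter is None:
    # Assumption: types with no converter (e.g. DATETIME, TIMESTAMP, STRING)
    # are passed through as the string found in the exported JSON.
    return raw_value
  return converter(raw_value)


# A row as it might look after json.loads(): every value is still a string.
row = {'id': '42', 'created': '2019-11-26T18:45:00'}
decoded = {
    'id': convert_field('INTEGER', row['id']),
    'created': convert_field('DATETIME', row['created']),
}
```

   Under that assumption, `decoded['id']` becomes an `int`, while 
`decoded['created']` stays a plain string rather than a `datetime` object. 
The sketch uses `dict.get` instead of catching `KeyError`, so that a 
`KeyError` raised inside a converter would not be mistaken for a missing 
mapping.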
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 349936)
    Time Spent: 10h 40m  (was: 10.5h)

> Create a BigQuery source (that implements iobase.BoundedSource) for Python SDK
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-1440
>                 URL: https://issues.apache.org/jira/browse/BEAM-1440
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Chamikara Madhusanka Jayalath
>            Assignee: Kamil Wasilewski
>            Priority: Major
>          Time Spent: 10h 40m
>  Remaining Estimate: 0h
>
> Currently we have a native BigQuery source for the Python SDK [1].
> It can only be used by the Dataflow runner.
> We should implement a Beam BigQuery source that implements the 
> iobase.BoundedSource [2] interface so that other runners that use the 
> Python SDK can read from BigQuery as well. The Java SDK already has a Beam 
> BigQuery source [3].
> [1] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py
> [2] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L70
> [3] 
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1189



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
