[
https://issues.apache.org/jira/browse/BEAM-1440?focusedWorklogId=339983&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-339983
]
ASF GitHub Bot logged work on BEAM-1440:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Nov/19 15:00
Start Date: 07/Nov/19 15:00
Worklog Time Spent: 10m
Work Description: kamilwu commented on pull request #9772: [BEAM-1440]
Create a BigQuery source that implements iobase.BoundedSource for Python
URL: https://github.com/apache/beam/pull/9772#discussion_r343697148
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -496,6 +506,189 @@ def reader(self, test_bigquery_client=None):
kms_key=self.kms_key)
+FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')
+
+
+def _to_bool(value):
+ return value == 'true'
+
+
+def _to_decimal(value):
+ return decimal.Decimal(value)
+
+
+def _to_bytes(value):
+ """Converts value from str to bytes on Python 3.x. Does nothing on
+ Python 2.7."""
+ return value.encode('utf-8')
+
+
+class _JsonToDictCoder(coders.Coder):
+ """A coder for a JSON string to a Python dict."""
+
+ def __init__(self, table_schema):
+ self.fields = self._convert_to_tuple(table_schema.fields)
+ self._converters = {
+ 'INTEGER': int,
+ 'INT64': int,
+ 'FLOAT': float,
+ 'BOOLEAN': _to_bool,
+ 'NUMERIC': _to_decimal,
+ 'BYTES': _to_bytes,
+ }
+
+ @classmethod
+ def _convert_to_tuple(cls, table_field_schemas):
+ """Recursively converts the list of TableFieldSchema instances to the
+ list of tuples to prevent errors when pickling and unpickling
Review comment:
This error was quite interesting. It seems that nested
`bigquery.TableFieldSchema` instances cannot be serialized and then
deserialized with Beam's pickler:
```
from apache_beam.internal import pickler
from apache_beam.io.gcp.internal.clients import bigquery
obj = bigquery.TableFieldSchema(fields=[bigquery.TableFieldSchema()])
pickler.loads(pickler.dumps(obj))
```
This snippet raises the following exception:
`AttributeError: 'FieldList' object has no attribute '_FieldList__field'`.
My workaround was to convert every TableFieldSchema instance to an equivalent
tuple, which can be serialized and deserialized without problems.
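A minimal sketch of that workaround, for illustration only. The body of `_convert_to_tuple` is not shown in this excerpt, so the recursion below is an assumption about its shape. `StandInFieldSchema` is a hypothetical stand-in for `bigquery.TableFieldSchema`, and the `address`/`city` fields are made-up sample data; only the `FieldSchema` namedtuple is taken from the diff.

```python
import collections

# Same field layout as the FieldSchema namedtuple in the diff.
FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')


class StandInFieldSchema(object):
  """Hypothetical stand-in for bigquery.TableFieldSchema (illustration only)."""

  def __init__(self, name, type, mode='NULLABLE', fields=None):
    self.name = name
    self.type = type
    self.mode = mode
    self.fields = fields or []


def convert_to_tuple(field_schemas):
  """Recursively rewrites schema objects as plain FieldSchema tuples."""
  return [
      FieldSchema(
          fields=convert_to_tuple(field.fields),  # Recurse into nested fields.
          mode=field.mode,
          name=field.name,
          type=field.type)
      for field in field_schemas
  ]


# A RECORD field with one nested STRING field (sample data).
nested = [
    StandInFieldSchema(
        'address', 'RECORD',
        fields=[StandInFieldSchema('city', 'STRING')]),
]
converted = convert_to_tuple(nested)
```

The converted structure contains only lists, namedtuples, and strings, so it no longer depends on how the generated client classes behave under pickling.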
Issue Time Tracking
-------------------
Worklog Id: (was: 339983)
Time Spent: 9.5h (was: 9h 20m)
> Create a BigQuery source (that implements iobase.BoundedSource) for Python SDK
> ------------------------------------------------------------------------------
>
> Key: BEAM-1440
> URL: https://issues.apache.org/jira/browse/BEAM-1440
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Chamikara Madhusanka Jayalath
> Assignee: Kamil Wasilewski
> Priority: Major
> Time Spent: 9.5h
> Remaining Estimate: 0h
>
> Currently we have a native BigQuery source for the Python SDK [1].
> It can only be used by the Dataflow runner.
> We should implement a Beam BigQuery source that implements the
> iobase.BoundedSource [2] interface so that other runners that use the
> Python SDK can read from BigQuery as well. The Java SDK already has a Beam
> BigQuery source [3].
> [1]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py
> [2]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L70
> [3]
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1189