Repository: beam
Updated Branches:
  refs/heads/release-2.1.0 948735130 -> 6e5971e1f


[BEAM-2595] Allow table schema objects in BQ DoFn


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ada4733b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ada4733b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ada4733b

Branch: refs/heads/release-2.1.0
Commit: ada4733b02bc38b1ef619fb991c068822a917595
Parents: 9487351
Author: Sourabh Bajaj <sourabhba...@google.com>
Authored: Thu Jul 13 12:02:31 2017 -0700
Committer: Sourabh Bajaj <sourabhba...@google.com>
Committed: Fri Jul 14 15:31:59 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/bigquery.py      | 100 +++++++++++++++---
 sdks/python/apache_beam/io/gcp/bigquery_test.py | 105 +++++++++++++++++--
 2 files changed, 180 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ada4733b/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index da8be68..23fd310 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1191,22 +1191,20 @@ class BigQueryWriteFn(DoFn):
 
   @staticmethod
   def get_table_schema(schema):
-    # Transform the table schema into a bigquery.TableSchema instance.
-    if isinstance(schema, basestring):
-      table_schema = bigquery.TableSchema()
-      schema_list = [s.strip() for s in schema.split(',')]
-      for field_and_type in schema_list:
-        field_name, field_type = field_and_type.split(':')
-        field_schema = bigquery.TableFieldSchema()
-        field_schema.name = field_name
-        field_schema.type = field_type
-        field_schema.mode = 'NULLABLE'
-        table_schema.fields.append(field_schema)
-      return table_schema
-    elif schema is None:
-      return schema
-    elif isinstance(schema, bigquery.TableSchema):
+    """Transform the table schema into a bigquery.TableSchema instance.
+
+    Args:
+      schema: The schema to be used if the BigQuery table to write has to be
+        created. This is a dictionary object created in the WriteToBigQuery
+        transform.
+    Returns:
+      table_schema: The schema to be used if the BigQuery table to write has
+         to be created but in the bigquery.TableSchema format.
+    """
+    if schema is None:
       return schema
+    elif isinstance(schema, dict):
+      return parse_table_schema_from_json(json.dumps(schema))
     else:
       raise TypeError('Unexpected schema argument: %s.' % schema)
 
@@ -1289,13 +1287,83 @@ class WriteToBigQuery(PTransform):
     self.batch_size = batch_size
     self.test_client = test_client
 
+  @staticmethod
+  def get_table_schema_from_string(schema):
+    """Transform the string table schema into a bigquery.TableSchema instance.
+
+    Args:
+      schema: The sting schema to be used if the BigQuery table to write has
+         to be created.
+    Returns:
+      table_schema: The schema to be used if the BigQuery table to write has
+         to be created but in the bigquery.TableSchema format.
+    """
+    table_schema = bigquery.TableSchema()
+    schema_list = [s.strip() for s in schema.split(',')]
+    for field_and_type in schema_list:
+      field_name, field_type = field_and_type.split(':')
+      field_schema = bigquery.TableFieldSchema()
+      field_schema.name = field_name
+      field_schema.type = field_type
+      field_schema.mode = 'NULLABLE'
+      table_schema.fields.append(field_schema)
+    return table_schema
+
+  @staticmethod
+  def table_schema_to_dict(table_schema):
+    """Create a dictionary representation of table schema for serialization
+    """
+    def get_table_field(field):
+      """Create a dictionary representation of a table field
+      """
+      result = {}
+      result['name'] = field.name
+      result['type'] = field.type
+      result['mode'] = getattr(field, 'mode', 'NULLABLE')
+      if hasattr(field, 'description') and field.description is not None:
+        result['description'] = field.description
+      if hasattr(field, 'fields') and field.fields:
+        result['fields'] = [get_table_field(f) for f in field.fields]
+      return result
+
+    if not isinstance(table_schema, bigquery.TableSchema):
+      raise ValueError("Table schema must be of the type bigquery.TableSchema")
+    schema = {'fields': []}
+    for field in table_schema.fields:
+      schema['fields'].append(get_table_field(field))
+    return schema
+
+  @staticmethod
+  def get_dict_table_schema(schema):
+    """Transform the table schema into a dictionary instance.
+
+    Args:
+      schema: The schema to be used if the BigQuery table to write has to be
+        created. This can either be a dict or string or in the TableSchema
+        format.
+    Returns:
+      table_schema: The schema to be used if the BigQuery table to write has
+         to be created but in the dictionary format.
+    """
+    if isinstance(schema, dict):
+      return schema
+    elif schema is None:
+      return schema
+    elif isinstance(schema, basestring):
+      table_schema = WriteToBigQuery.get_table_schema_from_string(schema)
+      return WriteToBigQuery.table_schema_to_dict(table_schema)
+    elif isinstance(schema, bigquery.TableSchema):
+      return WriteToBigQuery.table_schema_to_dict(schema)
+    else:
+      raise TypeError('Unexpected schema argument: %s.' % schema)
+
   def expand(self, pcoll):
     bigquery_write_fn = BigQueryWriteFn(
         table_id=self.table_reference.tableId,
         dataset_id=self.table_reference.datasetId,
         project_id=self.table_reference.projectId,
         batch_size=self.batch_size,
-        schema=self.schema,
+        schema=self.get_dict_table_schema(self.schema),
         create_disposition=self.create_disposition,
         write_disposition=self.write_disposition,
         client=self.test_client)

http://git-wip-us.apache.org/repos/asf/beam/blob/ada4733b/sdks/python/apache_beam/io/gcp/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index b7f766b..14247ba 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -834,12 +834,15 @@ class WriteToBigQuery(unittest.TestCase):
             projectId='project_id', datasetId='dataset_id', 
tableId='table_id'))
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+    schema = {'fields': [
+        {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
+
     fn = beam.io.gcp.bigquery.BigQueryWriteFn(
         table_id='table_id',
         dataset_id='dataset_id',
         project_id='project_id',
         batch_size=2,
-        schema='month:INTEGER',
+        schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
         client=client)
@@ -855,13 +858,15 @@ class WriteToBigQuery(unittest.TestCase):
             projectId='project_id', datasetId='dataset_id', 
tableId='table_id'))
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+    schema = {'fields': [
+        {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
 
     fn = beam.io.gcp.bigquery.BigQueryWriteFn(
         table_id='table_id',
         dataset_id='dataset_id',
         project_id='project_id',
         batch_size=2,
-        schema='month:INTEGER',
+        schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
         client=client)
@@ -879,13 +884,15 @@ class WriteToBigQuery(unittest.TestCase):
         bigquery.TableDataInsertAllResponse(insertErrors=[])
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+    schema = {'fields': [
+        {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
 
     fn = beam.io.gcp.bigquery.BigQueryWriteFn(
         table_id='table_id',
         dataset_id='dataset_id',
         project_id='project_id',
         batch_size=2,
-        schema='month:INTEGER',
+        schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
         client=client)
@@ -906,13 +913,15 @@ class WriteToBigQuery(unittest.TestCase):
         bigquery.TableDataInsertAllResponse(insertErrors=[]))
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+    schema = {'fields': [
+        {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
 
     fn = beam.io.gcp.bigquery.BigQueryWriteFn(
         table_id='table_id',
         dataset_id='dataset_id',
         project_id='project_id',
         batch_size=2,
-        schema='month:INTEGER',
+        schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
         client=client)
@@ -933,13 +942,15 @@ class WriteToBigQuery(unittest.TestCase):
         bigquery.TableDataInsertAllResponse(insertErrors=[])
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+    schema = {'fields': [
+        {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
 
     fn = beam.io.gcp.bigquery.BigQueryWriteFn(
         table_id='table_id',
         dataset_id='dataset_id',
         project_id='project_id',
         batch_size=2,
-        schema='month:INTEGER',
+        schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
         client=client)
@@ -964,13 +975,15 @@ class WriteToBigQuery(unittest.TestCase):
         bigquery.TableDataInsertAllResponse(insertErrors=[])
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+    schema = {'fields': [
+        {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
 
     fn = beam.io.gcp.bigquery.BigQueryWriteFn(
         table_id='table_id',
         dataset_id='dataset_id',
         project_id='project_id',
         batch_size=2,
-        schema='month:INTEGER',
+        schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
         client=client)
@@ -984,17 +997,91 @@ class WriteToBigQuery(unittest.TestCase):
     # InsertRows not called in finish bundle as no records
     self.assertFalse(client.tabledata.InsertAll.called)
 
-  def test_simple_schema_parsing(self):
+  def test_noop_schema_parsing(self):
+    expected_table_schema = None
     table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
-        schema='s:STRING, n:INTEGER')
+        schema=None)
+    self.assertEqual(expected_table_schema, table_schema)
+
+  def test_dict_schema_parsing(self):
+    schema = {'fields': [
+        {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'},
+        {'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
+            {'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]}
+    table_schema = 
beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(schema)
     string_field = bigquery.TableFieldSchema(
         name='s', type='STRING', mode='NULLABLE')
+    nested_field = bigquery.TableFieldSchema(
+        name='x', type='INTEGER', mode='NULLABLE')
     number_field = bigquery.TableFieldSchema(
         name='n', type='INTEGER', mode='NULLABLE')
+    record_field = bigquery.TableFieldSchema(
+        name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
     expected_table_schema = bigquery.TableSchema(
-        fields=[string_field, number_field])
+        fields=[string_field, number_field, record_field])
     self.assertEqual(expected_table_schema, table_schema)
 
+  def test_string_schema_parsing(self):
+    schema = 's:STRING, n:INTEGER'
+    expected_dict_schema = {'fields': [
+        {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
+    dict_schema = (
+        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+    self.assertEqual(expected_dict_schema, dict_schema)
+
+  def test_table_schema_parsing(self):
+    string_field = bigquery.TableFieldSchema(
+        name='s', type='STRING', mode='NULLABLE')
+    nested_field = bigquery.TableFieldSchema(
+        name='x', type='INTEGER', mode='NULLABLE')
+    number_field = bigquery.TableFieldSchema(
+        name='n', type='INTEGER', mode='NULLABLE')
+    record_field = bigquery.TableFieldSchema(
+        name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
+    schema = bigquery.TableSchema(
+        fields=[string_field, number_field, record_field])
+    expected_dict_schema = {'fields': [
+        {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'},
+        {'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
+            {'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]}
+    dict_schema = (
+        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+    self.assertEqual(expected_dict_schema, dict_schema)
+
+  def test_table_schema_parsing_end_to_end(self):
+    string_field = bigquery.TableFieldSchema(
+        name='s', type='STRING', mode='NULLABLE')
+    nested_field = bigquery.TableFieldSchema(
+        name='x', type='INTEGER', mode='NULLABLE')
+    number_field = bigquery.TableFieldSchema(
+        name='n', type='INTEGER', mode='NULLABLE')
+    record_field = bigquery.TableFieldSchema(
+        name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
+    schema = bigquery.TableSchema(
+        fields=[string_field, number_field, record_field])
+    table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
+        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+    self.assertEqual(table_schema, schema)
+
+  def test_none_schema_parsing(self):
+    schema = None
+    expected_dict_schema = None
+    dict_schema = (
+        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+    self.assertEqual(expected_dict_schema, dict_schema)
+
+  def test_noop_dict_schema_parsing(self):
+    schema = {'fields': [
+        {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
+    expected_dict_schema = schema
+    dict_schema = (
+        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+    self.assertEqual(expected_dict_schema, dict_schema)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

Reply via email to