Repository: beam Updated Branches: refs/heads/release-2.1.0 948735130 -> 6e5971e1f
[BEAM-2595] Allow table schema objects in BQ DoFn Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ada4733b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ada4733b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ada4733b Branch: refs/heads/release-2.1.0 Commit: ada4733b02bc38b1ef619fb991c068822a917595 Parents: 9487351 Author: Sourabh Bajaj <sourabhba...@google.com> Authored: Thu Jul 13 12:02:31 2017 -0700 Committer: Sourabh Bajaj <sourabhba...@google.com> Committed: Fri Jul 14 15:31:59 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/gcp/bigquery.py | 100 +++++++++++++++--- sdks/python/apache_beam/io/gcp/bigquery_test.py | 105 +++++++++++++++++-- 2 files changed, 180 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ada4733b/sdks/python/apache_beam/io/gcp/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index da8be68..23fd310 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1191,22 +1191,20 @@ class BigQueryWriteFn(DoFn): @staticmethod def get_table_schema(schema): - # Transform the table schema into a bigquery.TableSchema instance. 
- if isinstance(schema, basestring): - table_schema = bigquery.TableSchema() - schema_list = [s.strip() for s in schema.split(',')] - for field_and_type in schema_list: - field_name, field_type = field_and_type.split(':') - field_schema = bigquery.TableFieldSchema() - field_schema.name = field_name - field_schema.type = field_type - field_schema.mode = 'NULLABLE' - table_schema.fields.append(field_schema) - return table_schema - elif schema is None: - return schema - elif isinstance(schema, bigquery.TableSchema): + """Transform the table schema into a bigquery.TableSchema instance. + + Args: + schema: The schema to be used if the BigQuery table to write has to be + created. This is a dictionary object created in the WriteToBigQuery + transform. + Returns: + table_schema: The schema to be used if the BigQuery table to write has + to be created but in the bigquery.TableSchema format. + """ + if schema is None: return schema + elif isinstance(schema, dict): + return parse_table_schema_from_json(json.dumps(schema)) + else: + raise TypeError('Unexpected schema argument: %s.' % schema) @@ -1289,13 +1287,83 @@ class WriteToBigQuery(PTransform): self.batch_size = batch_size self.test_client = test_client + @staticmethod + def get_table_schema_from_string(schema): + """Transform the string table schema into a bigquery.TableSchema instance. + + Args: + schema: The string schema to be used if the BigQuery table to write has + to be created. + Returns: + table_schema: The schema to be used if the BigQuery table to write has + to be created but in the bigquery.TableSchema format. 
+ """ + table_schema = bigquery.TableSchema() + schema_list = [s.strip() for s in schema.split(',')] + for field_and_type in schema_list: + field_name, field_type = field_and_type.split(':') + field_schema = bigquery.TableFieldSchema() + field_schema.name = field_name + field_schema.type = field_type + field_schema.mode = 'NULLABLE' + table_schema.fields.append(field_schema) + return table_schema + + @staticmethod + def table_schema_to_dict(table_schema): + """Create a dictionary representation of table schema for serialization + """ + def get_table_field(field): + """Create a dictionary representation of a table field + """ + result = {} + result['name'] = field.name + result['type'] = field.type + result['mode'] = getattr(field, 'mode', 'NULLABLE') + if hasattr(field, 'description') and field.description is not None: + result['description'] = field.description + if hasattr(field, 'fields') and field.fields: + result['fields'] = [get_table_field(f) for f in field.fields] + return result + + if not isinstance(table_schema, bigquery.TableSchema): + raise ValueError("Table schema must be of the type bigquery.TableSchema") + schema = {'fields': []} + for field in table_schema.fields: + schema['fields'].append(get_table_field(field)) + return schema + + @staticmethod + def get_dict_table_schema(schema): + """Transform the table schema into a dictionary instance. + + Args: + schema: The schema to be used if the BigQuery table to write has to be + created. This can either be a dict or string or in the TableSchema + format. + Returns: + table_schema: The schema to be used if the BigQuery table to write has + to be created but in the dictionary format. 
+ """ + if isinstance(schema, dict): + return schema + elif schema is None: + return schema + elif isinstance(schema, basestring): + table_schema = WriteToBigQuery.get_table_schema_from_string(schema) + return WriteToBigQuery.table_schema_to_dict(table_schema) + elif isinstance(schema, bigquery.TableSchema): + return WriteToBigQuery.table_schema_to_dict(schema) + else: + raise TypeError('Unexpected schema argument: %s.' % schema) + def expand(self, pcoll): bigquery_write_fn = BigQueryWriteFn( table_id=self.table_reference.tableId, dataset_id=self.table_reference.datasetId, project_id=self.table_reference.projectId, batch_size=self.batch_size, - schema=self.schema, + schema=self.get_dict_table_schema(self.schema), create_disposition=self.create_disposition, write_disposition=self.write_disposition, client=self.test_client) http://git-wip-us.apache.org/repos/asf/beam/blob/ada4733b/sdks/python/apache_beam/io/gcp/bigquery_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index b7f766b..14247ba 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -834,12 +834,15 @@ class WriteToBigQuery(unittest.TestCase): projectId='project_id', datasetId='dataset_id', tableId='table_id')) create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND + schema = {'fields': [ + {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} + fn = beam.io.gcp.bigquery.BigQueryWriteFn( table_id='table_id', dataset_id='dataset_id', project_id='project_id', batch_size=2, - schema='month:INTEGER', + schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, client=client) @@ -855,13 +858,15 @@ class WriteToBigQuery(unittest.TestCase): projectId='project_id', datasetId='dataset_id', tableId='table_id')) 
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND + schema = {'fields': [ + {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} fn = beam.io.gcp.bigquery.BigQueryWriteFn( table_id='table_id', dataset_id='dataset_id', project_id='project_id', batch_size=2, - schema='month:INTEGER', + schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, client=client) @@ -879,13 +884,15 @@ class WriteToBigQuery(unittest.TestCase): bigquery.TableDataInsertAllResponse(insertErrors=[]) create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND + schema = {'fields': [ + {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} fn = beam.io.gcp.bigquery.BigQueryWriteFn( table_id='table_id', dataset_id='dataset_id', project_id='project_id', batch_size=2, - schema='month:INTEGER', + schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, client=client) @@ -906,13 +913,15 @@ class WriteToBigQuery(unittest.TestCase): bigquery.TableDataInsertAllResponse(insertErrors=[])) create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND + schema = {'fields': [ + {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} fn = beam.io.gcp.bigquery.BigQueryWriteFn( table_id='table_id', dataset_id='dataset_id', project_id='project_id', batch_size=2, - schema='month:INTEGER', + schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, client=client) @@ -933,13 +942,15 @@ class WriteToBigQuery(unittest.TestCase): bigquery.TableDataInsertAllResponse(insertErrors=[]) create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND + schema = {'fields': [ + {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} fn = 
beam.io.gcp.bigquery.BigQueryWriteFn( table_id='table_id', dataset_id='dataset_id', project_id='project_id', batch_size=2, - schema='month:INTEGER', + schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, client=client) @@ -964,13 +975,15 @@ class WriteToBigQuery(unittest.TestCase): bigquery.TableDataInsertAllResponse(insertErrors=[]) create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND + schema = {'fields': [ + {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} fn = beam.io.gcp.bigquery.BigQueryWriteFn( table_id='table_id', dataset_id='dataset_id', project_id='project_id', batch_size=2, - schema='month:INTEGER', + schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, client=client) @@ -984,17 +997,91 @@ class WriteToBigQuery(unittest.TestCase): # InsertRows not called in finish bundle as no records self.assertFalse(client.tabledata.InsertAll.called) - def test_simple_schema_parsing(self): + def test_noop_schema_parsing(self): + expected_table_schema = None table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema( - schema='s:STRING, n:INTEGER') + schema=None) + self.assertEqual(expected_table_schema, table_schema) + + def test_dict_schema_parsing(self): + schema = {'fields': [ + {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'}, + {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}, + {'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [ + {'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]} + table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(schema) string_field = bigquery.TableFieldSchema( name='s', type='STRING', mode='NULLABLE') + nested_field = bigquery.TableFieldSchema( + name='x', type='INTEGER', mode='NULLABLE') number_field = bigquery.TableFieldSchema( name='n', type='INTEGER', mode='NULLABLE') + record_field = bigquery.TableFieldSchema( + name='r', 
type='RECORD', mode='NULLABLE', fields=[nested_field]) expected_table_schema = bigquery.TableSchema( - fields=[string_field, number_field]) + fields=[string_field, number_field, record_field]) self.assertEqual(expected_table_schema, table_schema) + def test_string_schema_parsing(self): + schema = 's:STRING, n:INTEGER' + expected_dict_schema = {'fields': [ + {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'}, + {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} + dict_schema = ( + beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) + self.assertEqual(expected_dict_schema, dict_schema) + + def test_table_schema_parsing(self): + string_field = bigquery.TableFieldSchema( + name='s', type='STRING', mode='NULLABLE') + nested_field = bigquery.TableFieldSchema( + name='x', type='INTEGER', mode='NULLABLE') + number_field = bigquery.TableFieldSchema( + name='n', type='INTEGER', mode='NULLABLE') + record_field = bigquery.TableFieldSchema( + name='r', type='RECORD', mode='NULLABLE', fields=[nested_field]) + schema = bigquery.TableSchema( + fields=[string_field, number_field, record_field]) + expected_dict_schema = {'fields': [ + {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'}, + {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}, + {'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [ + {'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]} + dict_schema = ( + beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) + self.assertEqual(expected_dict_schema, dict_schema) + + def test_table_schema_parsing_end_to_end(self): + string_field = bigquery.TableFieldSchema( + name='s', type='STRING', mode='NULLABLE') + nested_field = bigquery.TableFieldSchema( + name='x', type='INTEGER', mode='NULLABLE') + number_field = bigquery.TableFieldSchema( + name='n', type='INTEGER', mode='NULLABLE') + record_field = bigquery.TableFieldSchema( + name='r', type='RECORD', mode='NULLABLE', fields=[nested_field]) + schema = bigquery.TableSchema( + 
fields=[string_field, number_field, record_field]) + table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema( + beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) + self.assertEqual(table_schema, schema) + + def test_none_schema_parsing(self): + schema = None + expected_dict_schema = None + dict_schema = ( + beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) + self.assertEqual(expected_dict_schema, dict_schema) + + def test_noop_dict_schema_parsing(self): + schema = {'fields': [ + {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'}, + {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} + expected_dict_schema = schema + dict_schema = ( + beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) + self.assertEqual(expected_dict_schema, dict_schema) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO)