This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new cead81f [BEAM-7530] Add it test to read None values from BigQuery (#8875) cead81f is described below commit cead81f4b0e4851c4e72ff6d48eeb27e4f0a5bcc Author: Juta Staes <juta.st...@gmail.com> AuthorDate: Wed Jul 17 20:49:17 2019 +0200 [BEAM-7530] Add it test to read None values from BigQuery (#8875) * [BEAM-7530] Add it test to read None values from BigQuery * fixup! merge tests * fixup:xup: reuse equal function from testing.util --- .../apache_beam/io/gcp/bigquery_read_it_test.py | 73 +++++++++++++++------- .../apache_beam/io/gcp/bigquery_write_it_test.py | 59 +++++++++++------ .../apache_beam/io/gcp/tests/bigquery_matcher.py | 8 ++- 3 files changed, 97 insertions(+), 43 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py index ad50b88..246d2ce 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py @@ -25,7 +25,9 @@ import logging import random import time import unittest +from decimal import Decimal +from future.utils import iteritems from nose.plugins.attrib import attr import apache_beam as beam @@ -103,6 +105,14 @@ class BigQueryReadIntegrationTests(unittest.TestCase): def create_table_new_types(self, table_name): table_schema = bigquery.TableSchema() table_field = bigquery.TableFieldSchema() + table_field.name = 'float' + table_field.type = 'FLOAT' + table_schema.fields.append(table_field) + table_field = bigquery.TableFieldSchema() + table_field.name = 'numeric' + table_field.type = 'NUMERIC' + table_schema.fields.append(table_field) + table_field = bigquery.TableFieldSchema() table_field.name = 'bytes' table_field.type = 'BYTES' table_schema.fields.append(table_field) @@ -114,6 +124,18 @@ class BigQueryReadIntegrationTests(unittest.TestCase): table_field.name = 'time' table_field.type = 'TIME' table_schema.fields.append(table_field) + table_field = bigquery.TableFieldSchema() + table_field.name = 'datetime' + table_field.type = 'DATETIME' + table_schema.fields.append(table_field) + table_field = bigquery.TableFieldSchema() + table_field.name = 'timestamp' + table_field.type = 'TIMESTAMP' + table_schema.fields.append(table_field) + table_field = bigquery.TableFieldSchema() + table_field.name = 'geo' + table_field.type = 'GEOGRAPHY' + table_schema.fields.append(table_field) table = bigquery.Table( tableReference=bigquery.TableReference( projectId=self.project, @@ -123,16 +145,18 @@ class BigQueryReadIntegrationTests(unittest.TestCase): request = bigquery.BigqueryTablesInsertRequest( projectId=self.project, datasetId=self.dataset_id, table=table) self.bigquery_client.client.tables.Insert(request) - table_data = [ - {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'}, - {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'}, - {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31', - 'time': '23:59:59'}, - {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'} - ] - # bigquery client expects base64 encoded bytes - for row in table_data: - row['bytes'] = base64.b64encode(row['bytes']).decode('utf-8') + row_data = { + 'float': 0.33, 'numeric': Decimal('10'), 'bytes': + base64.b64encode(b'\xab\xac').decode('utf-8'), 'date': '3000-12-31', + 'time': '23:59:59', 'datetime': '2018-12-31T12:44:31', + 'timestamp': '2018-12-31 12:44:31.744957 UTC', 'geo': 'POINT(30 10)' + } + + table_data = [row_data] + # add rows with only one key value pair and None values for all other keys + for key, value in iteritems(row_data): + table_data.append({key: value}) + self.bigquery_client.insert_rows( self.project, self.dataset_id, table_name, table_data) @@ -155,26 +179,31 @@ class BigQueryReadIntegrationTests(unittest.TestCase): @attr('IT') def test_big_query_read_new_types(self): - table_name = 'python_new_types_table' + table_name = 'python_new_types' self.create_table_new_types(table_name) table_id = '{}.{}'.format(self.dataset_id, table_name) args = self.test_pipeline.get_full_options_as_args() - expected_data = [ - {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'}, - {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'}, - {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31', - 'time': '23:59:59'}, - {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'} - ] - # bigquery io returns bytes as base64 encoded values - for row in expected_data: - row['bytes'] = base64.b64encode(row['bytes']) + expected_row = { + 'float': 0.33, 'numeric': Decimal('10'), 'bytes': + base64.b64encode(b'\xab\xac'), 'date': '3000-12-31', + 'time': '23:59:59', 'datetime': '2018-12-31T12:44:31', + 'timestamp': '2018-12-31 12:44:31.744957 UTC', 'geo': 'POINT(30 10)' + } + + expected_data = [expected_row] + + # add rows with only one key value pair and None values for all other keys + for key, value in iteritems(expected_row): + row = {k: None for k in expected_row} + row[key] = value + expected_data.append(row) with beam.Pipeline(argv=args) as p: result = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource( - query='SELECT bytes, date, time FROM `%s`' % table_id, + query='SELECT float, numeric, bytes, date, time, datetime,' + 'timestamp, geo FROM `%s`' % table_id, use_standard_sql=True))) assert_that(result, equal_to(expected_data)) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index 2cb563e..3658b9c 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -26,8 +26,11 @@ import logging import random import time import unittest +from decimal import Decimal import hamcrest as hc +import pytz +from future.utils import iteritems from nose.plugins.attrib import attr import apache_beam as beam @@ -168,34 +171,50 @@ class BigQueryWriteIntegrationTests(unittest.TestCase): table_name = 'python_new_types_table' table_id = '{}.{}'.format(self.dataset_id, table_name) - input_data = [ - {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'}, - {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'}, - {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31', - 'time': '23:59:59'}, - {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'} - ] - # bigquery io expects bytes to be base64 encoded values - for row in input_data: - row['bytes'] = base64.b64encode(row['bytes']) + row_data = { + 'float': 0.33, 'numeric': Decimal('10'), 'bytes': + base64.b64encode(b'\xab\xac').decode('utf-8'), 'date': '3000-12-31', + 'time': '23:59:59', 'datetime': '2018-12-31T12:44:31', + 'timestamp': '2018-12-31 12:44:31.744957 UTC', 'geo': 'POINT(30 10)' + } + + input_data = [row_data] + # add rows with only one key value pair and None values for all other keys + for key, value in iteritems(row_data): + input_data.append({key: value}) table_schema = {"fields": [ + {"name": "float", "type": "FLOAT"}, + {"name": "numeric", "type": "NUMERIC"}, {"name": "bytes", "type": "BYTES"}, {"name": "date", "type": "DATE"}, - {"name": "time", "type": "TIME"}]} + {"name": "time", "type": "TIME"}, + {"name": "datetime", "type": "DATETIME"}, + {"name": "timestamp", "type": "TIMESTAMP"}, + {"name": "geo", "type": "GEOGRAPHY"} + ]} + + expected_row = (0.33, Decimal('10'), b'\xab\xac', + datetime.date(3000, 12, 31), datetime.time(23, 59, 59), + datetime.datetime(2018, 12, 31, 12, 44, 31), + datetime.datetime(2018, 12, 31, 12, 44, 31, 744957, + tzinfo=pytz.utc), 'POINT(30 10)', + ) + + expected_data = [expected_row] + + # add rows with only one key value pair and None values for all other keys + for i, value in enumerate(expected_row): + row = [None]*len(expected_row) + row[i] = value + expected_data.append(tuple(row)) pipeline_verifiers = [ BigqueryFullResultMatcher( project=self.project, - query="SELECT bytes, date, time FROM %s" % table_id, - data=[(b'xyw', datetime.date(2011, 1, 1), - datetime.time(23, 59, 59, 999999), ), - (b'abc', datetime.date(2000, 1, 1), - datetime.time(0, 0, 0), ), - (b'\xe4\xbd\xa0\xe5\xa5\xbd', datetime.date(3000, 12, 31), - datetime.time(23, 59, 59), ), - (b'\xab\xac\xad', datetime.date(2000, 1, 1), - datetime.time(0, 0, 0), )])] + query='SELECT float, numeric, bytes, date, time, datetime,' + 'timestamp, geo FROM %s' % table_id, + data=expected_data)] args = self.test_pipeline.get_full_options_as_args( on_success_matcher=hc.all_of(*pipeline_verifiers)) diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py index 6709d6a..b981f14 100644 --- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py +++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py @@ -27,6 +27,8 @@ from hamcrest.core.base_matcher import BaseMatcher from apache_beam.io.gcp import bigquery_tools from apache_beam.testing.test_utils import compute_hash +from apache_beam.testing.util import BeamAssertException +from apache_beam.testing.util import equal_to from apache_beam.utils import retry __all__ = ['BigqueryMatcher', 'BigQueryTableMatcher'] @@ -145,7 +147,11 @@ class BigqueryFullResultMatcher(BaseMatcher): self.actual_data = response # Verify result - return sorted(self.expected_data) == sorted(self.actual_data) + try: + equal_to(self.expected_data)(self.actual_data) + return True + except BeamAssertException: + return False def _get_query_result(self, bigquery_client): return self._query_with_retry(bigquery_client)