TheNeuralBit commented on code in PR #17159: URL: https://github.com/apache/beam/pull/17159#discussion_r876239026
########## sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py: ########## @@ -0,0 +1,70 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import typing +import unittest + +import numpy as np + +from apache_beam.io.gcp import bigquery_schema_tools +from apache_beam.io.gcp.bigquery_test import HttpError +from apache_beam.io.gcp.internal.clients import bigquery + + [email protected](HttpError is None, 'GCP dependencies are not installed') +class TestBigQueryToSchema(unittest.TestCase): + def test_produce_pcoll_with_schema(self): + fields = [ + bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"), + bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"), + bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None") + ] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.produce_pcoll_with_schema( + the_table_schema=schema) + self.assertEqual( + usertype.__annotations__, + { + 'stn': typing.Optional[str], + 'temp': typing.Sequence[np.float64], + 'count': np.int64 + }) + + def test_produce_pcoll_with_empty_schema(self): + fields = [] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.produce_pcoll_with_schema( + the_table_schema=schema) + self.assertEqual(usertype.__annotations__, {}) + + def test_error_at_runtime(self): + fields = [ + bigquery.TableFieldSchema( + name='number', type='DOUBLE', mode="NULLABLE"), + bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"), + bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None") + ] + schema = bigquery.TableSchema(fields=fields) + with self.assertRaises(ValueError): Review Comment: Could you use `assertRaisesRegex` and make sure the message matches the one you've defined ########## sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py: ########## @@ -0,0 +1,112 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tools used tool work with Schema types in the context of BigQuery. +Classes, constants and functions in this file are experimental and have no +backwards compatibility guarantees. +NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES. +""" + +from typing import Optional +from typing import Sequence + +import numpy as np + +import apache_beam as beam +from apache_beam.io.gcp.internal.clients import bigquery + +# BigQuery types as listed in +# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types +# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in +# https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#setType-java.lang.String- +BIG_QUERY_TO_PYTHON_TYPES = { + "STRING": str, + "INTEGER": np.int64, + "FLOAT64": np.float64, + "BOOLEAN": bool, + "BYTES": bytes, + "TIMESTAMP": beam.utils.timestamp.Timestamp, + #TODO svetaksundhar@: Finish mappings for all BQ types +} + + +def produce_pcoll_with_schema(the_table_schema): + #type: (bigquery.TableSchema) -> type + + """Convert a schema of type TableSchema into a pcollection element. + Args: + the_table_schema: A BQ schema of type TableSchema + Returns: + type: type that can be used to work with pCollections. + """ + + the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema( + the_table_schema) + if the_schema == {}: + raise ValueError("The schema is empty") + dict_of_tuples = [] + for i in range(len(the_schema['fields'])): + if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES: + typ = bq_field_to_type( + the_schema['fields'][i]['type'], the_schema['fields'][i]['mode']) + else: + raise ValueError(the_schema['fields'][i]['type']) + # TODO svetaksundhar@: Map remaining BQ types + dict_of_tuples.append((the_schema['fields'][i]['name'], typ)) + sample_schema = beam.typehints.schemas.named_fields_to_schema(dict_of_tuples) + usertype = beam.typehints.schemas.named_tuple_from_schema(sample_schema) + return usertype + + +def produce_pcoll_using_bqio(project_id, dataset_id, table_id): + the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper( + ).get_table(project_id, dataset_id, table_id) + beam.io.gcp.bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema) + + +def bq_field_to_type(field, mode): + if mode == 'NULLABLE': + return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]] + elif mode == 'REPEATED': + return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]] + elif mode == 'None' or mode == '': + return BIG_QUERY_TO_PYTHON_TYPES[field] + else: + return ValueError("Not a supported mode") Review Comment: Please make this reference the unsupported mode ```suggestion return ValueError(f"Encountered an unsupported mode: {mode!r}") ``` ########## sdks/python/apache_beam/io/gcp/bigquery.py: ########## @@ -2422,6 +2423,9 @@ class ReadFromBigQuery(PTransform): to run queries with INTERACTIVE priority. This option is ignored when reading from a table rather than a query. To learn more about query priority, see: https://cloud.google.com/bigquery/docs/running-queries + output_type (str): By default, the schema returned from this transform + would be of type TableSchema. Other schema types can be specified + ("BEAM_SCHEMAS"). Review Comment: This argument should be referring to the type of the elements in the output PCollection. The status quo (what should become the default) is to produce Python `dict`s, let's call this `"PYTHON_DICT"`, and make that default rather than `None`. For this new feature, I think we should call it `"BEAM_ROW"` to standardize with other terminology across Beam (a *Row* is an instance/element, whose type is described by a *Schema*) ########## sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py: ########## @@ -0,0 +1,112 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tools used tool work with Schema types in the context of BigQuery. +Classes, constants and functions in this file are experimental and have no +backwards compatibility guarantees. +NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES. +""" + +from typing import Optional +from typing import Sequence + +import numpy as np + +import apache_beam as beam +from apache_beam.io.gcp.internal.clients import bigquery + +# BigQuery types as listed in +# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types +# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in +# https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#setType-java.lang.String- +BIG_QUERY_TO_PYTHON_TYPES = { + "STRING": str, + "INTEGER": np.int64, + "FLOAT64": np.float64, + "BOOLEAN": bool, + "BYTES": bytes, + "TIMESTAMP": beam.utils.timestamp.Timestamp, + #TODO svetaksundhar@: Finish mappings for all BQ types +} + + +def produce_pcoll_with_schema(the_table_schema): + #type: (bigquery.TableSchema) -> type + + """Convert a schema of type TableSchema into a pcollection element. + Args: + the_table_schema: A BQ schema of type TableSchema + Returns: + type: type that can be used to work with pCollections. + """ + + the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema( + the_table_schema) + if the_schema == {}: + raise ValueError("The schema is empty") + dict_of_tuples = [] + for i in range(len(the_schema['fields'])): + if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES: + typ = bq_field_to_type( + the_schema['fields'][i]['type'], the_schema['fields'][i]['mode']) + else: + raise ValueError(the_schema['fields'][i]['type']) + # TODO svetaksundhar@: Map remaining BQ types + dict_of_tuples.append((the_schema['fields'][i]['name'], typ)) + sample_schema = beam.typehints.schemas.named_fields_to_schema(dict_of_tuples) + usertype = beam.typehints.schemas.named_tuple_from_schema(sample_schema) + return usertype + + +def produce_pcoll_using_bqio(project_id, dataset_id, table_id): + the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper( + ).get_table(project_id, dataset_id, table_id) + beam.io.gcp.bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema) + + +def bq_field_to_type(field, mode): + if mode == 'NULLABLE': + return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]] + elif mode == 'REPEATED': + return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]] + elif mode == 'None' or mode == '': + return BIG_QUERY_TO_PYTHON_TYPES[field] + else: + return ValueError("Not a supported mode") + + +class BeamSchemaUnbatchDoFn(beam.DoFn): + def __init__(self, pcoll_val_ctor): + self._pcoll_val_ctor = pcoll_val_ctor + + def infer_output_type(self, input_type): + return self._pcoll_val_ctor + + @classmethod + def _from_serialized_schema(cls, dict_of_tuples): + return cls( + beam.typehints.schemas.named_tuple_from_schema( + beam.dataframe.schemas.proto_utils.parse_Bytes( Review Comment: Oh I think this is re-importing the one that was imported in `beam.dataframe.schemas`? and does the same thing below for schema_pb2. Please update this to reference the underlying packages directly rather than going through `beam.dataframe` ########## sdks/python/apache_beam/io/gcp/bigquery.py: ########## @@ -2434,10 +2438,12 @@ def __init__( gcs_location=None, method=None, use_native_datetime=False, + output_type=None, *args, **kwargs): self.method = method or ReadFromBigQuery.Method.EXPORT self.use_native_datetime = use_native_datetime + self.output_type = output_type Review Comment: It looks like this isn't actually used anywhere. We should inspect this variable when the transform is expanded, and if it is `"BEAM_ROW"`, we should use your new utilities to add a ParDo at the end that converts the dictionaries to the generated user type. Does that make sense? ########## sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py: ########## @@ -0,0 +1,70 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import typing +import unittest + +import numpy as np + +from apache_beam.io.gcp import bigquery_schema_tools +from apache_beam.io.gcp.bigquery_test import HttpError +from apache_beam.io.gcp.internal.clients import bigquery + + [email protected](HttpError is None, 'GCP dependencies are not installed') +class TestBigQueryToSchema(unittest.TestCase): + def test_produce_pcoll_with_schema(self): + fields = [ + bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"), + bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"), + bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None") + ] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.produce_pcoll_with_schema( + the_table_schema=schema) + self.assertEqual( + usertype.__annotations__, + { + 'stn': typing.Optional[str], + 'temp': typing.Sequence[np.float64], + 'count': np.int64 + }) + + def test_produce_pcoll_with_empty_schema(self): + fields = [] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.produce_pcoll_with_schema( + the_table_schema=schema) + self.assertEqual(usertype.__annotations__, {}) + + def test_error_at_runtime(self): + fields = [ + bigquery.TableFieldSchema( + name='number', type='DOUBLE', mode="NULLABLE"), + bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"), + bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None") + ] + schema = bigquery.TableSchema(fields=fields) + with self.assertRaises(ValueError): + bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema=schema) Review Comment: Would you be able to test this through the public API instead (or in addition to this one)? Something like: ``` with self.assertRaises: ReadFromBigQuery(..., output_type=schema) ``` This would require mocking the bigquery client so it returns the bad schema. I thought I could find lots of examples of this but there don't actually seem to be any... the best I can find is this: https://github.com/apache/beam/blob/da4fcad9200862863e6de98b53ef4b1a2154b711/sdks/python/apache_beam/io/gcp/bigquery_test.py#L468-L472 I think it should be possible to use a pattern like that but there might be some gotchas I'm missing. ########## sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py: ########## @@ -0,0 +1,112 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tools used tool work with Schema types in the context of BigQuery. +Classes, constants and functions in this file are experimental and have no +backwards compatibility guarantees. +NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES. +""" + +from typing import Optional +from typing import Sequence + +import numpy as np + +import apache_beam as beam +from apache_beam.io.gcp.internal.clients import bigquery + +# BigQuery types as listed in +# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types +# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in +# https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#setType-java.lang.String- +BIG_QUERY_TO_PYTHON_TYPES = { + "STRING": str, + "INTEGER": np.int64, + "FLOAT64": np.float64, + "BOOLEAN": bool, + "BYTES": bytes, + "TIMESTAMP": beam.utils.timestamp.Timestamp, + #TODO svetaksundhar@: Finish mappings for all BQ types +} + + +def produce_pcoll_with_schema(the_table_schema): + #type: (bigquery.TableSchema) -> type + + """Convert a schema of type TableSchema into a pcollection element. + Args: + the_table_schema: A BQ schema of type TableSchema + Returns: + type: type that can be used to work with pCollections. + """ + + the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema( + the_table_schema) + if the_schema == {}: + raise ValueError("The schema is empty") + dict_of_tuples = [] + for i in range(len(the_schema['fields'])): + if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES: + typ = bq_field_to_type( + the_schema['fields'][i]['type'], the_schema['fields'][i]['mode']) + else: + raise ValueError(the_schema['fields'][i]['type']) + # TODO svetaksundhar@: Map remaining BQ types + dict_of_tuples.append((the_schema['fields'][i]['name'], typ)) + sample_schema = beam.typehints.schemas.named_fields_to_schema(dict_of_tuples) + usertype = beam.typehints.schemas.named_tuple_from_schema(sample_schema) + return usertype + + +def produce_pcoll_using_bqio(project_id, dataset_id, table_id): + the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper( + ).get_table(project_id, dataset_id, table_id) + beam.io.gcp.bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema) + + +def bq_field_to_type(field, mode): + if mode == 'NULLABLE': + return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]] + elif mode == 'REPEATED': + return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]] + elif mode == 'None' or mode == '': + return BIG_QUERY_TO_PYTHON_TYPES[field] + else: + return ValueError("Not a supported mode") + + +class BeamSchemaUnbatchDoFn(beam.DoFn): + def __init__(self, pcoll_val_ctor): + self._pcoll_val_ctor = pcoll_val_ctor + + def infer_output_type(self, input_type): + return self._pcoll_val_ctor + + @classmethod + def _from_serialized_schema(cls, dict_of_tuples): + return cls( + beam.typehints.schemas.named_tuple_from_schema( + beam.dataframe.schemas.proto_utils.parse_Bytes( Review Comment: This is the wrong package name for this function, it lives here: https://github.com/apache/beam/blob/da4fcad9200862863e6de98b53ef4b1a2154b711/sdks/python/apache_beam/utils/proto_utils.py#L101 (`beam.utils.proto_utils`) ########## sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py: ########## @@ -0,0 +1,112 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tools used tool work with Schema types in the context of BigQuery. +Classes, constants and functions in this file are experimental and have no +backwards compatibility guarantees. +NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES. +""" + +from typing import Optional +from typing import Sequence + +import numpy as np + +import apache_beam as beam +from apache_beam.io.gcp.internal.clients import bigquery + +# BigQuery types as listed in +# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types +# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in +# https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#setType-java.lang.String- +BIG_QUERY_TO_PYTHON_TYPES = { + "STRING": str, + "INTEGER": np.int64, + "FLOAT64": np.float64, + "BOOLEAN": bool, + "BYTES": bytes, + "TIMESTAMP": beam.utils.timestamp.Timestamp, + #TODO svetaksundhar@: Finish mappings for all BQ types +} + + +def produce_pcoll_with_schema(the_table_schema): + #type: (bigquery.TableSchema) -> type + + """Convert a schema of type TableSchema into a pcollection element. + Args: + the_table_schema: A BQ schema of type TableSchema + Returns: + type: type that can be used to work with pCollections. + """ + + the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema( + the_table_schema) + if the_schema == {}: + raise ValueError("The schema is empty") + dict_of_tuples = [] + for i in range(len(the_schema['fields'])): + if the_schema['fields'][i]['type'] in BIG_QUERY_TO_PYTHON_TYPES: + typ = bq_field_to_type( + the_schema['fields'][i]['type'], the_schema['fields'][i]['mode']) + else: + raise ValueError(the_schema['fields'][i]['type']) + # TODO svetaksundhar@: Map remaining BQ types + dict_of_tuples.append((the_schema['fields'][i]['name'], typ)) + sample_schema = beam.typehints.schemas.named_fields_to_schema(dict_of_tuples) + usertype = beam.typehints.schemas.named_tuple_from_schema(sample_schema) + return usertype + + +def produce_pcoll_using_bqio(project_id, dataset_id, table_id): + the_table_schema = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper( + ).get_table(project_id, dataset_id, table_id) + beam.io.gcp.bigquery_schema_tools.produce_pcoll_with_schema(the_table_schema) + + +def bq_field_to_type(field, mode): + if mode == 'NULLABLE': + return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]] + elif mode == 'REPEATED': + return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]] + elif mode == 'None' or mode == '': + return BIG_QUERY_TO_PYTHON_TYPES[field] + else: + return ValueError("Not a supported mode") + + +class BeamSchemaUnbatchDoFn(beam.DoFn): Review Comment: I think this DoFn should have a `process` method that converts a dictionary input to an instance of `pcoll_val_ctor` (what you were doing in the lambda before). Also a small nit: this shouldn't be doing any unbatching, so I'd just call it `ConvertToBeamSChemaDoFn` (or similar). ########## sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py: ########## @@ -0,0 +1,70 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import typing +import unittest + +import numpy as np + +from apache_beam.io.gcp import bigquery_schema_tools +from apache_beam.io.gcp.bigquery_test import HttpError +from apache_beam.io.gcp.internal.clients import bigquery + + [email protected](HttpError is None, 'GCP dependencies are not installed') +class TestBigQueryToSchema(unittest.TestCase): + def test_produce_pcoll_with_schema(self): + fields = [ + bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"), + bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"), + bigquery.TableFieldSchema(name='count', type='INTEGER', mode="None") + ] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.produce_pcoll_with_schema( + the_table_schema=schema) + self.assertEqual( + usertype.__annotations__, + { + 'stn': typing.Optional[str], + 'temp': typing.Sequence[np.float64], + 'count': np.int64 + }) + + def test_produce_pcoll_with_empty_schema(self): + fields = [] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.produce_pcoll_with_schema( + the_table_schema=schema) + self.assertEqual(usertype.__annotations__, {}) + + def test_error_at_runtime(self): Review Comment: Please also add a test for an unsupported type, and make sure that that raises a helpful error (I think right now it will just get a `KeyError` when accessing `BIG_QUERY_TO_PYTHON_TYPES`) ########## sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py: ########## @@ -178,6 +178,30 @@ def test_iobase_source(self): query=query, use_standard_sql=True, project=self.project)) assert_that(result, equal_to(self.TABLE_DATA)) + @pytest.mark.it_postcommit + def test_table_schema_retrieve(self): + the_table = beam.io.gcp.bigquery.bigquery_tools.BigQueryWrapper().get_table( + project_id="apache-beam-testing", + dataset_id="beam_bigquery_io_test", + table_id="dfsqltable_3c7d6fd5_16e0460dfd0") + table = the_table.schema + utype = beam.io.gcp.bigquery_schema_tools.produce_pcoll_with_schema(table) + with beam.Pipeline(argv=self.args) as p: + result = ( + p | beam.io.gcp.bigquery.ReadFromBigQuery( + gcs_location="gs://bqio_schema", + table="beam_bigquery_io_test.dfsqltable_3c7d6fd5_16e0460dfd0", + project="apache-beam-testing") + | beam.io.gcp.bigquery.ReadFromBigQuery.get_pcoll_from_schema(table)) Review Comment: When the previous comment is addressed, we should be able to test this like so: ```suggestion p | beam.io.gcp.bigquery.ReadFromBigQuery( gcs_location="gs://bqio_schema", table="beam_bigquery_io_test.dfsqltable_3c7d6fd5_16e0460dfd0", project="apache-beam-testing", output_type="BEAM_ROW") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
