[ https://issues.apache.org/jira/browse/BEAM-8452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-8452:
----------------------------------
    Labels: stale-assigned  (was: )

> TriggerLoadJobs.process in bigquery_file_loads schema is type str
> -----------------------------------------------------------------
>
>                 Key: BEAM-8452
>                 URL: https://issues.apache.org/jira/browse/BEAM-8452
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.15.0, 2.16.0
>            Reporter: Noah Goodrich
>            Assignee: Noah Goodrich
>            Priority: P2
>              Labels: stale-assigned
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> I've found a first issue with the BigQueryFileLoads transform and the type of the schema parameter.
>
> {code:java}
> Triggering job beam_load_2019_10_11_140829_19_157670e4d458f0ff578fbe971a91b30a_1570802915 to load data to BigQuery table <TableReference
>  datasetId: 'pyr_monat_dev'
>  projectId: 'icentris-ml-dev'
>  tableId: 'tree_user_types'>. Schema: {"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}, {"name": "description", "type": "STRING", "mode": "nullable"}]}.
> Additional parameters: {}
> Retry with exponential backoff: waiting for 4.875033410381894 seconds before retrying _insert_load_job because we caught exception: apitools.base.protorpclite.messages.ValidationError: Expected type <class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableSchema'> for field schema, found {"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}, {"name": "description", "type": "STRING", "mode": "nullable"}]} (type <class 'str'>)
> Traceback for above exception (most recent call last):
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 206, in wrapper
>     return fun(*args, **kwargs)
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 344, in _insert_load_job
>     **additional_load_parameters
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 791, in __init__
>     setattr(self, name, value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 973, in __setattr__
>     object.__setattr__(self, name, value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1652, in __set__
>     super(MessageField, self).__set__(message_instance, value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1293, in __set__
>     value = self.validate(value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1400, in validate
>     return self.__validate(value, self.validate_element)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1358, in __validate
>     return validate_element(value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1340, in validate_element
>     (self.type, name, value, type(value)))
> {code}
>
> The triggering code looks like this:
>
> options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']
> # Save main session state so pickled functions and classes
> # defined in __main__ can be unpickled
> options.view_as(SetupOptions).save_main_session = True
> custom_options = options.view_as(LoadSqlToBqOptions)
> with beam.Pipeline(options=options) as p:
>     (p
>      | "Initializing with empty collection" >> beam.Create([1])
>      | "Reading records from CloudSql" >> beam.ParDo(ReadFromRelationalDBFn(
>            username=custom_options.user,
>            password=custom_options.password,
>            database=custom_options.database,
>            table=custom_options.table,
>            key_field=custom_options.key_field,
>            batch_size=custom_options.batch_size))
>      | "Converting Row Object for BigQuery" >> beam.ParDo(BuildForBigQueryFn(custom_options.bq_schema))
>      | "Writing to BigQuery" >> beam.io.WriteToBigQuery(
>            table=custom_options.bq_table,
>            schema=custom_options.bq_schema,
>            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
>            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
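The ValidationError in the report arises because the schema reaches the apitools message field as a raw JSON string where a TableSchema message is expected. A minimal stand-alone sketch of the coercion the load path would need (the `coerce_schema` helper below is hypothetical, not Beam's API; for the full string-to-TableSchema conversion Beam ships `apache_beam.io.gcp.bigquery_tools.parse_table_schema_from_json`):

```python
import json

def coerce_schema(schema):
    """Hypothetical helper: accept a schema as either a parsed dict or its
    JSON-string form, and return a dict, so a raw str is never handed to a
    typed message field (the cause of the ValidationError above)."""
    if isinstance(schema, str):
        return json.loads(schema)  # parse the JSON string instead of passing it through
    return schema

schema_str = ('{"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}, '
              '{"name": "description", "type": "STRING", "mode": "nullable"}]}')
schema = coerce_schema(schema_str)
print(schema["fields"][0]["name"])  # -> id
```

In the real pipeline the same idea applies at the `WriteToBigQuery(schema=...)` call site: convert the string form up front (e.g. with `parse_table_schema_from_json`) rather than relying on the file-loads path to do it.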