[ https://issues.apache.org/jira/browse/BEAM-8452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-8452:
----------------------------------
    Labels: stale-assigned  (was: )

> TriggerLoadJobs.process in bigquery_file_loads schema is type str
> -----------------------------------------------------------------
>
>                 Key: BEAM-8452
>                 URL: https://issues.apache.org/jira/browse/BEAM-8452
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.15.0, 2.16.0
>            Reporter: Noah Goodrich
>            Assignee: Noah Goodrich
>            Priority: P2
>              Labels: stale-assigned
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
>  I've found a first issue with the BigQueryFileLoads transform: the type of 
> its schema parameter. When the schema is passed as a JSON string, load-job 
> validation fails because apitools expects a TableSchema message.
> {code}
> Triggering job beam_load_2019_10_11_140829_19_157670e4d458f0ff578fbe971a91b30a_1570802915 to load data to BigQuery table <TableReference
>  datasetId: 'pyr_monat_dev'
>  projectId: 'icentris-ml-dev'
>  tableId: 'tree_user_types'>.Schema: {"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}, {"name": "description", "type": "STRING", "mode": "nullable"}]}. Additional parameters: {}
> Retry with exponential backoff: waiting for 4.875033410381894 seconds before retrying _insert_load_job because we caught exception: apitools.base.protorpclite.messages.ValidationError: Expected type <class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableSchema'> for field schema, found {"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}, {"name": "description", "type": "STRING", "mode": "nullable"}]} (type <class 'str'>)
> Traceback for above exception (most recent call last):
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 206, in wrapper
>     return fun(*args, **kwargs)
>   File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 344, in _insert_load_job
>     **additional_load_parameters
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 791, in __init__
>     setattr(self, name, value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 973, in __setattr__
>     object.__setattr__(self, name, value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1652, in __set__
>     super(MessageField, self).__set__(message_instance, value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1293, in __set__
>     value = self.validate(value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1400, in validate
>     return self.__validate(value, self.validate_element)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1358, in __validate
>     return validate_element(value)
>   File "/opt/conda/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 1340, in validate_element
>     (self.type, name, value, type(value)))
> {code}
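>
> For context, a minimal sketch of the type mismatch the traceback reports, 
> using the Beam-internal generated BigQuery client (field layout copied from 
> the log above):
> {code:python}
> from apache_beam.io.gcp.internal.clients import bigquery
>
> # What apitools expects for the schema field: a TableSchema message.
> table_schema = bigquery.TableSchema(fields=[
>     bigquery.TableFieldSchema(name='id', type='INTEGER', mode='required'),
>     bigquery.TableFieldSchema(name='description', type='STRING', mode='nullable'),
> ])
>
> # What TriggerLoadJobs.process actually passes through to _insert_load_job:
> # the raw JSON string, which fails apitools validation as shown above.
> schema_str = ('{"fields": [{"name": "id", "type": "INTEGER", "mode": "required"}, '
>               '{"name": "description", "type": "STRING", "mode": "nullable"}]}')
> {code}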
>  
> The triggering code looks like this:
>  
> {code:python}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import DebugOptions, SetupOptions
>
> # options is the PipelineOptions instance built earlier;
> # ReadFromRelationalDBFn, BuildForBigQueryFn, and LoadSqlToBqOptions are
> # defined elsewhere in this pipeline.
> options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']
> # Save main session state so pickled functions and classes
> # defined in __main__ can be unpickled
> options.view_as(SetupOptions).save_main_session = True
> custom_options = options.view_as(LoadSqlToBqOptions)
> with beam.Pipeline(options=options) as p:
>     (p
>      | "Initializing with empty collection" >> beam.Create([1])
>      | "Reading records from CloudSql" >> beam.ParDo(ReadFromRelationalDBFn(
>            username=custom_options.user,
>            password=custom_options.password,
>            database=custom_options.database,
>            table=custom_options.table,
>            key_field=custom_options.key_field,
>            batch_size=custom_options.batch_size))
>      | "Converting Row Object for BigQuery" >>
>          beam.ParDo(BuildForBigQueryFn(custom_options.bq_schema))
>      | "Writing to BigQuery" >> beam.io.WriteToBigQuery(
>            table=custom_options.bq_table,
>            schema=custom_options.bq_schema,
>            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
>            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
> {code}
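>
> As a possible workaround (a sketch, assuming bq_schema holds the JSON schema 
> document shown in the log), the string can be parsed into the expected 
> message type before it is handed to WriteToBigQuery:
> {code:python}
> from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
>
> # parse_table_schema_from_json converts the JSON schema document into the
> # bigquery_v2_messages.TableSchema that _insert_load_job validates against.
> table_schema = parse_table_schema_from_json(custom_options.bq_schema)
> {code}
> with schema=table_schema then passed to WriteToBigQuery in place of the raw 
> string.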
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
