Hi,

I have a Python pipeline with a PCollection of dicts, and I'd like to apply 
a schema to it for use with SqlTransform.

The schema is defined as follows:

import typing

import apache_beam as beam

class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]

beam.coders.registry.register_coder(RowSchema, beam.coders.RowCoder)


The code that ingests the PCollection of dicts and attempts to apply the schema 
is:

pcol = (p
    | 'read from BQ' >> beam.io.ReadFromBigQuery(
        gcs_location="gs://example_location",
        query=query,  # Reads only the columns defined in the schema
        use_standard_sql=True)
    | 'ToRow' >> beam.Map(
        lambda x: RowSchema(**x)).with_output_types(RowSchema)
    # | SqlTransform(...)
)


However, it results in the following error:

  File "/home/lib/python3.9/site-packages/apache_beam/coders/coders.py", line 423, in encode
    return value.encode('utf-8')
AttributeError: 'int' object has no attribute 'encode' [while running 'ToRow']
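
The message suggests that a field declared as str in RowSchema may be 
receiving an int from the query results. A minimal sketch, assuming that is 
the cause, of a plain-Python check that could be run on a sample of the dicts 
before the 'ToRow' step. find_type_mismatches is a hypothetical helper written 
for illustration; it is not part of Beam:

```python
import typing


class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]


def find_type_mismatches(record: dict, schema: type) -> dict:
    """Hypothetical helper: map each mismatched field to its actual type name."""
    mismatches = {}
    for field, declared in typing.get_type_hints(schema).items():
        # A missing key is treated as None, like a NULL column.
        value = record.get(field)
        # Optional[X] is Union[X, None]; unpack it into the allowed types.
        if typing.get_origin(declared) is typing.Union:
            allowed = typing.get_args(declared)
        else:
            allowed = (declared,)
        if not isinstance(value, allowed):
            mismatches[field] = type(value).__name__
    return mismatches


# A record whose colA is an int instead of the declared str is reported.
print(find_type_mismatches({'colA': 1, 'colB': 'b1'}, RowSchema))
```

An empty dict from this check would mean the values match the declared types, 
in which case the cause would have to be looked for elsewhere.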


I've tested that if I use a PCollection of beam.pvalue.Row, such as the 
following, the code does in fact work:

pcol = (p
    | "Create" >> beam.Create(
        [{'colA': 'a1', 'colB': 'b1'}, {'colA': 'a2', 'colB': None}])
    | 'ToRow' >> beam.Map(lambda x: RowSchema(**x)).with_output_types(RowSchema)
    # | SqlTransform(...)
)


What can I do to apply the schema and enable SqlTransform on a PCollection of 
dicts?

The structure I tried to use is based on the following references:

  *   https://beam.apache.org/documentation/programming-guide/#inferring-schemas
  *   https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.sql.html

I've also checked the io.gcp.bigquery reference 
(https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.bigquery.html). 
I've noticed it has a schema implementation, but only for writes to BigQuery, 
so I wasn't able to avoid the input being a PCollection of dicts.

I also found this example 
(https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/sql_taxi.py) 
using a dynamic schema, which wouldn't be a valid approach for my use case as 
far as I understand it.

Any help with this issue would be greatly appreciated. Thanks!