[ 
https://issues.apache.org/jira/browse/BEAM-6507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16757844#comment-16757844
 ] 

Robert Burke commented on BEAM-6507:
------------------------------------

Custom user types and functions must be registered at init time (somewhere 
before beam.Init() is called) so that they can be serialized consistently 
and looked up on the workers.

func init() {
    // Register the element type and the DoFn before the pipeline is built.
    beam.RegisterType(reflect.TypeOf((*Ticket)(nil)).Elem())
    beam.RegisterFunction(processXml)
}



The error messages certainly need work.

> Go SDK : Cannot use BigQuery compatible Structs in PCollections
> ---------------------------------------------------------------
>
>                 Key: BEAM-6507
>                 URL: https://issues.apache.org/jira/browse/BEAM-6507
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go
>    Affects Versions: 2.8.0
>         Environment: Ubuntu 18.04 (bionic), golang version 1.10.4, latest 
> beam sdk for Golang, Beam worker : Dataflow (GCP)
>            Reporter: Chloe Thonin
>            Assignee: Robert Burke
>            Priority: Blocker
>              Labels: beginner
>
> I want to create a PCollection of objects of this type :
>  
> {noformat}
> type Ticket struct {
>   Uid string `bigquery:"uid"`
>   ShopUid string `bigquery:"shop_uid"`
>   Zone TicketZone `bigquery:"zone"`
>   TicketType string `bigquery:"type_ticket"`
>   OperationType string `bigquery:"type_operation"`
>   DateTime time.Time `bigquery:"datetime"`
>   ProcessedAt time.Time `bigquery:"processed_at"`
>   Clients int `bigquery:"clients"`
>   Table *TicketTable `bigquery:"table,nullable"`
>   Date TicketDate `bigquery:"date"`
>   Time TicketTime `bigquery:"time"`
>   Total TicketTotal `bigquery:"total"`
>   Article []TicketArticle `bigquery:"article"`
>   Encaissement []TicketEncaissement `bigquery:"encaissement"`
> }
> {noformat}
>  
>  
> {noformat}
> type TicketTable struct {
>   Numero int `bigquery:"numero"`
>   SousNumero bigquery.NullInt64 `bigquery:"sous_numero"`
>   Couverts int `bigquery:"couverts"`
> }{noformat}
>  
> The pipeline reads raw XML data from GCP PubSub and processes it to build 
> a PCollection of "Tickets" so they can be sent in bulk to BigQuery using 
> bigqueryio.Write().
>  
> {code:java}
> ticketsCol, tableCol := beam.ParDo2(s, processXml, windowedCol)
> {code}
> Our processXml function definition is :
>  
>  
> {code:java}
> func processXml(
>   input *pb.PubsubMessage, 
>   gcs func(types.Document), 
>   table func(types.Ticket)) (error) {
>   // ...
>   str := string(input.Data)
>   doc, err := xmlquery.Parse(strings.NewReader(str))
>   if err != nil {
>      return err
>   }
>   ticketTest := new(types.Ticket)
>   ticketTest.GetTicket(doc)
>   table(*ticketTest)
>   // ...
>  return nil
> }{code}
>  
> The code builds successfully but panics at runtime with this error:
>  
> {code:java}
> panic: invalid DoFn: bad parameter type for main.processXml: 
> func(types.Ticket){code}
>  
> We narrowed it down to IsConcrete(t reflect.Type) in core/typex/class.go 
> (line 116): the type is a struct, but it contains a non-concrete field 
> (*TicketTable).
> Do you know of a workaround that would let us build a PCollection of objects 
> with fields that are nullable from BigQuery's point of view?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
