[ https://issues.apache.org/jira/browse/BEAM-6507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16757844#comment-16757844 ]
Robert Burke commented on BEAM-6507:
------------------------------------

Custom user types and functions must be registered at init time (somewhere before beam.Init() is called) to ensure compatibility, serialization to the workers, and similar.

func init() {
    beam.RegisterType(reflect.TypeOf((*Ticket)(nil)).Elem())
    beam.RegisterFunction(processXml)
}

The error messages certainly need work.

> Go SDK : Cannot use BigQuery compatible Structs in PCollections
> ---------------------------------------------------------------
>
>                 Key: BEAM-6507
>                 URL: https://issues.apache.org/jira/browse/BEAM-6507
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go
>    Affects Versions: 2.8.0
>         Environment: Ubuntu 18.04 (bionic), golang version 1.10.4, latest
> beam sdk for Golang, Beam worker : Dataflow (GCP)
>            Reporter: Chloe Thonin
>            Assignee: Robert Burke
>            Priority: Blocker
>              Labels: beginner
>
> I want to create a PCollection of objects of this type:
>
> {noformat}
> type Ticket struct {
>     Uid           string               `bigquery:"uid"`
>     ShopUid       string               `bigquery:"shop_uid"`
>     Zone          TicketZone           `bigquery:"zone"`
>     TicketType    string               `bigquery:"type_ticket"`
>     OperationType string               `bigquery:"type_operation"`
>     DateTime      time.Time            `bigquery:"datetime"`
>     ProcessedAt   time.Time            `bigquery:"processed_at"`
>     Clients       int                  `bigquery:"clients"`
>     Table         *TicketTable         `bigquery:"table,nullable"`
>     Date          TicketDate           `bigquery:"date"`
>     Time          TicketTime           `bigquery:"time"`
>     Total         TicketTotal          `bigquery:"total"`
>     Article       []TicketArticle      `bigquery:"article"`
>     Encaissement  []TicketEncaissement `bigquery:"encaissement"`
> }
> {noformat}
>
> {noformat}
> type TicketTable struct {
>     Numero     int                `bigquery:"numero"`
>     SousNumero bigquery.NullInt64 `bigquery:"sous_numero"`
>     Couverts   int                `bigquery:"couverts"`
> }
> {noformat}
>
> The process is to read raw XML data from GCP PubSub and process it to build
> a PCollection of "Tickets" so they can be sent in bulk to BigQuery using
> bigqueryio.Write()
>
> {code:java}
> ticketsCol, tableCol := beam.ParDo2(s, processXml, windowedCol)
> {code}
>
> Our processXml function definition is:
>
> {code:java}
> func processXml(
>     input *pb.PubsubMessage,
>     gcs func(types.Document),
>     table func(types.Ticket)) (error) {
>     // ...
>     str := fmt.Sprintf("%s", input.Data)
>     doc, err := xmlquery.Parse(strings.NewReader(str))
>     if err != nil {
>         panic(err)
>         fmt.Println(doc)
>     }
>     ticketTest := new(types.Ticket)
>     ticketTest.GetTicket(doc)
>     table(*ticketTest)
>     // ...
>     return nil
> }
> {code}
>
> The code builds successfully but panics at runtime with this error:
>
> {code:java}
> panic: invalid DoFn: bad parameter type for main.processXml:
> func(types.Ticket)
> {code}
>
> We narrowed it down to IsConcrete(t reflect.Type) in core/typex/class.go
> (line 116): the type is a Struct, but it contains a non-concrete field
> (*TicketTable).
> Do you know if there is a workaround so we can build a PCollection of objects
> with nullable fields from BigQuery's point of view?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)