[ 
https://issues.apache.org/jira/browse/BEAM-3458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Burke updated BEAM-3458:
-------------------------------
    Description: 
beam.Create and beam.CreateList, when used with complex types, do not survive 
pipeline serialization and deserialization, such as when the values are being 
decoded on a remote runner.

Such an ability is useful for providing static data, or data known at 
construction time, to the pipeline.

The following works as expected in the direct Go runner, which doesn't 
serialize and deserialize the pipeline, but fails remotely. The pipeline 
typechecks correctly.


{code:go}
type wordCount struct {
  K string
  V int
}

func splitToKV(e wordCount) (string,int) {
  return e.K, e.V
}

p := beam.NewPipeline()
s := p.Root()
list := beam.CreateList(s, []wordCount{{"a", 23}, {"b", 42}, {"c", 5}})
kvs := beam.ParDo(s, splitToKV, list)
{code}

... rest of pipeline...

The pipeline will try to execute the splitToKV ParDo, and will panic when 
trying to use the JSON decoded values. Specifically, the createFn generated by 
beam.Create only has a field of type []interface{}, so the JSON unmarshaller 
produces a map[string]interface{} for each value instead (as per the godoc for 
encoding/json).

The reflect library will then panic when trying to convert these 
map[string]interface{} values to wordCount structs for the splitToKV function.
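
The decoding behaviour described above can be reproduced outside Beam with a minimal sketch. The createFn type below is a hypothetical stand-in for the generated DoFn, not the SDK's actual definition, and plain encoding/json stands in for whatever serialize.go does internally:

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

type wordCount struct {
	K string
	V int
}

// createFn is a hypothetical stand-in for the DoFn generated by beam.Create.
// Its only field holds the values as interface{}, so no concrete type
// information is written into the JSON.
type createFn struct {
	Values []interface{}
}

// roundTrip JSON encodes and decodes fn, roughly what happens when a
// pipeline is shipped to a remote runner.
func roundTrip(fn createFn) (createFn, error) {
	data, err := json.Marshal(fn)
	if err != nil {
		return createFn{}, err
	}
	var out createFn
	err = json.Unmarshal(data, &out)
	return out, err
}

// convertibleToWordCount reports whether reflect could convert v to a
// wordCount. reflect.Value.Convert panics when this is false.
func convertibleToWordCount(v interface{}) bool {
	return reflect.TypeOf(v).ConvertibleTo(reflect.TypeOf(wordCount{}))
}

func main() {
	in := createFn{Values: []interface{}{wordCount{"a", 23}, wordCount{"b", 42}}}
	out, err := roundTrip(in)
	if err != nil {
		panic(err)
	}
	for _, v := range out.Values {
		// After the round trip each value is a map, not a wordCount.
		fmt.Printf("%T convertible=%v\n", v, convertibleToWordCount(v))
	}
}
```

Checking ConvertibleTo first, rather than calling Convert directly, lets the sketch report the mismatch without panicking.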

This sort of thing will occur whenever a structural DoFn uses interface{} types 
to persist values to runners, since the underlying type information is lost in 
the encoding done by serialize.go.
However, the types are known at construction time, either directly, or by the 
type checker when using Universal types, so the true underlying type could be 
encoded, and then used in the decoding process before storing them in the 
dematerialized structural DoFn.
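
One possible shape for such an encoding, as a rough sketch only: store a type key next to each JSON payload and use it to allocate the right type before decoding. The typedValue wrapper, the registry map and the register, encodeTyped and decodeTyped helpers are all hypothetical names invented for this illustration; none of them are part of the Go SDK.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

type wordCount struct {
	K string
	V int
}

// typedValue is a hypothetical wire format: a type key plus the JSON payload.
type typedValue struct {
	Type string
	Data json.RawMessage
}

// registry maps type keys to concrete types. It must be populated with the
// same types on the decoding side as on the encoding side.
var registry = map[string]reflect.Type{}

// register records the concrete type of v under its reflect type string.
func register(v interface{}) {
	t := reflect.TypeOf(v)
	registry[t.String()] = t
}

// encodeTyped writes the concrete type key alongside the JSON encoding of v.
func encodeTyped(v interface{}) ([]byte, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return json.Marshal(typedValue{Type: reflect.TypeOf(v).String(), Data: data})
}

// decodeTyped looks up the recorded type, allocates a new value of it, and
// decodes the payload into that value rather than into interface{}.
func decodeTyped(b []byte) (interface{}, error) {
	var tv typedValue
	if err := json.Unmarshal(b, &tv); err != nil {
		return nil, err
	}
	t, ok := registry[tv.Type]
	if !ok {
		return nil, fmt.Errorf("unregistered type %q", tv.Type)
	}
	ptr := reflect.New(t)
	if err := json.Unmarshal(tv.Data, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Elem().Interface(), nil
}

func main() {
	register(wordCount{})
	b, err := encodeTyped(wordCount{"a", 23})
	if err != nil {
		panic(err)
	}
	v, err := decodeTyped(b)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T %+v\n", v, v)
}
```

A registry keyed by type name is only one option. It assumes that both sides of the serialization boundary run the same binary, or at least register the same types.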

A user can currently work around this by manually JSON encoding their structs 
to strings, and manually decoding them in their pipeline, but would need 
specialized code for each type used this way.
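
A minimal sketch of that workaround, kept free of Beam imports so it runs on its own. The encodeAll and splitJSONToKV helpers are hypothetical names; in a real pipeline the encoded strings would presumably be handed to beam.CreateList and splitJSONToKV to beam.ParDo in place of splitToKV.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type wordCount struct {
	K string
	V int
}

// encodeAll JSON encodes each struct to a string at construction time.
// Strings survive pipeline serialization unchanged.
func encodeAll(wcs []wordCount) ([]string, error) {
	out := make([]string, 0, len(wcs))
	for _, wc := range wcs {
		b, err := json.Marshal(wc)
		if err != nil {
			return nil, err
		}
		out = append(out, string(b))
	}
	return out, nil
}

// splitJSONToKV takes the place of splitToKV: it decodes the string back
// into the concrete type before splitting it into a key and a value.
func splitJSONToKV(s string) (string, int, error) {
	var wc wordCount
	if err := json.Unmarshal([]byte(s), &wc); err != nil {
		return "", 0, err
	}
	return wc.K, wc.V, nil
}

func main() {
	encoded, err := encodeAll([]wordCount{{"a", 23}, {"b", 42}, {"c", 5}})
	if err != nil {
		panic(err)
	}
	for _, s := range encoded {
		k, v, err := splitJSONToKV(s)
		if err != nil {
			panic(err)
		}
		fmt.Println(k, v)
	}
}
```

The per-type cost mentioned above shows up here directly: both helpers are written against wordCount and would need a counterpart for every other struct type.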


  was:
beam.Create and beam.CreateList, when used with complex types, do not survive 
pipeline serialization and deserialization, such as when the values are being 
decoded on a remote runner.

Such an ability is useful for providing static data, or data known at 
construction time, to the pipeline.

The following works as expected in the direct Go runner, which doesn't 
serialize and deserialize the pipeline, but fails remotely. The pipeline 
typechecks correctly.

type wordCount struct {
  K string
  V int
}

func splitToKV(e wordCount) (string,int) {
  return e.K, e.V
}

p := beam.NewPipeline()
s := p.Root()
list := beam.CreateList(s, []wordCount{{"a", 23}, {"b", 42}, {"c", 5}})
kvs := beam.ParDo(s, splitToKV, list)
... rest of pipeline...

The pipeline will try to execute the splitToKV ParDo, and will panic when 
trying to use the JSON decoded values. Specifically, the createFn generated by 
beam.Create only has a field of type []interface{}, so the JSON unmarshaller 
produces a map[string]interface{} for each value instead (as per the godoc for 
encoding/json).

The reflect library will then panic when trying to convert these 
map[string]interface{} values to wordCount structs for the splitToKV function.

This sort of thing will occur whenever a structural DoFn uses interface{} types 
to persist values to runners, since the underlying type information is lost in 
the encoding done by serialize.go.
However, the types are known at construction time, either directly, or by the 
type checker when using Universal types, so the true underlying type could be 
encoded, and then used in the decoding process before storing them in the 
dematerialized structural DoFn.

A user can currently work around this by manually JSON encoding their structs 
to strings, and manually decoding them in their pipeline, but would need 
specialized code for each type used this way.



> Go SDK beam.Create & beam.CreateList should support complex types
> -----------------------------------------------------------------
>
>                 Key: BEAM-3458
>                 URL: https://issues.apache.org/jira/browse/BEAM-3458
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go
>    Affects Versions: Not applicable
>            Reporter: Robert Burke
>            Assignee: Henning Rohde
>
> beam.Create and beam.CreateList, when used with complex types, do not survive 
> pipeline serialization and deserialization, such as when the values are being 
> decoded on a remote runner.
> Such an ability is useful for providing static data, or data known at 
> construction time, to the pipeline.
> The following works as expected in the direct Go runner, which doesn't 
> serialize and deserialize the pipeline, but fails remotely. The pipeline 
> typechecks correctly.
> {code:go}
> type wordCount struct {
>   K string
>   V int
> }
> func splitToKV(e wordCount) (string,int) {
>   return e.K, e.V
> }
> p := beam.NewPipeline()
> s := p.Root()
> list := beam.CreateList(s, []wordCount{{"a", 23}, {"b", 42}, {"c", 5}})
> kvs := beam.ParDo(s, splitToKV, list)
> {code}
> ... rest of pipeline...
> The pipeline will try to execute the splitToKV ParDo, and will panic when 
> trying to use the JSON decoded values. Specifically, the createFn generated 
> by beam.Create only has a field of type []interface{}, so the JSON 
> unmarshaller produces a map[string]interface{} for each value instead (as 
> per the godoc for encoding/json).
> The reflect library will then panic when trying to convert these 
> map[string]interface{} values to wordCount structs for the splitToKV function.
> This sort of thing will occur whenever a structural DoFn uses interface{} 
> types to persist values to runners, since the underlying type information is 
> lost in the encoding done by serialize.go.
> However, the types are known at construction time, either directly, or by the 
> type checker when using Universal types, so the true underlying type could be 
> encoded, and then used in the decoding process before storing them in the 
> dematerialized structural DoFn.
> A user can currently work around this by manually JSON encoding their structs 
> to strings, and manually decoding them in their pipeline, but would need 
> specialized code for each type used this way.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
