[
https://issues.apache.org/jira/browse/BEAM-3458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Robert Burke updated BEAM-3458:
-------------------------------
Description:
beam.Create and beam.CreateList, when used with complex types, do not survive
pipeline serialization and deserialization, such as when the values are
decoded on a remote runner.
Such an ability is useful for providing static data, or data known at
construction time, to the pipeline.
The following works as expected in the direct go runner, which doesn't
serialize and deserialize the pipeline, but fails remotely. The pipeline
typechecks correctly.
{code:go}
type wordCount struct {
	K string
	V int
}

// splitToKV unpacks a wordCount into its key and value.
func splitToKV(e wordCount) (string, int) {
	return e.K, e.V
}

p := beam.NewPipeline()
s := p.Root()
list := beam.CreateList(s, []wordCount{{"a", 23}, {"b", 42}, {"c", 5}})
kvs := beam.ParDo(s, splitToKV, list)
{code}
... rest of pipeline...
The pipeline will try to execute the splitToKV ParDo, and will panic when
trying to use the JSON decoded values. Specifically, the createFn generated by
beam.Create only has a field of type []interface{}. When the JSON unmarshaller
decodes into that field, each value becomes a map[string]interface{} instead
(as per the godoc for encoding/json).
The reflect library will then panic when trying to convert these
map[string]interface{} values to wordCount structs for the splitToKV function.
This sort of failure will occur whenever a structural DoFn uses interface{}
types to persist values to runners, since the underlying type information is
lost in the encoding done by serialize.go.
However, the types are known at construction time, either directly, or by the
type checker when using Universal types, so the true underlying type could be
encoded, and then used in the decoding process before storing them in the
dematerialized structural DoFn.
A user can currently work around this by manually JSON encoding their structs
to strings, and manually decoding them in their pipeline, but would need
specialized code for each type used this way.
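A sketch of that workaround for the wordCount type, using only encoding/json. The helper names encodeAll and decodeToKV are illustrative; in a pipeline, the encoded strings would presumably be handed to beam.CreateList and the decoding logic would take the place of splitToKV.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type wordCount struct {
	K string
	V int
}

// encodeAll turns each struct into a JSON string at construction time,
// so that only plain strings need to survive pipeline serialization.
func encodeAll(wcs []wordCount) ([]string, error) {
	out := make([]string, 0, len(wcs))
	for _, wc := range wcs {
		data, err := json.Marshal(wc)
		if err != nil {
			return nil, err
		}
		out = append(out, string(data))
	}
	return out, nil
}

// decodeToKV is the type-specific decoding step: it parses one JSON
// string back into a wordCount and returns its key and value.
func decodeToKV(s string) (string, int, error) {
	var wc wordCount
	if err := json.Unmarshal([]byte(s), &wc); err != nil {
		return "", 0, err
	}
	return wc.K, wc.V, nil
}

func main() {
	strs, err := encodeAll([]wordCount{{"a", 23}, {"b", 42}, {"c", 5}})
	if err != nil {
		panic(err)
	}
	for _, s := range strs {
		k, v, err := decodeToKV(s)
		if err != nil {
			panic(err)
		}
		fmt.Println(k, v)
	}
}
```

Every additional struct type needs its own pair of helpers like these, which is the per-type cost noted above.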
> Go SDK beam.Create & beam.CreateList should support complex types
> -----------------------------------------------------------------
>
> Key: BEAM-3458
> URL: https://issues.apache.org/jira/browse/BEAM-3458
> Project: Beam
> Issue Type: Bug
> Components: sdk-go
> Affects Versions: Not applicable
> Reporter: Robert Burke
> Assignee: Henning Rohde