Hello, and thanks for trying out the Go SDK!

tl;dr: You can bypass the Beam correctness checks by pretending your type is a Protocol Buffer. See create_test.go <https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/create_test.go#L52> for an example of what needs to be done. This is not recommended, but it works.

To answer your questions:

You're right: at present, the Go SDK is not happy with a map type as an element value. As a rule, it tries to avoid free-floating pointers and reference types (like interface{}, slices, and maps), and some will likely never be allowed (like channels). The reason is that they open up the risk of modifying a value *after* it has been emitted, which leads to unpredictable bugs.

At best, it is a feature that is not presently implemented, because of that danger.

You are not doing anything wrong. AFAICT your pipeline is correct.

There is a workaround, but we can't guarantee it is a viable long-term solution, since it depends on a quirk of protocol buffers. Essentially, you can convince both Beam and the protocol buffer package that your custom type is a proto, and write your own Marshal and Unmarshal methods to convert it to and from a []byte (any encoding that can do that should work). See create_test.go <https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/create_test.go#L52> for an example of what needs to be done.

When the Go SDK has a proper Coder Registry or similar, we might have a better solution, but the danger will still be there.

Please let me know if you need help.

Cheers,
Robert Burke

On 2018/06/18 21:10:33, ed...@gmail.com <e...@gmail.com> wrote:
> I have the following code:
>
> type Record struct {
>     Timestamp time.Time
>     Payload   string
> }
>
> type processFn struct {
>     // etc...
> }
>
> func (f *processFn) ProcessElement(ctx context.Context, data []byte, emit func(Record)) {
>     // etc..
>     emit(someRecord)
>     // etc...
> }
>
> Which is eventually invoked as:
>
> beam.ParDo(scope, &processFn{}, pcoll)
>
> This seems to work fine using the direct runner until I add a map to the Record struct as follows:
>
> type Record struct {
>     Timestamp time.Time
>     Payload   string
>     Labels    map[string]string
> }
>
> Then I get the error mentioned in the subject line.
>
> My questions are:
> - Are maps illegal? What is a legal structure? or
> - Is the feature yet to be implemented? or
> - Am I doing something wrong? Am I failing to setup my pipeline correctly?
>
> Thanks for your help.