Hello, and thanks for trying out the Go SDK!

tl;dr: You can bypass the Beam correctness checks by pretending your type
is a Protocol Buffer. See create_test.go
<https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/create_test.go#L52>
for an example of what needs to be done. This is not recommended, but it works.

To answer your questions:
You're right: at present, the Go SDK does not accept a map type as an
element value. As a rule, it tries to avoid free-floating pointers
and reference types (like interface{}, slices, and maps), and some will
likely never be allowed (like channels). The reason is that they open up the
risk of modifying a value *after* it has been emitted, which leads to
unpredictable bugs.
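
To make that risk concrete, here is a minimal sketch in plain Go with no
Beam dependency. The emit closure is a hypothetical stand-in that simply
stores the value it is given, and Record is trimmed down from the struct in
your question; neither is SDK code.

```go
package main

// Record is a trimmed-down version of the struct from the question.
type Record struct {
	Payload string
	Labels  map[string]string
}

// emitThenMutate emits a Record and then keeps writing to the same map.
// The emit closure is a hypothetical stand-in for a runner that holds on
// to emitted values in memory; it is not the Beam emitter.
func emitThenMutate() Record {
	var out []Record
	emit := func(r Record) { out = append(out, r) }

	labels := map[string]string{"env": "prod"}
	emit(Record{Payload: "a", Labels: labels})

	// The struct was copied when it was emitted, but the copy still
	// refers to the same underlying map, so this write changes the
	// value that was already emitted.
	labels["env"] = "test"

	return out[0]
}
```

Calling emitThenMutate() returns a Record whose Labels["env"] is "test",
even though the value was "prod" at the moment it was emitted.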

At best, it's a feature that is not presently implemented, because of the
danger mentioned above.

You are not doing anything wrong. AFAICT your pipeline is correct.

There is a workaround, but we can't guarantee it is a viable long term
solution, since it involves depending on a quirk of protocol buffers.

Essentially, you can convince both Beam and the protocol buffer package
that your custom type is a proto, and write your own Marshal and Unmarshal
methods to convert it to and from a []byte (anything that can do that
conversion should work).
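
As a rough sketch of the shape this takes, assuming the methods that matter
are the three from the legacy proto.Message interface (Reset, String,
ProtoMessage) plus Marshal and Unmarshal: the JSON encoding and the
roundTrip helper below are my own choices for illustration, and the exact
method set Beam checks for is an assumption here, so treat the linked
create_test.go as the authority.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Record is the struct from the question, including the map field.
type Record struct {
	Timestamp time.Time
	Payload   string
	Labels    map[string]string
}

// Reset, String and ProtoMessage are the methods of the legacy
// proto.Message interface. They exist only so the type looks like a proto.
func (r *Record) Reset()         { *r = Record{} }
func (r *Record) String() string { return fmt.Sprintf("%+v", *r) }
func (r *Record) ProtoMessage()  {}

// Marshal encodes the record to bytes. JSON is an arbitrary choice for
// this sketch; any stable encoding to []byte would serve.
func (r *Record) Marshal() ([]byte, error) { return json.Marshal(r) }

// Unmarshal clears the record and then decodes the bytes into it.
func (r *Record) Unmarshal(b []byte) error {
	r.Reset()
	return json.Unmarshal(b, r)
}

// roundTrip is a hypothetical helper that encodes and decodes a Record,
// which is roughly what happens to an element between pipeline steps.
func roundTrip(in Record) (Record, error) {
	b, err := in.Marshal()
	if err != nil {
		return Record{}, err
	}
	var out Record
	if err := out.Unmarshal(b); err != nil {
		return Record{}, err
	}
	return out, nil
}
```

Because the methods have pointer receivers, it is *Record rather than Record
that satisfies the interface in this sketch.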

See create_test.go
<https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/create_test.go#L52>
for an example of what needs to be done.

When the Go SDK has a proper Coder Registry or similar, we might have a
better solution, but the danger will still be there.

Please let me know if you need help.

Cheers,
Robert Burke

On 2018/06/18 21:10:33, ed...@gmail.com <e...@gmail.com> wrote:
> I have the following code:
>
> type Record struct {
>         Timestamp time.Time
>         Payload   string
> }
>
> type processFn struct {
>         // etc...
> }
>
> func (f *processFn) ProcessElement(ctx context.Context, data []byte, emit func(Record)) {
>         // etc...
>         emit(someRecord)
>         // etc...
> }
>
> Which is eventually invoked as:
> beam.ParDo(scope, &processFn{}, pcoll)
>
> This seems to work fine using the direct runner until I add a map to the
> Record struct as follows:
>
> type Record struct {
>         Timestamp    time.Time
>         Payload      string
>         Labels       map[string]string
> }
>
> Then I get the error mentioned in the subject line.
>
> My questions are:
> - Are maps illegal? What is a legal structure? or
> - Is the feature yet to be implemented? or
> - Am I doing something wrong? Am I failing to set up my pipeline
>   correctly?
>
> Thanks for your help.
>
