damccorm commented on code in PR #17709:
URL: https://github.com/apache/beam/pull/17709#discussion_r878092046
##########
sdks/go/pkg/beam/io/pubsubio/pubsubio.go:
##########
@@ -73,15 +80,38 @@ func Read(s beam.Scope, project, topic string, opts
*ReadOptions) beam.PCollecti
return out[0]
}
-func unmarshalMessageFn(raw []byte) (*pb.PubsubMessage, error) {
+func unmarshalMessageFn(raw []byte, emit func(*pb.PubsubMessage)) error {
Review Comment:
This emitter type (`func(*pb.PubsubMessage)`) should be registered.
##########
sdks/go/pkg/beam/io/pubsubio/pubsubio.go:
##########
@@ -73,15 +80,38 @@ func Read(s beam.Scope, project, topic string, opts
*ReadOptions) beam.PCollecti
return out[0]
}
-func unmarshalMessageFn(raw []byte) (*pb.PubsubMessage, error) {
+func unmarshalMessageFn(raw []byte, emit func(*pb.PubsubMessage)) error {
var msg pb.PubsubMessage
if err := proto.Unmarshal(raw, &msg); err != nil {
- return nil, err
+ return err
}
- return &msg, nil
+ emit(&msg)
+ return nil
+}
+
+func wrapInMessage(raw []byte, emit func(*pb.PubsubMessage)) {
+ emit(&pb.PubsubMessage{
+ Data: raw,
+ })
}
-// Write writes PubSubMessages or bytes to the given pubsub topic.
+func marshalMessageFn(in *pb.PubsubMessage, emit func([]byte)) error {
Review Comment:
Same comment as above: this emitter type (`func([]byte)`) should be registered.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]