I don't think I can help with your specific issue, but I can point you to some potentially useful code. +Alex Van Boxel <[email protected]> was working on a very similar strategy and added a lot of code for mapping protobufs to Beam schemas which you may be able to take advantage of. He added options to Beam schemas [1], and the ability to map protobuf options to schema options. He also added schema support for dynamic messages in [2].
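If you go the schema route, something like the sketch below might get you started. This is rough and written from memory against the API added in [2] (ProtoDomain, ProtoDynamicMessageSchema and their factory methods); the exact class names and overloads may differ in the Beam version you're on, so treat it as a starting point rather than a tested recipe. "com.example.MyEvent" is a placeholder for whatever message type you compiled outside the pipeline jar.

import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.DynamicMessage;
import org.apache.beam.sdk.extensions.protobuf.ProtoDomain;
import org.apache.beam.sdk.extensions.protobuf.ProtoDynamicMessageSchema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class DynamicProtoToRows {

  // Convert DynamicMessages to schema'd Rows so downstream transforms can
  // work against the Beam schema instead of a generated proto class. The
  // FileDescriptorSet is whatever you compiled externally to the jar.
  public static PCollection<Row> toRows(
      PCollection<DynamicMessage> messages, FileDescriptorSet descriptorSet) {

    ProtoDomain domain = ProtoDomain.buildFrom(descriptorSet);
    ProtoDynamicMessageSchema<DynamicMessage> protoSchema =
        ProtoDynamicMessageSchema.forDescriptor(domain, "com.example.MyEvent");

    return messages
        .apply(MapElements.into(TypeDescriptors.rows())
            .via(protoSchema.getToRowFunction()))
        .setRowSchema(protoSchema.getSchema());
  }
}

Once everything is Rows, the message type becomes pipeline configuration (the descriptor set plus a message name) rather than compiled-in code, which sounds like what you're after for running the same pipeline over different message types.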
Brian

[1] https://cwiki.apache.org/confluence/display/BEAM/%5BBIP-1%5D+Beam+Schema+Options
[2] https://github.com/apache/beam/pull/10502

On Mon, Jun 15, 2020 at 1:15 AM amit kumar <[email protected]> wrote:
> Hi,
>
> I intend to use protobuf options to trigger different transforms, to use
> metadata from storage proto options for sink partitioning, etc., and also
> to allow different protobuf message types to flow through the same
> pipeline, running as different instances of the pipeline.
>
> I am able to parse descriptors, fields, and options from file descriptors
> compiled externally to the Beam pipeline jar.
>
> However, I am not able to use dynamicMessage.getDefaultInstanceForType()
> in the sink transforms (PTransform<PCollection<T>, PDone>), which need a
> default instance of the message type to persist the data, because it
> throws "com.google.protobuf.DynamicMessage not Serializable".
>
> I wanted to check whether there is a way to use a generic proto in a Beam
> pipeline, whether there are any examples of protobuf reflection that
> could be used in this case, and whether there is a recommended way to
> achieve this functionality.
>
> Many Thanks,
> Amit
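Regarding the "DynamicMessage not Serializable" error specifically: the usual workaround is to never hold a DynamicMessage or a Descriptor in a field that Beam Java-serializes. Instead, carry the FileDescriptorProto bytes plus the message name (both serializable) and rebuild the Descriptor lazily on each worker. Below is a minimal hand-rolled coder along those lines; it assumes a single .proto file with a top-level message and no imports (a real implementation would ship a full FileDescriptorSet and resolve dependencies in order).

import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CustomCoder;

public class DynamicMessageCoder extends CustomCoder<DynamicMessage> {

  // Both fields are Java-serializable, unlike DynamicMessage/Descriptor.
  private final byte[] fileDescriptorProtoBytes;
  private final String messageName;
  private transient Descriptor descriptor;  // rebuilt lazily per worker

  public static DynamicMessageCoder of(Descriptor descriptor) {
    return new DynamicMessageCoder(
        descriptor.getFile().toProto().toByteArray(), descriptor.getName());
  }

  private DynamicMessageCoder(byte[] fileDescriptorProtoBytes, String messageName) {
    this.fileDescriptorProtoBytes = fileDescriptorProtoBytes;
    this.messageName = messageName;
  }

  // Rebuild the Descriptor from the serialized FileDescriptorProto.
  // Assumes the message is top-level in a file with no imports.
  private Descriptor descriptor() throws IOException {
    if (descriptor == null) {
      try {
        FileDescriptorProto fileProto =
            FileDescriptorProto.parseFrom(fileDescriptorProtoBytes);
        FileDescriptor file =
            FileDescriptor.buildFrom(fileProto, new FileDescriptor[0]);
        descriptor = file.findMessageTypeByName(messageName);
      } catch (DescriptorValidationException e) {
        throw new IOException("Could not rebuild proto descriptor", e);
      }
    }
    return descriptor;
  }

  @Override
  public void encode(DynamicMessage value, OutputStream outStream) throws IOException {
    // Length-prefixed wire-format bytes; no Java serialization involved.
    byte[] bytes = value.toByteArray();
    DataOutputStream out = new DataOutputStream(outStream);
    out.writeInt(bytes.length);
    out.write(bytes);
    out.flush();
  }

  @Override
  public DynamicMessage decode(InputStream inStream) throws IOException {
    DataInputStream in = new DataInputStream(inStream);
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    return DynamicMessage.parseFrom(descriptor(), bytes);
  }
}

Set it on the PCollection explicitly, e.g. messages.setCoder(DynamicMessageCoder.of(descriptor)), and apply the same trick inside your sink: keep descriptor bytes in the DoFn's fields and rebuild the Descriptor (and any default instance) in @Setup rather than holding a DynamicMessage, which should make the NotSerializableException go away.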
