I don't think I can help with your specific issue, but I can point you to
some potentially useful code. Alex Van Boxel was
working on a very similar strategy and added a lot of code for mapping
protobufs to Beam schemas which you may be able to take advantage of. He
added options to Beam schemas [1], and the ability to map protobuf options
to schema options. He also added schema support for dynamic messages in [2].
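
For reference, the usual way around a non-Serializable member in a transform
is to ship a serialized form and rebuild the object lazily on the worker.
The sketch below shows only that pattern, using the JDK alone. It is not
taken from the PR above. DescriptorHolder and FakeDescriptor are hypothetical
names, and FakeDescriptor stands in for a protobuf Descriptor; the bytes here
are just a UTF-8 name rather than a real FileDescriptorSet.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/** Hypothetical holder: serializes only bytes, rebuilds the real object on demand. */
public class DescriptorHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Stand-in for a non-Serializable type such as a protobuf Descriptor (assumption). */
    public static final class FakeDescriptor {
        public final String fullName;

        FakeDescriptor(String fullName) {
            this.fullName = fullName;
        }
    }

    // The only state that travels with the transform.
    private final byte[] encoded;

    // Skipped by Java serialization; null again after deserialization.
    private transient FakeDescriptor cached;

    public DescriptorHolder(byte[] encoded) {
        this.encoded = encoded.clone();
    }

    /** Rebuilds the non-serializable object on first use, then reuses it. */
    public synchronized FakeDescriptor get() {
        if (cached == null) {
            // A real version would parse descriptor bytes here instead.
            cached = new FakeDescriptor(new String(encoded, StandardCharsets.UTF_8));
        }
        return cached;
    }

    /** Simulates shipping the holder to a worker via Java serialization. */
    public static DescriptorHolder roundTrip(DescriptorHolder holder) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (DescriptorHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

In a sink, the transform would keep a holder like this as its field and, at
the point of use, obtain the default instance from the rebuilt descriptor
(for example with DynamicMessage.getDefaultInstance(descriptor)) instead of
storing a DynamicMessage in the transform itself.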

Brian

[1]
https://cwiki.apache.org/confluence/display/BEAM/%5BBIP-1%5D+Beam+Schema+Options
[2] https://github.com/apache/beam/pull/10502

On Mon, Jun 15, 2020 at 1:15 AM amit kumar <[email protected]> wrote:

> Hi,
>
>
> I intend to use Protobuf options to trigger different transforms and to use
> metadata from storage proto options for sink partitioning, etc. I also want
> to allow different protobuf message types to flow through the same pipeline,
> running as different instances of the pipeline.
>
> I am able to parse descriptors, fields and options from file descriptors
> compiled externally to the Beam pipeline jar.
>
>
> I am not able to use dynamicMessage.getDefaultInstanceForType() in the
> sink transforms (PTransform<PCollection<T>, PDone>), which need a default
> instance of the message type to persist the data: it fails with
> "com.google.protobuf.DynamicMessage not Serializable".
>
> I wanted to check whether there is a way to use a generic proto in a Beam
> pipeline, whether there are any examples of protobuf reflection that can be
> used in this case, or whether there is a recommended way to achieve this
> functionality.
>
>
>
> Many Thanks,
>
> Amit
>
