Thanks, Brian, for your response; Alex's code is very helpful. For my
current case I will use reflection to get the default instances of the
proto message types. Another option would be to decouple the converters
from the sinks, bypassing this issue by doing some of the conversions
inside a DoFn.
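
Roughly what I have in mind (the class and helper names here are
illustrative):

import com.google.protobuf.Message;

public final class ProtoDefaults {
  // Hold the generated class name as a serializable String and resolve the
  // default instance lazily on the worker, instead of serializing the
  // message instance itself.
  @SuppressWarnings("unchecked")
  public static <T extends Message> T defaultInstanceFor(String className) {
    try {
      // Every protoc-generated message class has a static getDefaultInstance().
      return (T) Class.forName(className).getMethod("getDefaultInstance").invoke(null);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Not a generated proto class: " + className, e);
    }
  }
}

The transform then only needs to carry the class name (or, for dynamic
messages, the descriptor bytes plus the message's full name) across
serialization.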

Regards,
Amit

On Mon, Jun 15, 2020 at 11:28 AM Brian Hulette <[email protected]> wrote:

> I don't think I can help with your specific issue, but I can point you to
> some potentially useful code. +Alex Van Boxel <[email protected]> was
> working on a very similar strategy and added a lot of code for mapping
> protobufs to Beam schemas which you may be able to take advantage of. He
> added options to Beam schemas [1], and the ability to map protobuf options
> to schema options. He also added schema support for dynamic messages in [2].
>
> Brian
>
> [1]
> https://cwiki.apache.org/confluence/display/BEAM/%5BBIP-1%5D+Beam+Schema+Options
> [2] https://github.com/apache/beam/pull/10502
>
> On Mon, Jun 15, 2020 at 1:15 AM amit kumar <[email protected]> wrote:
>
>> Hi,
>>
>>
>> I intend to use protobuf options to trigger different transforms, to use
>> metadata from storage proto options for sink partitioning, etc., and also
>> to allow different protobuf message types to flow through the same
>> pipeline, running as different instances of the pipeline.
>>
>> I am able to parse descriptors, fields, and options from file descriptors
>> compiled externally to the Beam pipeline jar.
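>>
>> For reference, what I am doing is roughly this (the path and message name
>> are illustrative):
>>
>> import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
>> import com.google.protobuf.Descriptors.Descriptor;
>> import com.google.protobuf.Descriptors.FileDescriptor;
>> import java.nio.file.Files;
>> import java.nio.file.Paths;
>>
>> public class DescriptorLoader {
>>   // Loads a descriptor set produced with `protoc --descriptor_set_out=...`
>>   // and looks up one message type. This assumes the file has no imports;
>>   // real code has to build dependencies in topological order.
>>   public static Descriptor load(String path, String messageName) throws Exception {
>>     FileDescriptorSet set =
>>         FileDescriptorSet.parseFrom(Files.readAllBytes(Paths.get(path)));
>>     FileDescriptor fd =
>>         FileDescriptor.buildFrom(set.getFile(0), new FileDescriptor[] {});
>>     // Custom options are available via getOptions() on the file, message,
>>     // and field descriptors.
>>     return fd.findMessageTypeByName(messageName);
>>   }
>> }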
>>
>>
>> I am not able to use dynamicMessage.getDefaultInstanceForType() in the
>> sink transforms (PTransform<PCollection<T>, PDone>), which need a default
>> instance of the message type to persist the data, because the pipeline
>> fails with "com.google.protobuf.DynamicMessage not Serializable".
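>>
>> The failing shape is roughly this (the sink class here is illustrative):
>>
>> import com.google.protobuf.DynamicMessage;
>> import org.apache.beam.sdk.transforms.PTransform;
>> import org.apache.beam.sdk.values.PCollection;
>> import org.apache.beam.sdk.values.PDone;
>>
>> public class DynamicProtoSink
>>     extends PTransform<PCollection<DynamicMessage>, PDone> {
>>   // PTransform is Serializable, but DynamicMessage is not, so keeping the
>>   // default instance in a field fails when the transform is serialized.
>>   private final DynamicMessage defaultInstance;
>>
>>   public DynamicProtoSink(DynamicMessage defaultInstance) {
>>     this.defaultInstance = defaultInstance;
>>   }
>>
>>   @Override
>>   public PDone expand(PCollection<DynamicMessage> input) {
>>     // ... write input, using defaultInstance.getDescriptorForType() ...
>>     return PDone.in(input.getPipeline());
>>   }
>> }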
>>
>> I wanted to check whether there is a way to use a generic proto in a Beam
>> pipeline, whether there are any examples of protobuf reflection that could
>> be used in this case, or whether there is a recommended way to achieve
>> this functionality.
>>
>>
>>
>> Many Thanks,
>>
>> Amit
>>
>
