Re: KafkaIO.write dynamically fanout to different kafka topics

2020-05-14 Thread Eleanore Jin
Hi Alex,

Thanks a lot for the information!

Best
Eleanore

On Thu, May 14, 2020 at 1:53 AM Alexey Romanenko 
wrote:

> Hi Eleanore,
>
> Yes, to define the output topic dynamically for every record, you may want
> to use KafkaIO.writeRecords(), which takes a
> PCollection<ProducerRecord<K, V>> as input and, for every processed
> ProducerRecord, takes its topic name (if one was specified there) and uses
> it as the output topic. In this way, you can set for every record the topic
> where it will be published.
>
> If I got your question right, you need an intermediate PTransform before
> KafkaIO.writeRecords() that uses information from a message field to
> determine the topic where your record should be published, and then
> creates a new ProducerRecord with the proper topic name.
>
> > On 14 May 2020, at 07:09, Eleanore Jin  wrote:
> >
> > Hi all,
> >
> > I have a Beam pipeline that reads from a Kafka topic via KafkaIO and,
> > based on a message field, adds an additional field to the message for
> > the destination topic.
> >
> > I see KafkaIO.write can be used to publish to Kafka topics.
> >
> > In KafkaIO.java, it constructs the ProducerRecord, and getTopic()
> > determines which topic to publish to; this information is passed in when
> > creating the PTransform via KafkaIO.write.
> >
> > Any suggestions for dynamically setting the Kafka topic from a message field?
> >
> > Thanks a lot!
> > Eleanore
>
>


Re: KafkaIO.write dynamically fanout to different kafka topics

2020-05-14 Thread Alexey Romanenko
Hi Eleanore,

Yes, to define the output topic dynamically for every record, you may want to use
KafkaIO.writeRecords(), which takes a PCollection<ProducerRecord<K, V>> as input
and, for every processed ProducerRecord, takes its topic name (if one was
specified there) and uses it as the output topic. In this way, you can set for
every record the topic where it will be published.

If I got your question right, you need an intermediate PTransform before
KafkaIO.writeRecords() that uses information from a message field to determine
the topic where your record should be published, and then creates a new
ProducerRecord with the proper topic name.
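A minimal sketch of that intermediate step, assuming the Beam Java SDK with the beam-sdks-java-io-kafka module (which provides KafkaIO and ProducerRecordCoder) and the Kafka client library on the classpath, and assuming the input is a PCollection<KV<String, String>> such as the output of KafkaIO.read() with withoutMetadata(). The message layout ("<topic>|<payload>"), the fallback topic "unrouted", and the names DynamicTopicRouting, topicFor, ToProducerRecordFn and writeRouted are all hypothetical; adapt topicFor to whatever field your messages actually carry.

```java
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.ProducerRecordCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DynamicTopicRouting {

  // Hypothetical fallback topic for messages that carry no destination field.
  public static final String DEFAULT_TOPIC = "unrouted";

  // Hypothetical message layout: "<destinationTopic>|<payload>".
  // Replace this with the parsing your real message format needs.
  public static String topicFor(String message) {
    int sep = message.indexOf('|');
    return sep > 0 ? message.substring(0, sep) : DEFAULT_TOPIC;
  }

  // Intermediate step: wrap each KV in a ProducerRecord whose topic is read from the message.
  public static class ToProducerRecordFn
      extends DoFn<KV<String, String>, ProducerRecord<String, String>> {
    @ProcessElement
    public void processElement(
        @Element KV<String, String> kv, OutputReceiver<ProducerRecord<String, String>> out) {
      out.output(new ProducerRecord<>(topicFor(kv.getValue()), kv.getKey(), kv.getValue()));
    }
  }

  // Assigns a topic per record, then writes each record to the topic it carries.
  public static void writeRouted(PCollection<KV<String, String>> messages, String bootstrapServers) {
    messages
        .apply("AssignTopic", ParDo.of(new ToProducerRecordFn()))
        // Explicit coder; the key is nullable because Kafka keys are often null.
        .setCoder(
            ProducerRecordCoder.of(NullableCoder.of(StringUtf8Coder.of()), StringUtf8Coder.of()))
        .apply(
            "WriteToKafka",
            KafkaIO.<String, String>writeRecords()
                .withBootstrapServers(bootstrapServers)
                // Default topic; every record built above already carries its own topic.
                .withTopic(DEFAULT_TOPIC)
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class));
  }
}
```

Setting the coder explicitly avoids relying on coder inference for ProducerRecord. Keeping the routing logic in a plain static method (topicFor) also makes it easy to unit-test without running a pipeline.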

> On 14 May 2020, at 07:09, Eleanore Jin  wrote:
> 
> Hi all, 
> 
> I have a Beam pipeline that reads from a Kafka topic via KafkaIO and, based
> on a message field, adds an additional field to the message for the
> destination topic.
> 
> I see KafkaIO.write can be used to publish to Kafka topics.
> 
> In KafkaIO.java, it constructs the ProducerRecord, and getTopic() determines
> which topic to publish to; this information is passed in when creating the
> PTransform via KafkaIO.write.
> 
> Any suggestions for dynamically setting the Kafka topic from a message field?
> 
> Thanks a lot!
> Eleanore



KafkaIO.write dynamically fanout to different kafka topics

2020-05-13 Thread Eleanore Jin
Hi all,

I have a Beam pipeline that reads from a Kafka topic via KafkaIO and, based
on a message field, adds an additional field to the message for the
destination topic.

I see KafkaIO.write can be used to publish to Kafka topics.

In KafkaIO.java, it constructs the ProducerRecord, and getTopic()
determines which topic to publish to; this information is passed in when
creating the PTransform via KafkaIO.write.

Any suggestions for dynamically setting the Kafka topic from a message field?

Thanks a lot!
Eleanore