NP, I’m happy to hear that it works for you!

> On 3 Jun 2020, at 01:37, Mohil Khare wrote:
> Hey Alexey,
> Thanks a lot for your quick response. This worked for me :). Awesome.
> Regards
> Mohil
> On Tue, Jun 2, 2020 at 6:31 AM Alexey Romanenko wrote:
> Hi Mohil,
> In the Java SDK you can use “KafkaIO.writeRecords()” for that. You will need
> to provide a PCollection<ProducerRecord<K, V>> as the input collection and
> set the desired output topic on each record via its ProducerRecord
> metadata.
> It could look something like this:
> PCollection<KV<Teamname, data>> teams = ...;
> PCollection<ProducerRecord<Teamname, data>> records =
>     teams.apply(ParDo.of(new KV2ProducerRecord()));
> private static class KV2ProducerRecord
>     extends DoFn<KV<Teamname, data>, ProducerRecord<Teamname, data>> {
>     // create ProducerRecord and set your topic there
>     ...
> }
> records.apply(KafkaIO.<Teamname, data>writeRecords()
>     .withBootstrapServers("broker_1:9092,broker_2:9092")
>     .withTopic("results") // default sink topic
>     .withKeySerializer(...)
>     .withValueSerializer(...));
>> On 2 Jun 2020, at 03:27, Mohil Khare wrote:
>> Hello everyone,
>> Does anyone know whether it is possible to pass KafkaIO a topic name that
>> is embedded in the PCollection elements when writing?
>> We have a use case with team-specific Kafka topics, e.g.
>> teamA_topicname and teamB_topicname.
>> From Beam, we create a PCollection<KV<Teamname, data>> and need to send
>> this data to Kafka over the aforementioned team-specific topics.
>> Is it possible to provide topic names dynamically to
>> KafkaIO.write().withTopic() from the key present in the KV PCollection?
>> Thanks and regards
>> Mohil
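
For reference, the per-record routing discussed in this thread can be sketched in plain Java. This is only an illustration under stated assumptions: the topic convention "<team>_topicname" is taken from the question, and RoutedRecord and TopicRouting are hypothetical stand-ins (RoutedRecord plays the role of Kafka's ProducerRecord) so that the sketch compiles without Beam or Kafka on the classpath. In a real pipeline the same logic would presumably live in the DoFn body and emit a ProducerRecord built from the topic, key and value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's ProducerRecord (topic, key, value),
// used here so the sketch compiles without Beam or Kafka dependencies.
final class RoutedRecord {
    final String topic;
    final String key;
    final String value;

    RoutedRecord(String topic, String key, String value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical helper holding the routing logic.
final class TopicRouting {
    // Assumed naming convention, taken from the question: "<team>_topicname".
    static String topicFor(String teamName) {
        return teamName + "_topicname";
    }

    // Mirrors what the DoFn body would do for one KV<team, data> element:
    // derive the topic from the key and attach it to the outgoing record.
    static RoutedRecord toRecord(String teamName, String data) {
        return new RoutedRecord(topicFor(teamName), teamName, data);
    }

    public static void main(String[] args) {
        List<RoutedRecord> records = new ArrayList<>();
        records.add(toRecord("teamA", "payload-1"));
        records.add(toRecord("teamB", "payload-2"));
        for (RoutedRecord r : records) {
            System.out.println(r.topic + " <- " + r.key + ": " + r.value);
        }
    }
}
```

Keeping the topic derivation in one small function makes the naming convention easy to change or test independently of the pipeline.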
