NP, I’m happy to hear that it works for you!

> On 3 Jun 2020, at 01:37, Mohil Khare <mo...@prosimo.io> wrote:
> 
> Hey Alexey,
> 
> Thanks a lot for your quick response. This worked for me :). Awesome.
> 
> Regards
> Mohil
> 
> 
> On Tue, Jun 2, 2020 at 6:31 AM Alexey Romanenko <aromanenko....@gmail.com 
> <mailto:aromanenko....@gmail.com>> wrote:
> Hi Mohil,
> 
> In Java SDK you can use “KafkaIO.writeRecords()” for that. So, you will need 
> to provide a PCollection<ProducerRecord<K, V>> as an input collection where 
> you set a desired output topic for every record inside ProducerRecord 
> metadata.
> It could look something like this:
> 
> PCollection<KV<Teamname, data>>  teams = ...;
> 
> PCollection<ProducerRecord<Teamname, data >> records = 
> teams.apply(ParDo.of(new KV2ProducerRecord());
> 
> private static class KV2ProducerRecord
>     extends DoFn<KV<Teamname, data>, ProducerRecord<Teamname, data>> {
>     // create ProducerRecord and set your topic there
>     ...
> }
> 
> records.apply(KafkaIO.<Teamname, data>writeRecords()
>     .withBootstrapServers("broker_1:9092,broker_2:9092")
>     .withTopic("results”) // default sink topic 
>     .withKeySerializer(...)
>     .withValueSerializer(...));
> 
> 
> 
>> On 2 Jun 2020, at 03:27, Mohil Khare <mo...@prosimo.io 
>> <mailto:mo...@prosimo.io>> wrote:
>> 
>> Hello everyone,
>> 
>> Does anyone know if  it is possible to provide a topic name embedded in a 
>> PCollection object to kafkaIO while writing ?
>> 
>> We have a use case where we have a team specific kafka topic for eg 
>> teamA_topicname, teamB_topicname.
>> 
>> From beam, we create PCollection<KV<Teamname, data>> and we need to send 
>> this data to kafka over aforementioned team specific topics.
>> Is it possible to provide topic names dynamically to 
>> kafkaIO.write().withTopic() from Key  present in KV PCollection ?
>> 
>> Thanks and regards
>> Mohil
>> 
>> 
>> 
>> 
> 

Reply via email to