Hey Alexey, Thanks a lot for your quick response. This worked for me :). Awesome.
Regards Mohil On Tue, Jun 2, 2020 at 6:31 AM Alexey Romanenko <aromanenko....@gmail.com> wrote: > Hi Mohil, > > In Java SDK you can use “KafkaIO.writeRecords()” for that. So, you will > need to provide a PCollection<ProducerRecord<K, V>> as an input collection > where you set a desired output topic for every record inside ProducerRecord > metadata. > It could look something like this: > > PCollection<KV<Teamname, data>> teams = ...; > > PCollection<ProducerRecord<Teamname, data >> records = teams.apply(ParDo. > of(new KV2ProducerRecord()); > > private static class KV2ProducerRecord > extends DoFn<KV<Teamname, data>, ProducerRecord<Teamname, data>> { > // create ProducerRecord and set your topic there > ... > } > > records.apply(KafkaIO.<Teamname, data>writeRecords() > .withBootstrapServers("broker_1:9092,broker_2:9092") > .withTopic("results”) // default sink topic > .withKeySerializer(...) > .withValueSerializer(...)); > > > > On 2 Jun 2020, at 03:27, Mohil Khare <mo...@prosimo.io> wrote: > > Hello everyone, > > Does anyone know if it is possible to provide a topic name embedded in a > PCollection object to kafkaIO while writing ? > > We have a use case where we have a team specific kafka topic for eg > teamA_topicname, teamB_topicname. > > From beam, we create PCollection<KV<Teamname, data>> and we need to send > this data to kafka over aforementioned team specific topics. > Is it possible to provide topic names dynamically to > kafkaIO.write().withTopic() from Key present in KV PCollection ? > > Thanks and regards > Mohil > > > > > >