NP, I’m happy to hear that it works for you!
> On 3 Jun 2020, at 01:37, Mohil Khare <mo...@prosimo.io> wrote: > > Hey Alexey, > > Thanks a lot for your quick response. This worked for me :). Awesome. > > Regards > Mohil > > > On Tue, Jun 2, 2020 at 6:31 AM Alexey Romanenko <aromanenko....@gmail.com > <mailto:aromanenko....@gmail.com>> wrote: > Hi Mohil, > > In Java SDK you can use “KafkaIO.writeRecords()” for that. So, you will need > to provide a PCollection<ProducerRecord<K, V>> as an input collection where > you set a desired output topic for every record inside ProducerRecord > metadata. > It could look something like this: > > PCollection<KV<Teamname, data>> teams = ...; > > PCollection<ProducerRecord<Teamname, data >> records = > teams.apply(ParDo.of(new KV2ProducerRecord()); > > private static class KV2ProducerRecord > extends DoFn<KV<Teamname, data>, ProducerRecord<Teamname, data>> { > // create ProducerRecord and set your topic there > ... > } > > records.apply(KafkaIO.<Teamname, data>writeRecords() > .withBootstrapServers("broker_1:9092,broker_2:9092") > .withTopic("results”) // default sink topic > .withKeySerializer(...) > .withValueSerializer(...)); > > > >> On 2 Jun 2020, at 03:27, Mohil Khare <mo...@prosimo.io >> <mailto:mo...@prosimo.io>> wrote: >> >> Hello everyone, >> >> Does anyone know if it is possible to provide a topic name embedded in a >> PCollection object to kafkaIO while writing ? >> >> We have a use case where we have a team specific kafka topic for eg >> teamA_topicname, teamB_topicname. >> >> From beam, we create PCollection<KV<Teamname, data>> and we need to send >> this data to kafka over aforementioned team specific topics. >> Is it possible to provide topic names dynamically to >> kafkaIO.write().withTopic() from Key present in KV PCollection ? >> >> Thanks and regards >> Mohil >> >> >> >> >