Hey Alexey,

Thanks a lot for your quick response. This worked for me :). Awesome.

Regards
Mohil


On Tue, Jun 2, 2020 at 6:31 AM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> Hi Mohil,
>
> In the Java SDK you can use “KafkaIO.writeRecords()” for that. You will
> need to provide a PCollection<ProducerRecord<K, V>> as the input collection
> and set the desired output topic for every record in the ProducerRecord
> metadata.
> It could look something like this:
>
> PCollection<KV<Teamname, data>> teams = ...;
>
> PCollection<ProducerRecord<Teamname, data>> records =
>     teams.apply(ParDo.of(new KV2ProducerRecord()));
>
> private static class KV2ProducerRecord
>     extends DoFn<KV<Teamname, data>, ProducerRecord<Teamname, data>> {
>     // create ProducerRecord and set your topic there
>     ...
> }
>
> records.apply(KafkaIO.<Teamname, data>writeRecords()
>     .withBootstrapServers("broker_1:9092,broker_2:9092")
>     .withTopic("results") // default sink topic
>     .withKeySerializer(...)
>     .withValueSerializer(...));
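>
> For completeness, a rough sketch of what the process method inside
> KV2ProducerRecord could look like (an illustration only: deriving the topic
> name from the key via toString() plus a "_topicname" suffix is just an
> assumption about your naming scheme):
>
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     KV<Teamname, data> kv = c.element();
>     // Assumed naming scheme: build the team-specific topic from the key.
>     String topic = kv.getKey().toString() + "_topicname";
>     // The topic carried by the ProducerRecord routes this element to the
>     // team-specific topic rather than the default sink topic set via withTopic(...).
>     c.output(new ProducerRecord<>(topic, kv.getKey(), kv.getValue()));
>   }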
>
>
>
> On 2 Jun 2020, at 03:27, Mohil Khare <mo...@prosimo.io> wrote:
>
> Hello everyone,
>
> Does anyone know if it is possible to provide a topic name embedded in a
> PCollection object to KafkaIO while writing?
>
> We have a use case where we have team-specific Kafka topics, e.g.
> teamA_topicname and teamB_topicname.
>
> From Beam, we create a PCollection<KV<Teamname, data>> and need to send
> this data to Kafka over the aforementioned team-specific topics.
> Is it possible to provide topic names dynamically to
> KafkaIO.write().withTopic() from the key present in the KV PCollection?
>
> Thanks and regards
> Mohil
