Re: KafkaIO.write dynamically fanout to different kafka topics
Hi Alex, Thanks a lot for the information! Best Eleanore On Thu, May 14, 2020 at 1:53 AM Alexey Romanenko wrote: > Hi Eleanore, > > Yes, to define output topic dynamically for every record, you may want to > use KafkaIO.writeRecords() that takes PCollection as > an input and for every processed ProducerRecord it takes its topic name (if > it was specified there), and use it as an output topic. So, in this way, > you can set for every record a topic name where it will be published. > > If I got your question right, you need to have an intermediate PTransfrom > before KafkaIO.writeRecords(), that will use an information from a message > field to define a topic where your record should be published, and then > create a new ProducerRecord with a proper topic name. > > > On 14 May 2020, at 07:09, Eleanore Jin wrote: > > > > Hi all, > > > > I have a beam pipeline, which will read from kafka topic via KafkaIO, > and based on the message field, add additional field in the message for the > destination topic. > > > > I see KakfaIO.write can be used to publish to kafka topics. > > > > In KafkaIO.java, it construct the ProducerRecord, and getTopic() > determines which topic to publish, and this information is passed when > create PTransforms via KafkaIO.write. > > > > Any suggestions to dynamically set kafka topic from message field? > > > > Thanks a lot! > > Eleanore > >
Re: KafkaIO.write dynamically fanout to different kafka topics
Hi Eleanore, Yes, to define output topic dynamically for every record, you may want to use KafkaIO.writeRecords() that takes PCollection as an input and for every processed ProducerRecord it takes its topic name (if it was specified there), and use it as an output topic. So, in this way, you can set for every record a topic name where it will be published. If I got your question right, you need to have an intermediate PTransfrom before KafkaIO.writeRecords(), that will use an information from a message field to define a topic where your record should be published, and then create a new ProducerRecord with a proper topic name. > On 14 May 2020, at 07:09, Eleanore Jin wrote: > > Hi all, > > I have a beam pipeline, which will read from kafka topic via KafkaIO, and > based on the message field, add additional field in the message for the > destination topic. > > I see KakfaIO.write can be used to publish to kafka topics. > > In KafkaIO.java, it construct the ProducerRecord, and getTopic() determines > which topic to publish, and this information is passed when create > PTransforms via KafkaIO.write. > > Any suggestions to dynamically set kafka topic from message field? > > Thanks a lot! > Eleanore
KafkaIO.write dynamically fanout to different kafka topics
Hi all, I have a beam pipeline, which will read from kafka topic via KafkaIO, and based on the message field, add additional field in the message for the destination topic. I see KakfaIO.write can be used to publish to kafka topics. In KafkaIO.java, it construct the ProducerRecord, and getTopic() determines which topic to publish, and this information is passed when create PTransforms via KafkaIO.write. Any suggestions to dynamically set kafka topic from message field? Thanks a lot! Eleanore