Hi Alex,

Thanks a lot for the information!

Best,
Eleanore

On Thu, May 14, 2020 at 1:53 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:

> Hi Eleanore,
>
> Yes, to define the output topic dynamically for every record, you may want to
> use KafkaIO.writeRecords(), which takes a PCollection<ProducerRecord<K, V>> as
> input. For every processed ProducerRecord it takes the topic name (if one
> was specified there) and uses it as the output topic. This way, you can set
> for every record the name of the topic where it will be published.
>
> If I got your question right, you need an intermediate PTransform before
> KafkaIO.writeRecords() that uses information from a message field to
> determine the topic where your record should be published, and then creates
> a new ProducerRecord with the proper topic name.
>
> > On 14 May 2020, at 07:09, Eleanore Jin <eleanore....@gmail.com> wrote:
> >
> > Hi all,
> >
> > I have a Beam pipeline that reads from a Kafka topic via KafkaIO and,
> > based on a message field, adds an additional field to the message for the
> > destination topic.
> >
> > I see KafkaIO.write can be used to publish to Kafka topics.
> >
> > In KafkaIO.java, it constructs the ProducerRecord, and getTopic()
> > determines which topic to publish to; this information is passed when
> > creating the PTransform via KafkaIO.write.
> >
> > Any suggestions on how to dynamically set the Kafka topic from a message field?
> >
> > Thanks a lot!
> > Eleanore
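
For reference, the routing step suggested in the quoted reply could look roughly like the sketch below. It covers only the "message field to topic name" decision and uses plain JDK classes so it can be run on its own. The class name TopicRouter, the field values, and the topic names are all hypothetical and not taken from the thread. In an actual pipeline, the intermediate PTransform would call something like topicFor() for each element and build a new ProducerRecord<>(topic, key, value) to hand to KafkaIO.writeRecords().

```java
import java.util.Map;

/** Hypothetical helper: picks a destination topic from a message field value. */
public class TopicRouter {

    // Assumed mapping from a message field value to a Kafka topic name.
    // These names are placeholders for illustration only.
    private static final Map<String, String> TOPIC_BY_TYPE = Map.of(
            "order", "orders-topic",
            "payment", "payments-topic");

    // Assumed fallback topic for records whose field value is missing or unknown.
    private static final String DEFAULT_TOPIC = "unrouted-topic";

    /** Returns the topic for the given field value, or the default topic if it is null or unmapped. */
    public static String topicFor(String messageType) {
        // Map.of(...) rejects null lookups, so guard explicitly.
        if (messageType == null) {
            return DEFAULT_TOPIC;
        }
        return TOPIC_BY_TYPE.getOrDefault(messageType, DEFAULT_TOPIC);
    }

    public static void main(String[] args) {
        // In a Beam pipeline, the per-element transform would use this value as the
        // topic argument when constructing the ProducerRecord for each message.
        System.out.println(topicFor("order"));
        System.out.println(topicFor("something-else"));
    }
}
```

Keeping the topic decision in a small pure function like this makes it easy to unit test separately from the pipeline; whether a default topic is the right fallback, as opposed to dropping or failing the record, depends on the use case.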