Hi all,

Recently, I encountered a bit of functionality in a pipeline that I was working on that seemed to be slightly lacking (specifically, the recognition of explicitly defined partitioning in the KafkaIO.WriteRecords transform), so I put together a JIRA for it [1] as well as a more detailed pull request [2] with an initial potential fix/change.
I'll provide a bit more context from the pull request description below in case in-thread feedback would be easier for some, but any recommendations, reviewers, or advice would be greatly appreciated!

Cheers,

Rion

[1]: https://issues.apache.org/jira/browse/BEAM-11806
[2]: https://github.com/apache/beam/pull/13975

----------------------------------

At present, the WriteRecords transform for KafkaIO does not recognize the partition property defined on the ProducerRecord instances consumed by the transform:

    producer.send(
        // The null argument in the following constructor represents the partition
        new ProducerRecord<>(
            topicName,
            null,
            timestampMillis,
            record.key(),
            record.value(),
            record.headers()),
        new SendCallback());

Because of this limitation, a user who wants an explicitly defined partitioning strategy, as opposed to round-robin, has to create their own custom DoFn that manages a KafkaProducer (preferably constructed within a @StartBundle method), similar to the following approach (in Kotlin):

    private class ExampleProducerDoFn(...) : DoFn<...>() {
        private lateinit var producer: KafkaProducer<...>

        @StartBundle
        fun startBundle(context: StartBundleContext) {
            val options = context.pipelineOptions.`as`(YourPipelineOptions::class.java)
            producer = getKafkaProducer(options)
        }

        @ProcessElement
        fun processElement(context: ProcessContext) {
            // Omitted for brevity

            // Produce the record to a specific topic at a specific partition
            producer.send(ProducerRecord(
                "your_topic_here",
                your_partition_here,
                context.element().kv.key,
                context.element().kv.value
            ))
        }
    }

The *initial* pull request that I threw in here [2] simply replaces the existing null with record.partition() (i.e. the partition that was explicitly defined on the incoming record), but it may require some other changes that I'd need someone more familiar with the KafkaIO source to chime in on.
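For reference, the heart of the change is roughly the following (a minimal sketch of the send call shown above; the surrounding writer code is elided, and only the partition argument changes):

    producer.send(
        // Pass the partition through from the incoming record instead of
        // hard-coding null; a record without an explicit partition still
        // falls back to the producer's default partitioner, since
        // record.partition() returns null in that case.
        new ProducerRecord<>(
            topicName,
            record.partition(),
            timestampMillis,
            record.key(),
            record.value(),
            record.headers()),
        new SendCallback());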
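With that change in place, a pipeline could control partitioning entirely through the records it hands to the transform, with no custom producer DoFn. A hypothetical end-to-end usage might look like the following (in Java; the topic name, numPartitions, and serializers are illustrative, and the topic and partition are set on each record rather than on the transform):

    input
        .apply("AssignPartitions", MapElements.via(
            new SimpleFunction<KV<String, String>, ProducerRecord<String, String>>() {
                @Override
                public ProducerRecord<String, String> apply(KV<String, String> kv) {
                    // Route each record to an explicitly chosen partition, e.g. by key
                    int partition = Math.floorMod(kv.getKey().hashCode(), numPartitions);
                    return new ProducerRecord<>("your_topic_here", partition, kv.getKey(), kv.getValue());
                }
            }))
        .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
        .apply(KafkaIO.<String, String>writeRecords()
            .withBootstrapServers("localhost:9092")
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class));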