Hi all,

Recently, I ran into a bit of functionality that seemed to be slightly
lacking in a pipeline I was working on (specifically, the recognition of
explicitly defined partitioning in the KafkaIO.WriteRecords transform), so I
put together a JIRA related to it [1] as well as a more detailed pull
request [2] with an initial potential fix/change.

I'll provide a bit more context from the pull request description below in
case in-thread feedback would be easier for some, but any
recommendations/reviewers/advice would be greatly appreciated!

Cheers,

Rion

[1]: https://issues.apache.org/jira/browse/BEAM-11806
[2]: https://github.com/apache/beam/pull/13975

----------------------------------

At present, the WriteRecords transform for KafkaIO does not recognize the
partition property defined on ProducerRecord instances consumed by the
transform:

producer.send(
        // The null property in the following constructor represents partition
        new ProducerRecord<>(
            topicName, null, timestampMillis, record.key(),
            record.value(), record.headers()),
        new SendCallback());

Because of this limitation, a user who wants an explicitly defined
partitioning strategy, as opposed to the producer's default partitioning
(e.g. round-robin), would have to create their own custom DoFn that defines
a KafkaProducer (preferably within a @StartBundle), similar to the following
approach (in Kotlin):

private class ExampleProducerDoFn(...): DoFn<...>() {
        private lateinit var producer: KafkaProducer<...>

        @StartBundle
        fun startBundle(context: StartBundleContext) {
            val options =
                context.pipelineOptions.`as`(YourPipelineOptions::class.java)
            producer = getKafkaProducer(options)
        }

        @ProcessElement
        fun processElement(context: ProcessContext){
            // Omitted for brevity

            // Produce the record to a specific topic at a specific partition
            producer.send(ProducerRecord(
                "your_topic_here",
                your_partition_here,
                context.element().kv.key,
                context.element().kv.value
            ))
        }
}

The *initial* pull request that I threw in here simply replaces the
existing null with record.partition() (i.e. the partition that was
explicitly defined on the record initially), but it may require some other
changes, which I'd need someone more familiar with the KafkaIO source to
chime in on.
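
To make the difference concrete without pulling in Kafka, here is a minimal,
dependency-free sketch of the two behaviours. Note that SimpleRecord is a
hypothetical, simplified stand-in for ProducerRecord (it is not the real
class), and the method names dropPartition/forwardPartition are made up for
illustration; the actual change in the pull request may well look different:

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for Kafka's ProducerRecord (NOT the real class).
final class SimpleRecord {
    final String topic;
    final Integer partition; // null means "let the producer's partitioner decide"
    final String key;
    final String value;

    SimpleRecord(String topic, Integer partition, String key, String value) {
        this.topic = Objects.requireNonNull(topic);
        this.partition = partition;
        this.key = key;
        this.value = value;
    }
}

final class PartitionForwarding {
    // Behaviour described above: the partition is always replaced with null.
    static SimpleRecord dropPartition(SimpleRecord in, String topicName) {
        return new SimpleRecord(topicName, null, in.key, in.value);
    }

    // Proposed behaviour: carry over whatever partition the incoming record has.
    static SimpleRecord forwardPartition(SimpleRecord in, String topicName) {
        return new SimpleRecord(topicName, in.partition, in.key, in.value);
    }

    public static void main(String[] args) {
        SimpleRecord explicit = new SimpleRecord("events", 3, "k", "v");
        SimpleRecord unset = new SimpleRecord("events", null, "k", "v");
        System.out.println(dropPartition(explicit, "out").partition);    // null
        System.out.println(forwardPartition(explicit, "out").partition); // 3
        System.out.println(forwardPartition(unset, "out").partition);    // null
    }
}
```

Since a record without an explicit partition still ends up with null in this
sketch, forwarding the value should leave existing pipelines unaffected, at
least as far as I can tell.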
