This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new bfc858a [BEAM-11806] Explicit Partition Support for KafkaIO.WriteRecords (#13975) bfc858a is described below commit bfc858ac0805f8ec4ca89a5e97f346209c149733 Author: Rion Williams <rionmons...@gmail.com> AuthorDate: Mon Feb 15 06:18:45 2021 -0600 [BEAM-11806] Explicit Partition Support for KafkaIO.WriteRecords (#13975) * [BEAM-11806] Added explicit partition support for ProducerRecord instance with the KafkaIO.WriteRecords transform * [BEAM-11806] Added custom partitioning support/tests for KafkaIO --- .../org/apache/beam/sdk/io/kafka/KafkaWriter.java | 7 ++- .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 64 ++++++++++++++++++++-- 2 files changed, 66 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java index 7d5357d..fb5f9d0 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java @@ -70,7 +70,12 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, Void> { producer.send( new ProducerRecord<>( - topicName, null, timestampMillis, record.key(), record.value(), record.headers()), + topicName, + record.partition(), + timestampMillis, + record.key(), + record.value(), + record.headers()), new SendCallback()); elementsWritten.inc(); diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index d17755c..efe85cb 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -1435,9 +1435,49 @@ public class KafkaIOTest { } } + @Test + public void testSinkProducerRecordsWithCustomPartition() 
throws Exception { + int numElements = 1000; + + try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) { + + ProducerSendCompletionThread completionThread = + new ProducerSendCompletionThread(producerWrapper.mockProducer).start(); + + final String defaultTopic = "test"; + final Integer partition = 1; + + p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata()) + .apply(ParDo.of(new KV2ProducerRecord(defaultTopic, partition))) + .setCoder(ProducerRecordCoder.of(VarIntCoder.of(), VarLongCoder.of())) + .apply( + KafkaIO.<Integer, Long>writeRecords() + .withBootstrapServers("none") + .withKeySerializer(IntegerSerializer.class) + .withValueSerializer(LongSerializer.class) + .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))); + + p.run(); + + completionThread.shutdown(); + + // Verify that messages are written to the user-defined topic and partition + List<ProducerRecord<Integer, Long>> sent = producerWrapper.mockProducer.history(); + + for (int i = 0; i < numElements; i++) { + ProducerRecord<Integer, Long> record = sent.get(i); + assertEquals(defaultTopic, record.topic()); + assertEquals(partition, record.partition()); + assertEquals(i, record.key().intValue()); + assertEquals(i, record.value().longValue()); + } + } + } + private static class KV2ProducerRecord extends DoFn<KV<Integer, Long>, ProducerRecord<Integer, Long>> { final String topic; + final Integer partition; final boolean isSingleTopic; final Long ts; final SimpleEntry<String, String> header; @@ -1446,6 +1486,10 @@ public class KafkaIOTest { this(topic, true); } + KV2ProducerRecord(String topic, Integer partition) { + this(topic, true, null, null, partition); + } + KV2ProducerRecord(String topic, Long ts) { this(topic, true, ts); } @@ -1455,12 +1499,22 @@ public class KafkaIOTest { } KV2ProducerRecord(String topic, boolean isSingleTopic, Long ts) { - this(topic, isSingleTopic, ts, null); + this(topic, isSingleTopic, ts, null, null); } 
KV2ProducerRecord( String topic, boolean isSingleTopic, Long ts, SimpleEntry<String, String> header) { + this(topic, isSingleTopic, ts, header, null); + } + + KV2ProducerRecord( + String topic, + boolean isSingleTopic, + Long ts, + SimpleEntry<String, String> header, + Integer partition) { this.topic = topic; + this.partition = partition; this.isSingleTopic = isSingleTopic; this.ts = ts; this.header = header; @@ -1477,14 +1531,16 @@ public class KafkaIOTest { header.getKey(), header.getValue().getBytes(StandardCharsets.UTF_8))); } if (isSingleTopic) { - ctx.output(new ProducerRecord<>(topic, null, ts, kv.getKey(), kv.getValue(), headers)); + ctx.output(new ProducerRecord<>(topic, partition, ts, kv.getKey(), kv.getValue(), headers)); } else { if (kv.getKey() % 2 == 0) { ctx.output( - new ProducerRecord<>(topic + "_2", null, ts, kv.getKey(), kv.getValue(), headers)); + new ProducerRecord<>( + topic + "_2", partition, ts, kv.getKey(), kv.getValue(), headers)); } else { ctx.output( - new ProducerRecord<>(topic + "_1", null, ts, kv.getKey(), kv.getValue(), headers)); + new ProducerRecord<>( + topic + "_1", partition, ts, kv.getKey(), kv.getValue(), headers)); } } }