This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bfc858a  [BEAM-11806] Explicit Partition Support for 
KafkaIO.WriteRecords (#13975)
bfc858a is described below

commit bfc858ac0805f8ec4ca89a5e97f346209c149733
Author: Rion Williams <rionmons...@gmail.com>
AuthorDate: Mon Feb 15 06:18:45 2021 -0600

    [BEAM-11806] Explicit Partition Support for KafkaIO.WriteRecords (#13975)
    
    * [BEAM-11806] Added explicit partition support for ProducerRecord instance 
with the KafkaIO.WriteRecords transform
    
    * [BEAM-11806] Added custom partitioning support/tests for KafkaIO
---
 .../org/apache/beam/sdk/io/kafka/KafkaWriter.java  |  7 ++-
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  | 64 ++++++++++++++++++++--
 2 files changed, 66 insertions(+), 5 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
index 7d5357d..fb5f9d0 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
@@ -70,7 +70,12 @@ class KafkaWriter<K, V> extends DoFn<ProducerRecord<K, V>, 
Void> {
 
     producer.send(
         new ProducerRecord<>(
-            topicName, null, timestampMillis, record.key(), record.value(), 
record.headers()),
+            topicName,
+            record.partition(),
+            timestampMillis,
+            record.key(),
+            record.value(),
+            record.headers()),
         new SendCallback());
 
     elementsWritten.inc();
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index d17755c..efe85cb 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -1435,9 +1435,49 @@ public class KafkaIOTest {
     }
   }
 
+  @Test
+  public void testSinkProducerRecordsWithCustomPartition() throws Exception {
+    int numElements = 1000;
+
+    try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
+
+      ProducerSendCompletionThread completionThread =
+          new 
ProducerSendCompletionThread(producerWrapper.mockProducer).start();
+
+      final String defaultTopic = "test";
+      final Integer partition = 1;
+
+      p.apply(mkKafkaReadTransform(numElements, new 
ValueAsTimestampFn()).withoutMetadata())
+          .apply(ParDo.of(new KV2ProducerRecord(defaultTopic, partition)))
+          .setCoder(ProducerRecordCoder.of(VarIntCoder.of(), 
VarLongCoder.of()))
+          .apply(
+              KafkaIO.<Integer, Long>writeRecords()
+                  .withBootstrapServers("none")
+                  .withKeySerializer(IntegerSerializer.class)
+                  .withValueSerializer(LongSerializer.class)
+                  .withProducerFactoryFn(new 
ProducerFactoryFn(producerWrapper.producerKey)));
+
+      p.run();
+
+      completionThread.shutdown();
+
+      // Verify that messages are written with user-defined timestamp
+      List<ProducerRecord<Integer, Long>> sent = 
producerWrapper.mockProducer.history();
+
+      for (int i = 0; i < numElements; i++) {
+        ProducerRecord<Integer, Long> record = sent.get(i);
+        assertEquals(defaultTopic, record.topic());
+        assertEquals(partition, record.partition());
+        assertEquals(i, record.key().intValue());
+        assertEquals(i, record.value().longValue());
+      }
+    }
+  }
+
   private static class KV2ProducerRecord
       extends DoFn<KV<Integer, Long>, ProducerRecord<Integer, Long>> {
     final String topic;
+    final Integer partition;
     final boolean isSingleTopic;
     final Long ts;
     final SimpleEntry<String, String> header;
@@ -1446,6 +1486,10 @@ public class KafkaIOTest {
       this(topic, true);
     }
 
+    KV2ProducerRecord(String topic, Integer partition) {
+      this(topic, true, null, null, partition);
+    }
+
     KV2ProducerRecord(String topic, Long ts) {
       this(topic, true, ts);
     }
@@ -1455,12 +1499,22 @@ public class KafkaIOTest {
     }
 
     KV2ProducerRecord(String topic, boolean isSingleTopic, Long ts) {
-      this(topic, isSingleTopic, ts, null);
+      this(topic, isSingleTopic, ts, null, null);
     }
 
     KV2ProducerRecord(
         String topic, boolean isSingleTopic, Long ts, SimpleEntry<String, 
String> header) {
+      this(topic, isSingleTopic, ts, header, null);
+    }
+
+    KV2ProducerRecord(
+        String topic,
+        boolean isSingleTopic,
+        Long ts,
+        SimpleEntry<String, String> header,
+        Integer partition) {
       this.topic = topic;
+      this.partition = partition;
       this.isSingleTopic = isSingleTopic;
       this.ts = ts;
       this.header = header;
@@ -1477,14 +1531,16 @@ public class KafkaIOTest {
                     header.getKey(), 
header.getValue().getBytes(StandardCharsets.UTF_8)));
       }
       if (isSingleTopic) {
-        ctx.output(new ProducerRecord<>(topic, null, ts, kv.getKey(), 
kv.getValue(), headers));
+        ctx.output(new ProducerRecord<>(topic, partition, ts, kv.getKey(), 
kv.getValue(), headers));
       } else {
         if (kv.getKey() % 2 == 0) {
           ctx.output(
-              new ProducerRecord<>(topic + "_2", null, ts, kv.getKey(), 
kv.getValue(), headers));
+              new ProducerRecord<>(
+                  topic + "_2", partition, ts, kv.getKey(), kv.getValue(), 
headers));
         } else {
           ctx.output(
-              new ProducerRecord<>(topic + "_1", null, ts, kv.getKey(), 
kv.getValue(), headers));
+              new ProducerRecord<>(
+                  topic + "_1", partition, ts, kv.getKey(), kv.getValue(), 
headers));
         }
       }
     }

Reply via email to