This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2127d3d2c4a6898fbbf7acdd91f38769bd059e1e
Author: Prathit malik <53890994+prathi...@users.noreply.github.com>
AuthorDate: Tue Aug 22 06:31:47 2023 +0530

    [HUDI-6683][FOLLOW-UP] Json & Avro Kafka Source Minor Refactor & Added null 
Kafka Key test cases (#9459)
---
 .../hudi/utilities/sources/JsonKafkaSource.java    |  2 +-
 .../utilities/sources/helpers/AvroConvertor.java   | 11 ++++----
 .../utilities/sources/TestAvroKafkaSource.java     | 30 ++++++++++++++++++++++
 .../utilities/sources/TestJsonKafkaSource.java     | 14 ++++++++++
 .../utilities/testutils/UtilitiesTestBase.java     |  9 +++++++
 5 files changed, 60 insertions(+), 6 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index f31c9b7e542..eb67abfee3a 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -81,7 +81,7 @@ public class JsonKafkaSource extends KafkaSource<String> {
         ObjectMapper om = new ObjectMapper();
         partitionIterator.forEachRemaining(consumerRecord -> {
           String recordValue = consumerRecord.value().toString();
-          String recordKey = consumerRecord.key().toString();
+          String recordKey = StringUtils.objToString(consumerRecord.key());
           try {
             ObjectNode jsonNode = (ObjectNode) om.readTree(recordValue);
             jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index 89191cb465c..f9c35bd3b6e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
 
 import com.google.protobuf.Message;
@@ -171,16 +172,16 @@ public class AvroConvertor implements Serializable {
    */
   public GenericRecord withKafkaFieldsAppended(ConsumerRecord consumerRecord) {
     initSchema();
-    GenericRecord record = (GenericRecord) consumerRecord.value();
+    GenericRecord recordValue = (GenericRecord) consumerRecord.value();
     GenericRecordBuilder recordBuilder = new GenericRecordBuilder(this.schema);
-    for (Schema.Field field :  record.getSchema().getFields()) {
-      recordBuilder.set(field, record.get(field.name()));
+    for (Schema.Field field :  recordValue.getSchema().getFields()) {
+      recordBuilder.set(field, recordValue.get(field.name()));
     }
-    
+    String recordKey = StringUtils.objToString(consumerRecord.key());
     recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
     recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, 
consumerRecord.partition());
     recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, 
consumerRecord.timestamp());
-    recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, 
consumerRecord.key().toString());
+    recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, recordKey);
     return recordBuilder.build();
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index 2632f72659b..16ec4545665 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -62,6 +62,7 @@ import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.Mockito.mock;
 
 public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
@@ -113,6 +114,17 @@ public class TestAvroKafkaSource extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  void sendMessagesToKafkaWithNullKafkaKey(String topic, int count, int 
numPartitions) {
+    List<GenericRecord> genericRecords = dataGen.generateGenericRecords(count);
+    Properties config = getProducerProperties();
+    try (Producer<String, byte[]> producer = new KafkaProducer<>(config)) {
+      for (int i = 0; i < genericRecords.size(); i++) {
+        // null kafka key
+        producer.send(new ProducerRecord<>(topic, i % numPartitions, null, 
HoodieAvroUtils.avroToBytes(genericRecords.get(i))));
+      }
+    }
+  }
+
   private Properties getProducerProperties() {
     Properties props = new Properties();
     props.put("bootstrap.servers", testUtils.brokerAddress());
@@ -147,6 +159,15 @@ public class TestAvroKafkaSource extends 
SparkClientFunctionalTestHarness {
     avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), 
schemaProvider, null);
     GenericRecord withKafkaOffsets = 
avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
     assertEquals(4,withKafkaOffsets.getSchema().getFields().size() - 
withoutKafkaOffsets.getSchema().getFields().size());
+    
assertEquals("test",withKafkaOffsets.get("_hoodie_kafka_source_key").toString());
+
+    // scenario with null kafka key
+    ConsumerRecord<Object, Object> recordConsumerRecordNullKafkaKey = new 
ConsumerRecord<Object,Object>("test", 0, 1L,
+            null, dataGen.generateGenericRecord());
+    JavaRDD<ConsumerRecord<Object, Object>> rddNullKafkaKey = 
jsc().parallelize(Arrays.asList(recordConsumerRecordNullKafkaKey));
+    avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), 
schemaProvider, null);
+    GenericRecord withKafkaOffsetsAndNullKafkaKey = 
avroKafkaSource.maybeAppendKafkaOffsets(rddNullKafkaKey).collect().get(0);
+    
assertNull(withKafkaOffsetsAndNullKafkaKey.get("_hoodie_kafka_source_key"));
   }
 
   @Test
@@ -185,5 +206,14 @@ public class TestAvroKafkaSource extends 
SparkClientFunctionalTestHarness {
     assertEquals(4, withKafkaOffsetColumns.size() - columns.size());
     List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, 
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, 
KAFKA_SOURCE_KEY_COLUMN);
     assertEquals(appendList, 
withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4, 
withKafkaOffsetColumns.size()));
+
+    // scenario with null kafka key
+    sendMessagesToKafkaWithNullKafkaKey(topic, numMessages, numPartitions);
+    AvroKafkaSource avroKafkaSourceWithNullKafkaKey = new 
AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    SourceFormatAdapter kafkaSourceWithNullKafkaKey = new 
SourceFormatAdapter(avroKafkaSourceWithNullKafkaKey);
+    Dataset<Row> nullKafkaKeyDataset = 
kafkaSourceWithNullKafkaKey.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE)
+            .getBatch().get();
+    assertEquals(numMessages, 
nullKafkaKeyDataset.toDF().filter("_hoodie_kafka_source_key is null").count());
+
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 5b0e7667fc0..60887613d64 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -66,6 +66,7 @@ import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO
 import static 
org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static 
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static 
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions;
+import static 
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitionsWithNullKafkaKey;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -206,6 +207,11 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     testUtils.sendMessages(topic, 
jsonifyRecordsByPartitions(dataGenerator.generateInsertsAsPerSchema("000", 
count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA), numPartitions));
   }
 
+  void sendNullKafkaKeyMessagesToKafka(String topic, int count, int 
numPartitions) {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.sendMessages(topic, 
jsonifyRecordsByPartitionsWithNullKafkaKey(dataGenerator.generateInsertsAsPerSchema("000",
 count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA), numPartitions));
+  }
+
   void sendJsonSafeMessagesToKafka(String topic, int count, int numPartitions) 
{
     try {
       Tuple2<String, String>[] keyValues = new Tuple2[count];
@@ -339,7 +345,15 @@ public class TestJsonKafkaSource extends 
BaseTestKafkaSource {
     List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, 
KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, 
KAFKA_SOURCE_KEY_COLUMN);
     assertEquals(appendList, 
withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4, 
withKafkaOffsetColumns.size()));
 
+    // scenario with null kafka key
+    sendNullKafkaKeyMessagesToKafka(topic, numMessages, numPartitions);
+    jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, 
metrics);
+    kafkaSource = new SourceFormatAdapter(jsonSource);
+    Dataset<Row> dfWithOffsetInfoAndNullKafkaKey = 
kafkaSource.fetchNewDataInRowFormat(Option.empty(), 
Long.MAX_VALUE).getBatch().get().cache();
+    assertEquals(numMessages, 
dfWithOffsetInfoAndNullKafkaKey.toDF().filter("_hoodie_kafka_source_key is 
null").count());
+
     dfNoOffsetInfo.unpersist();
     dfWithOffsetInfo.unpersist();
+    dfWithOffsetInfoAndNullKafkaKey.unpersist();
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index b9555cb29c2..058ed72a3be 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -447,6 +447,15 @@ public class UtilitiesTestBase {
       return data;
     }
 
+    public static Tuple2<String, String>[] 
jsonifyRecordsByPartitionsWithNullKafkaKey(List<HoodieRecord> records, int 
partitions) {
+      Tuple2<String, String>[] data = new Tuple2[records.size()];
+      for (int i = 0; i < records.size(); i++) {
+        String value = Helpers.toJsonString(records.get(i));
+        data[i] = new Tuple2<>(null, value);
+      }
+      return data;
+    }
+
     private static void addAvroRecord(
             VectorizedRowBatch batch,
             GenericRecord record,

Reply via email to