This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2127d3d2c4a6898fbbf7acdd91f38769bd059e1e Author: Prathit malik <53890994+prathi...@users.noreply.github.com> AuthorDate: Tue Aug 22 06:31:47 2023 +0530 [HUDI-6683][FOLLOW-UP] Json & Avro Kafka Source Minor Refactor & Added null Kafka Key test cases (#9459) --- .../hudi/utilities/sources/JsonKafkaSource.java | 2 +- .../utilities/sources/helpers/AvroConvertor.java | 11 ++++---- .../utilities/sources/TestAvroKafkaSource.java | 30 ++++++++++++++++++++++ .../utilities/sources/TestJsonKafkaSource.java | 14 ++++++++++ .../utilities/testutils/UtilitiesTestBase.java | 9 +++++++ 5 files changed, 60 insertions(+), 6 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index f31c9b7e542..eb67abfee3a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -81,7 +81,7 @@ public class JsonKafkaSource extends KafkaSource<String> { ObjectMapper om = new ObjectMapper(); partitionIterator.forEachRemaining(consumerRecord -> { String recordValue = consumerRecord.value().toString(); - String recordKey = consumerRecord.key().toString(); + String recordKey = StringUtils.objToString(consumerRecord.key()); try { ObjectNode jsonNode = (ObjectNode) om.readTree(recordValue); jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java index 89191cb465c..f9c35bd3b6e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.sources.helpers; import org.apache.hudi.avro.MercifulJsonConverter; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.internal.schema.HoodieSchemaException; import com.google.protobuf.Message; @@ -171,16 +172,16 @@ public class AvroConvertor implements Serializable { */ public GenericRecord withKafkaFieldsAppended(ConsumerRecord consumerRecord) { initSchema(); - GenericRecord record = (GenericRecord) consumerRecord.value(); + GenericRecord recordValue = (GenericRecord) consumerRecord.value(); GenericRecordBuilder recordBuilder = new GenericRecordBuilder(this.schema); - for (Schema.Field field : record.getSchema().getFields()) { - recordBuilder.set(field, record.get(field.name())); + for (Schema.Field field : recordValue.getSchema().getFields()) { + recordBuilder.set(field, recordValue.get(field.name())); } - + String recordKey = StringUtils.objToString(consumerRecord.key()); recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset()); recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition()); recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp()); - recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, consumerRecord.key().toString()); + recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, recordKey); return recordBuilder.build(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java index 2632f72659b..16ec4545665 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java @@ -62,6 +62,7 @@ import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN; import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.Mockito.mock; public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness { @@ -113,6 +114,17 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness { } } + void sendMessagesToKafkaWithNullKafkaKey(String topic, int count, int numPartitions) { + List<GenericRecord> genericRecords = dataGen.generateGenericRecords(count); + Properties config = getProducerProperties(); + try (Producer<String, byte[]> producer = new KafkaProducer<>(config)) { + for (int i = 0; i < genericRecords.size(); i++) { + // null kafka key + producer.send(new ProducerRecord<>(topic, i % numPartitions, null, HoodieAvroUtils.avroToBytes(genericRecords.get(i)))); + } + } + } + private Properties getProducerProperties() { Properties props = new Properties(); props.put("bootstrap.servers", testUtils.brokerAddress()); @@ -147,6 +159,15 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness { avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null); GenericRecord withKafkaOffsets = avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0); assertEquals(4,withKafkaOffsets.getSchema().getFields().size() - withoutKafkaOffsets.getSchema().getFields().size()); + assertEquals("test",withKafkaOffsets.get("_hoodie_kafka_source_key").toString()); + + // scenario with null kafka key + ConsumerRecord<Object, Object> recordConsumerRecordNullKafkaKey = new ConsumerRecord<Object,Object>("test", 0, 1L, + null, dataGen.generateGenericRecord()); + JavaRDD<ConsumerRecord<Object, Object>> rddNullKafkaKey = jsc().parallelize(Arrays.asList(recordConsumerRecordNullKafkaKey)); + avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null); + GenericRecord withKafkaOffsetsAndNullKafkaKey = avroKafkaSource.maybeAppendKafkaOffsets(rddNullKafkaKey).collect().get(0); + assertNull(withKafkaOffsetsAndNullKafkaKey.get("_hoodie_kafka_source_key")); } @Test @@ -185,5 +206,14 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness { assertEquals(4, withKafkaOffsetColumns.size() - columns.size()); List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN); assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4, withKafkaOffsetColumns.size())); + + // scenario with null kafka key + sendMessagesToKafkaWithNullKafkaKey(topic, numMessages, numPartitions); + AvroKafkaSource avroKafkaSourceWithNullKafkaKey = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, metrics); + SourceFormatAdapter kafkaSourceWithNullKafkaKey = new SourceFormatAdapter(avroKafkaSourceWithNullKafkaKey); + Dataset<Row> nullKafkaKeyDataset = kafkaSourceWithNullKafkaKey.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE) + .getBatch().get(); + assertEquals(numMessages, nullKafkaKeyDataset.toDF().filter("_hoodie_kafka_source_key is null").count()); + } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java index 5b0e7667fc0..60887613d64 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java @@ -66,6 +66,7 @@ import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SO import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN; import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords; import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitions; +import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecordsByPartitionsWithNullKafkaKey; import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -206,6 +207,11 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource { testUtils.sendMessages(topic, jsonifyRecordsByPartitions(dataGenerator.generateInsertsAsPerSchema("000", count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA), numPartitions)); } + void sendNullKafkaKeyMessagesToKafka(String topic, int count, int numPartitions) { + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + testUtils.sendMessages(topic, jsonifyRecordsByPartitionsWithNullKafkaKey(dataGenerator.generateInsertsAsPerSchema("000", count, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA), numPartitions)); + } + void sendJsonSafeMessagesToKafka(String topic, int count, int numPartitions) { try { Tuple2<String, String>[] keyValues = new Tuple2[count]; @@ -339,7 +345,15 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource { List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN); assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 4, withKafkaOffsetColumns.size())); + // scenario with null kafka key + sendNullKafkaKeyMessagesToKafka(topic, numMessages, numPartitions); + jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics); + kafkaSource = new SourceFormatAdapter(jsonSource); + Dataset<Row> dfWithOffsetInfoAndNullKafkaKey = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get().cache(); + assertEquals(numMessages, dfWithOffsetInfoAndNullKafkaKey.toDF().filter("_hoodie_kafka_source_key is null").count()); + dfNoOffsetInfo.unpersist(); dfWithOffsetInfo.unpersist(); + dfWithOffsetInfoAndNullKafkaKey.unpersist(); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index b9555cb29c2..058ed72a3be 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -447,6 +447,15 @@ public class UtilitiesTestBase { return data; } + public static Tuple2<String, String>[] jsonifyRecordsByPartitionsWithNullKafkaKey(List<HoodieRecord> records, int partitions) { + Tuple2<String, String>[] data = new Tuple2[records.size()]; + for (int i = 0; i < records.size(); i++) { + String value = Helpers.toJsonString(records.get(i)); + data[i] = new Tuple2<>(null, value); + } + return data; + } + private static void addAvroRecord( VectorizedRowBatch batch, GenericRecord record,