hussein-awala commented on code in PR #9403: URL: https://github.com/apache/hudi/pull/9403#discussion_r1294069377
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java: ########## @@ -175,9 +176,11 @@ public GenericRecord withKafkaFieldsAppended(ConsumerRecord consumerRecord) { for (Schema.Field field : record.getSchema().getFields()) { recordBuilder.set(field, record.get(field.name())); } + recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset()); recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition()); recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp()); + recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, String.valueOf(consumerRecord.key())); Review Comment: ```suggestion recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, consumerRecord.key().toString()); ``` ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java: ########## @@ -80,11 +81,13 @@ protected JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object ObjectMapper om = new ObjectMapper(); partitionIterator.forEachRemaining(consumerRecord -> { String record = consumerRecord.value().toString(); Review Comment: I think renaming this variable to `recordValue` might make the code more readable: ```suggestion String recordValue = consumerRecord.value().toString(); ``` ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java: ########## @@ -80,11 +81,13 @@ protected JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object ObjectMapper om = new ObjectMapper(); partitionIterator.forEachRemaining(consumerRecord -> { String record = consumerRecord.value().toString(); + String recordKey = (String) consumerRecord.key(); Review Comment: ```suggestion String recordKey = consumerRecord.key().toString(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org