hussein-awala commented on code in PR #9403:
URL: https://github.com/apache/hudi/pull/9403#discussion_r1294069377


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java:
##########
@@ -175,9 +176,11 @@ public GenericRecord withKafkaFieldsAppended(ConsumerRecord consumerRecord) {
     for (Schema.Field field :  record.getSchema().getFields()) {
       recordBuilder.set(field, record.get(field.name()));
     }
+    
     recordBuilder.set(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
     recordBuilder.set(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
     recordBuilder.set(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
+    recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, String.valueOf(consumerRecord.key()));

Review Comment:
   ```suggestion
       recordBuilder.set(KAFKA_SOURCE_KEY_COLUMN, consumerRecord.key().toString());
   ```



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java:
##########
@@ -80,11 +81,13 @@ protected  JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object
         ObjectMapper om = new ObjectMapper();
         partitionIterator.forEachRemaining(consumerRecord -> {
           String record = consumerRecord.value().toString();

Review Comment:
   I think renaming this variable to `recordValue` might make the code more readable:
   ```suggestion
             String recordValue = consumerRecord.value().toString();
   ```



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java:
##########
@@ -80,11 +81,13 @@ protected  JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object
         ObjectMapper om = new ObjectMapper();
         partitionIterator.forEachRemaining(consumerRecord -> {
           String record = consumerRecord.value().toString();
+          String recordKey = (String) consumerRecord.key();

Review Comment:
   ```suggestion
             String recordKey = consumerRecord.key().toString();
   ```
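
The three key conversions that appear in this review (`String.valueOf(key)`, `key.toString()`, and a `(String)` cast) treat a null key and a non-String key differently. Kafka permits records without a key, so the difference may matter here. Below is a minimal, Kafka-free sketch of that difference, assuming the key arrives as a plain `Object`. The class `KeyConversionSketch` and all of its method names are hypothetical and not part of Hudi.

```java
import java.util.function.Supplier;

public class KeyConversionSketch {

    // Mirrors String.valueOf(consumerRecord.key()):
    // never throws, but a null key becomes the four-character text "null".
    static String viaValueOf(Object key) {
        return String.valueOf(key);
    }

    // Mirrors consumerRecord.key().toString():
    // throws NullPointerException when the key is null.
    static String viaToString(Object key) {
        return key.toString();
    }

    // Mirrors (String) consumerRecord.key():
    // passes a null key through, throws ClassCastException for non-String keys.
    static String viaCast(Object key) {
        return (String) key;
    }

    // Hypothetical null-safe variant (an assumption, not proposed in the PR):
    // keeps null as null and stringifies everything else.
    static String nullSafe(Object key) {
        return key == null ? null : key.toString();
    }

    // Runs one conversion and reports its result or the exception type it threw.
    static String outcome(Supplier<String> conversion) {
        try {
            String result = conversion.get();
            return result == null ? "null reference" : "\"" + result + "\"";
        } catch (RuntimeException e) {
            return "throws " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // A String key, a non-String key, and a missing key.
        Object[] keys = {"user-1", 42, null};
        for (Object key : keys) {
            System.out.println("key = " + key);
            System.out.println("  String.valueOf -> " + outcome(() -> viaValueOf(key)));
            System.out.println("  toString()     -> " + outcome(() -> viaToString(key)));
            System.out.println("  (String) cast  -> " + outcome(() -> viaCast(key)));
            System.out.println("  null-safe      -> " + outcome(() -> nullSafe(key)));
        }
    }
}
```

For a null key, `String.valueOf` yields the literal text "null", `toString()` throws, and the cast passes the null through. Which of these the key column should have is a decision for the PR. The sketch only makes the trade-off visible.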



