prathit06 commented on code in PR #9403: URL: https://github.com/apache/hudi/pull/9403#discussion_r1290333745
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java:
##########
@@ -54,21 +56,23 @@ public static boolean shouldAddOffsets(TypedProperties props) {
   public static final String KAFKA_SOURCE_OFFSET_COLUMN = "_hoodie_kafka_source_offset";
   public static final String KAFKA_SOURCE_PARTITION_COLUMN = "_hoodie_kafka_source_partition";
   public static final String KAFKA_SOURCE_TIMESTAMP_COLUMN = "_hoodie_kafka_source_timestamp";
+  public static final String KAFKA_SOURCE_KEY_COLUMN = "_hoodie_kafka_source_key";
 
   public KafkaOffsetPostProcessor(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
   }
 
   @Override
   public Schema processSchema(Schema schema) {
-    // this method adds kafka offset fields namely source offset, partition and timestamp to the schema of the batch.
+    // this method adds kafka offset fields namely source offset, partition, timestamp and kafka message key to the schema of the batch.
     try {
       List<Schema.Field> fieldList = schema.getFields();
       List<Schema.Field> newFieldList = fieldList.stream()
           .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())).collect(Collectors.toList());
       newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, Schema.create(Schema.Type.LONG), "offset column", 0));
       newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, Schema.create(Schema.Type.INT), "partition column", 0));
       newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, Schema.create(Schema.Type.LONG), "timestamp column", 0));
+      newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN, createNullableSchema(Schema.Type.STRING), "kafka key column", JsonProperties.NULL_VALUE));
       Schema newSchema = Schema.createRecord(schema.getName() + "_processed", schema.getDoc(), schema.getNamespace(), false, newFieldList);

Review Comment:
The key will always be of `string` type; please refer to:
https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java#L61
`"key.deserializer", StringDeserializer.class.getName()`
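
For readers who do not have the linked file open, here is a minimal sketch of the kind of consumer configuration the comment refers to. It is not the code in `JsonKafkaSource`: the class name `KeyDeserializerSketch` and the helper `consumerProps()` are hypothetical, and the deserializer is given by its fully qualified class name so the sketch runs without the Kafka client on the classpath.

```java
import java.util.Properties;

class KeyDeserializerSketch {
    // Fully qualified name of Kafka's built-in string deserializer,
    // i.e. what StringDeserializer.class.getName() evaluates to.
    static final String STRING_DESERIALIZER =
        "org.apache.kafka.common.serialization.StringDeserializer";

    // Hypothetical helper (an assumption for illustration only): builds consumer
    // properties with the key deserializer fixed to the string deserializer.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // With this setting every record key is handed to the application as a String.
        System.out.println(consumerProps().getProperty("key.deserializer"));
    }
}
```

With the key deserializer pinned this way, a record key reaches the source as a `String`, or as `null` for a record produced without a key, which would be consistent with declaring the new column as a nullable string with a null default.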