This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 49072d1e2e7  [HUDI-7508] Avoid collecting records in HoodieStreamerUtils.createHoodieRecords and JsonKafkaSource mapPartitions (#10872)
49072d1e2e7 is described below

commit 49072d1e2e721f27623dba840ad6ea41a252fd15
Author: Vinish Reddy <vinishreddygunne...@gmail.com>
AuthorDate: Sat May 11 08:50:59 2024 +0530

    [HUDI-7508] Avoid collecting records in HoodieStreamerUtils.createHoodieRecords and JsonKafkaSource mapPartitions (#10872)

    Co-authored-by: Y Ethan Guo <ethan.guoyi...@gmail.com>
---
 .../hudi/utilities/sources/JsonKafkaSource.java      | 18 ++++++++----------
 .../hudi/utilities/streamer/HoodieStreamerUtils.java | 20 ++++++++------------
 2 files changed, 16 insertions(+), 22 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 71f0c4db3f1..a8f70e7c854 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -21,6 +21,8 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;
 import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
@@ -43,8 +45,6 @@ import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
 import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
 
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
@@ -80,28 +80,26 @@ public class JsonKafkaSource extends KafkaSource<JavaRDD<String>> {
     return postProcess(maybeAppendKafkaOffsets(kafkaRDD));
   }
 
-  protected JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
+  protected JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
     if (this.shouldAddOffsets) {
       return kafkaRDD.mapPartitions(partitionIterator -> {
-        List<String> stringList = new LinkedList<>();
-        ObjectMapper om = new ObjectMapper();
-        partitionIterator.forEachRemaining(consumerRecord -> {
+        ObjectMapper objectMapper = new ObjectMapper();
+        return new CloseableMappingIterator<>(ClosableIterator.wrap(partitionIterator), consumerRecord -> {
           String recordValue = consumerRecord.value().toString();
           String recordKey = StringUtils.objToString(consumerRecord.key());
           try {
-            ObjectNode jsonNode = (ObjectNode) om.readTree(recordValue);
+            ObjectNode jsonNode = (ObjectNode) objectMapper.readTree(recordValue);
             jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
             jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
             jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
             if (recordKey != null) {
               jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey);
             }
-            stringList.add(om.writeValueAsString(jsonNode));
+            return objectMapper.writeValueAsString(jsonNode);
           } catch (Throwable e) {
-            stringList.add(recordValue);
+            return recordValue;
           }
         });
-        return stringList.iterator();
       });
     }
     return kafkaRDD.map(consumerRecord -> (String) consumerRecord.value());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 2ecf0b02fb6..3be64fefbb3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -55,10 +56,8 @@ import org.apache.spark.sql.avro.HoodieAvroDeserializer;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -81,6 +80,8 @@ public class HoodieStreamerUtils {
       String instantTime, Option<BaseErrorTableWriter> errorTableWriter) {
     boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
     boolean shouldErrorTable = errorTableWriter.isPresent() && props.getBoolean(ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(), ERROR_ENABLE_VALIDATE_RECORD_CREATION.defaultValue());
+    boolean useConsistentLogicalTimestamp = ConfigUtils.getBooleanWithAltKeys(
+        props, KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
     Set<String> partitionColumns = getPartitionColumns(props);
     return avroRDDOptional.map(avroRDD -> {
       SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema());
@@ -94,23 +95,18 @@ public class HoodieStreamerUtils {
           props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
         }
         BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
-        List<Either<HoodieRecord,String>> avroRecords = new ArrayList<>();
-        while (genericRecordIterator.hasNext()) {
-          GenericRecord genRec = genericRecordIterator.next();
+        return new CloseableMappingIterator<>(ClosableIterator.wrap(genericRecordIterator), genRec -> {
           try {
            HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
            GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
            HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-                (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
-                    KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                    Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
+                (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, useConsistentLogicalTimestamp))
                : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
-            avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload)));
+            return Either.left(new HoodieAvroRecord<>(hoodieKey, payload));
           } catch (Exception e) {
-            avroRecords.add(generateErrorRecordOrThrowException(genRec, e, shouldErrorTable));
+            return generateErrorRecordOrThrowException(genRec, e, shouldErrorTable);
           }
-        }
-        return avroRecords.iterator();
+        });
       });
     } else if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
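
For context on the pattern this commit applies: both methods previously drained an iterator into an in-memory List inside a Spark mapPartitions call and returned list.iterator(), which buffers every transformed record of a partition at once. The rewrite returns a lazily mapping iterator instead (Hudi's CloseableMappingIterator over ClosableIterator.wrap), so records are transformed one at a time as Spark consumes them. Below is a minimal, self-contained sketch of the same idea in plain Spark Java; the class and method names (LazyMapPartitionsSketch, lazilyMapped, toUpperCase) are illustrative only, not Hudi or Spark APIs.

import java.util.Iterator;
import java.util.function.Function;

import org.apache.spark.api.java.JavaRDD;

// Minimal sketch (not Hudi code): return a lazily mapping iterator from
// mapPartitions instead of collecting transformed records into a List first.
public class LazyMapPartitionsSketch {

  // Wraps a source iterator and applies the transform one element at a time;
  // Hudi's CloseableMappingIterator plays this role in the actual change.
  static <I, O> Iterator<O> lazilyMapped(Iterator<I> source, Function<I, O> transform) {
    return new Iterator<O>() {
      @Override
      public boolean hasNext() {
        return source.hasNext();
      }

      @Override
      public O next() {
        return transform.apply(source.next());
      }
    };
  }

  // Hypothetical usage: each partition's records are transformed as Spark
  // consumes the returned iterator, so the transformed output is never
  // accumulated in memory for the whole partition.
  static JavaRDD<String> toUpperCase(JavaRDD<String> rawRecords) {
    return rawRecords.mapPartitions(partitionIterator ->
        lazilyMapped(partitionIterator, String::toUpperCase));
  }
}

The benefit is per partition: with large Kafka batches the transformed strings (or HoodieRecords) no longer have to be held all at once before the iterator is handed back to Spark.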