This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 49072d1e2e7 [HUDI-7508] Avoid collecting records in HoodieStreamerUtils.createHoodieRecords and JsonKafkaSource mapPartitions (#10872)
49072d1e2e7 is described below

commit 49072d1e2e721f27623dba840ad6ea41a252fd15
Author: Vinish Reddy <vinishreddygunne...@gmail.com>
AuthorDate: Sat May 11 08:50:59 2024 +0530

    [HUDI-7508] Avoid collecting records in HoodieStreamerUtils.createHoodieRecords and JsonKafkaSource mapPartitions (#10872)
    
    Co-authored-by: Y Ethan Guo <ethan.guoyi...@gmail.com>
---
 .../hudi/utilities/sources/JsonKafkaSource.java      | 18 ++++++++----------
 .../hudi/utilities/streamer/HoodieStreamerUtils.java | 20 ++++++++------------
 2 files changed, 16 insertions(+), 22 deletions(-)
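
The core of the change in both files: the old code buffered every transformed record of a Spark partition into a LinkedList/ArrayList inside mapPartitions and returned list.iterator(), materializing the whole partition in executor memory; the new code returns a lazy mapping iterator, so records are transformed one at a time as Spark consumes them. Below is a minimal, self-contained sketch of that before/after pattern; the RDD, element type, and transform() helper are illustrative placeholders, not code from the patch.

    import org.apache.spark.api.java.JavaRDD;

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class LazyMapPartitionsSketch {

      // Before: every transformed record of the partition is buffered in a list,
      // so the whole partition lives in memory before Spark sees a single element.
      static JavaRDD<String> eager(JavaRDD<String> rdd) {
        return rdd.mapPartitions(partitionIterator -> {
          List<String> out = new ArrayList<>();
          partitionIterator.forEachRemaining(value -> out.add(transform(value)));
          return out.iterator();
        });
      }

      // After: a lazy iterator transforms records only as Spark pulls them,
      // keeping the per-partition footprint to one record at a time.
      static JavaRDD<String> lazy(JavaRDD<String> rdd) {
        return rdd.mapPartitions(partitionIterator -> new Iterator<String>() {
          @Override
          public boolean hasNext() {
            return partitionIterator.hasNext();
          }

          @Override
          public String next() {
            return transform(partitionIterator.next());
          }
        });
      }

      // Placeholder for the per-record work (offset enrichment, record creation, ...).
      private static String transform(String value) {
        return value.toUpperCase();
      }
    }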

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 71f0c4db3f1..a8f70e7c854 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -21,6 +21,8 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;
 import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
@@ -43,8 +45,6 @@ import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
 import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
 
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
@@ -80,28 +80,26 @@ public class JsonKafkaSource extends KafkaSource<JavaRDD<String>> {
     return postProcess(maybeAppendKafkaOffsets(kafkaRDD));
   }
 
-  protected  JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
+  protected JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
     if (this.shouldAddOffsets) {
       return kafkaRDD.mapPartitions(partitionIterator -> {
-        List<String> stringList = new LinkedList<>();
-        ObjectMapper om = new ObjectMapper();
-        partitionIterator.forEachRemaining(consumerRecord -> {
+        ObjectMapper objectMapper = new ObjectMapper();
+        return new CloseableMappingIterator<>(ClosableIterator.wrap(partitionIterator), consumerRecord -> {
           String recordValue = consumerRecord.value().toString();
           String recordKey = StringUtils.objToString(consumerRecord.key());
           try {
-            ObjectNode jsonNode = (ObjectNode) om.readTree(recordValue);
+            ObjectNode jsonNode = (ObjectNode) objectMapper.readTree(recordValue);
             jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
             jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
             jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
             if (recordKey != null) {
               jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey);
             }
-            stringList.add(om.writeValueAsString(jsonNode));
+            return objectMapper.writeValueAsString(jsonNode);
           } catch (Throwable e) {
-            stringList.add(recordValue);
+            return recordValue;
           }
         });
-        return stringList.iterator();
       });
     }
     return kafkaRDD.map(consumerRecord -> (String) consumerRecord.value());
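
In JsonKafkaSource the laziness comes from Hudi's iterator utilities: the plain partition iterator is wrapped via ClosableIterator.wrap and then mapped record-by-record with CloseableMappingIterator, as in the hunk above. The following standalone sketch shows that pattern in isolation; the sample data and the "record-" mapping are made up, and it assumes the utility classes behave as they are used in the diff (lazy element-wise mapping with close() propagation).

    import org.apache.hudi.common.util.collection.ClosableIterator;
    import org.apache.hudi.common.util.collection.CloseableMappingIterator;

    import java.util.Arrays;
    import java.util.Iterator;

    public class MappingIteratorSketch {

      public static void main(String[] args) {
        Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();

        // Wrap the plain iterator so close() can be propagated, then map each
        // element lazily; nothing is materialized up front.
        ClosableIterator<String> mapped = new CloseableMappingIterator<Integer, String>(
            ClosableIterator.wrap(source), i -> "record-" + i);

        try {
          while (mapped.hasNext()) {
            System.out.println(mapped.next());
          }
        } finally {
          mapped.close();
        }
      }
    }
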
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 2ecf0b02fb6..3be64fefbb3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -55,10 +56,8 @@ import org.apache.spark.sql.avro.HoodieAvroDeserializer;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -81,6 +80,8 @@ public class HoodieStreamerUtils {
                                                                    String instantTime, Option<BaseErrorTableWriter> errorTableWriter) {
     boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
     boolean shouldErrorTable = errorTableWriter.isPresent() && props.getBoolean(ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(), ERROR_ENABLE_VALIDATE_RECORD_CREATION.defaultValue());
+    boolean useConsistentLogicalTimestamp = ConfigUtils.getBooleanWithAltKeys(
+        props, KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
     Set<String> partitionColumns = getPartitionColumns(props);
     return avroRDDOptional.map(avroRDD -> {
       SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema());
@@ -94,23 +95,18 @@ public class HoodieStreamerUtils {
                 props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
               }
               BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
-              List<Either<HoodieRecord,String>> avroRecords = new ArrayList<>();
-              while (genericRecordIterator.hasNext()) {
-                GenericRecord genRec = genericRecordIterator.next();
+              return new CloseableMappingIterator<>(ClosableIterator.wrap(genericRecordIterator), genRec -> {
                 try {
                   HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
                   GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
                   HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-                      (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
-                          KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                          Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
+                      (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, useConsistentLogicalTimestamp))
                       : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
-                  avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload)));
+                  return Either.left(new HoodieAvroRecord<>(hoodieKey, payload));
                 } catch (Exception e) {
-                  avroRecords.add(generateErrorRecordOrThrowException(genRec, e, shouldErrorTable));
+                  return generateErrorRecordOrThrowException(genRec, e, shouldErrorTable);
                 }
-              }
-              return avroRecords.iterator();
+              });
             });
 
       } else if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
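
The HoodieStreamerUtils hunk also hoists the consistent-logical-timestamp lookup out of the per-record path: instead of calling props.getBoolean(...) with a parsed default for every record, the flag is resolved once via ConfigUtils.getBooleanWithAltKeys (which also honors alternate config keys) and captured by the mapping lambda as useConsistentLogicalTimestamp. A small sketch of that hoisting follows; the class name and the KeyGeneratorOptions import path are assumptions based on the Hudi codebase, not part of the patch.

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.util.ConfigUtils;
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

    public class HoistedConfigSketch {

      // Resolve the flag once, outside the per-record mapping function, instead of
      // re-reading and re-parsing the property for every record.
      static boolean resolveConsistentLogicalTimestamp(TypedProperties props) {
        return ConfigUtils.getBooleanWithAltKeys(
            props, KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
      }

      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        props.setProperty(
            KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), "true");
        System.out.println("consistent logical timestamps: " + resolveConsistentLogicalTimestamp(props));
      }
    }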
