nsivabalan commented on a change in pull request #3740:
URL: https://github.com/apache/hudi/pull/3740#discussion_r719809110
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java
##########

```diff
@@ -44,38 +51,44 @@
  * Simplified Logic:
  * For every existing record
  *     Write the record as is
- * For all incoming records, write to file as is.
+ * For all incoming records, write to file as is, without de-duplicating based on the record key.
  *
  * Illustration with simple data.
  * Incoming data:
- *     rec1_2, rec4_2, rec5_1, rec6_1
+ *     rec1_2, rec1_3, rec4_2, rec5_1, rec6_1
  * Existing data:
  *     rec1_1, rec2_1, rec3_1, rec4_1
  *
  * For every existing record, write to storage as is.
  *     => rec1_1, rec2_1, rec3_1 and rec4_1 is written to storage
  * Write all records from incoming set to storage
- *     => rec1_2, rec4_2, rec5_1 and rec6_1
+ *     => rec1_2, rec1_3, rec4_2, rec5_1 and rec6_1
  *
  * Final snapshot in storage
- * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec4_2, rec5_1, rec6_1
+ * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec1_3, rec4_2, rec5_1, rec6_1
  *
  * Users should ensure there are no duplicates when "insert" operation is used and if the respective config is enabled. So, above scenario should not
  * happen and every batch should have new records to be inserted. Above example is for illustration purposes only.
  */
 public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {

   private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);

+  // a representation of incoming records that tolerates duplicate keys.
+  private final Iterator<HoodieRecord<T>> recordItr;

-  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator recordItr,
-      String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+      Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+      TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, Collections.emptyIterator(), partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+    this.recordItr = recordItr;
   }

   public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Map keyToNewRecords, String partitionPath,
       String fileId, HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+    // if incoming data is a Map, fallback to Map representation of incoming records
     super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier,
```

Review comment:
   Shouldn't the 4th arg be an empty iterator?
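For illustration, the append-without-dedup semantics described in the javadoc above can be sketched as follows. This is a hypothetical standalone example (class `ConcatSketch` and its `concat` helper are not part of Hudi); records are modeled as plain strings using the `recN_M` names from the javadoc.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the concat-handle write semantics: every existing record is
// written as is, then every incoming record is appended as is, with no
// de-duplication on the record key (rec1 appears three times below).
public class ConcatSketch {

  // Existing records first, then all incoming records, duplicates kept.
  static List<String> concat(List<String> existing, List<String> incoming) {
    List<String> out = new ArrayList<>(existing); // existing records as is
    out.addAll(incoming);                         // incoming appended as is
    return out;
  }

  public static void main(String[] args) {
    List<String> existing = Arrays.asList("rec1_1", "rec2_1", "rec3_1", "rec4_1");
    List<String> incoming = Arrays.asList("rec1_2", "rec1_3", "rec4_2", "rec5_1", "rec6_1");
    // prints [rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec1_3, rec4_2, rec5_1, rec6_1]
    System.out.println(concat(existing, incoming));
  }
}
```

This mirrors the javadoc's final snapshot: as the comment warns, callers using "insert" with the respective config enabled are expected to guarantee key uniqueness themselves, since nothing here filters duplicates.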