nsivabalan commented on a change in pull request #3740:
URL: https://github.com/apache/hudi/pull/3740#discussion_r719809110



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieConcatHandle.java
##########
@@ -44,38 +51,44 @@
  * Simplified Logic:
  * For every existing record
  *     Write the record as is
- * For all incoming records, write to file as is.
+ * For all incoming records, write to file as is, without de-duplicating based on the record key.
  *
  * Illustration with simple data.
  * Incoming data:
- *     rec1_2, rec4_2, rec5_1, rec6_1
+ *     rec1_2, rec1_3, rec4_2, rec5_1, rec6_1
  * Existing data:
  *     rec1_1, rec2_1, rec3_1, rec4_1
  *
  * For every existing record, write to storage as is.
  *    => rec1_1, rec2_1, rec3_1 and rec4_1 are written to storage
  * Write all records from incoming set to storage
- *    => rec1_2, rec4_2, rec5_1 and rec6_1
+ *    => rec1_2, rec1_3, rec4_2, rec5_1 and rec6_1
  *
  * Final snapshot in storage
- * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec4_2, rec5_1, rec6_1
+ * rec1_1, rec2_1, rec3_1, rec4_1, rec1_2, rec1_3, rec4_2, rec5_1, rec6_1
  *
  * Users should ensure there are no duplicates when the "insert" operation is used and the respective config is enabled. So, the above scenario should not
  * happen and every batch should have new records to be inserted. The above example is for illustration purposes only.
  */
 public class HoodieConcatHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
 
   private static final Logger LOG = LogManager.getLogger(HoodieConcatHandle.class);
+  // a representation of incoming records that tolerates duplicate keys.
+  private final Iterator<HoodieRecord<T>> recordItr;
 
-  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator recordItr,
-                            String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
-    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+  public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                            Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+                            TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    super(config, instantTime, hoodieTable, Collections.emptyIterator(), partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
+    this.recordItr = recordItr;
   }
 
   public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Map keyToNewRecords, String partitionPath, String fileId,
       HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+    // If the incoming data is a Map, fall back to the Map representation of the incoming records.
     super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier,

Review comment:
       Shouldn't the 4th arg be an empty iterator?
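       For context: the iterator-based constructor above hands the superclass Collections.emptyIterator() and keeps the incoming records in its own recordItr, while the Map-based constructor still passes keyToNewRecords straight through to the parent. Below is a minimal, self-contained sketch of the two wiring choices under discussion; BaseHandle and ConcatHandle are hypothetical stand-ins for illustration only, not the actual Hudi classes:

           import java.util.Collections;
           import java.util.Iterator;
           import java.util.Map;

           // Hypothetical stand-in for HoodieMergeHandle: the records handed
           // to the parent here are what its merge loop iterates over and
           // de-duplicates by key.
           class BaseHandle<T> {
             protected final Iterator<T> newRecords;

             BaseHandle(Iterator<T> newRecords) {
               this.newRecords = newRecords;
             }

             BaseHandle(Map<String, T> keyToNewRecords) {
               // Map overload: keys are unique by construction.
               this(keyToNewRecords.values().iterator());
             }
           }

           // Hypothetical stand-in for HoodieConcatHandle.
           class ConcatHandle<T> extends BaseHandle<T> {
             private final Iterator<T> recordItr;

             ConcatHandle(Iterator<T> recordItr) {
               super(Collections.emptyIterator()); // parent sees no records
               this.recordItr = recordItr;         // child drives the writes itself
             }

             ConcatHandle(Map<String, T> keyToNewRecords) {
               // The question above: should this overload also hand the parent
               // an empty iterator and keep the records locally, so both
               // constructors route writes the same way?
               super(keyToNewRecords);
               this.recordItr = Collections.emptyIterator();
             }
           }

       One note on the trade-off: a Map cannot hold duplicate record keys in the first place, so falling back to the parent's Map path does not lose the concat (no de-duplication) semantics for that overload; the empty-iterator trick only matters on the iterator path, where duplicate keys must survive.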



