This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a8feee9  Performing commit archiving in batches to avoid keeping a huge chunk in memory
a8feee9 is described below

commit a8feee929394194922405bd12b330e40e9b710fe
Author: Nishith Agarwal <nagar...@uber.com>
AuthorDate: Sun Apr 7 11:12:22 2019 -0700

    Performing commit archiving in batches to avoid keeping a huge chunk in memory
---
 .../com/uber/hoodie/config/HoodieCompactionConfig.java |  9 +++++++++
 .../java/com/uber/hoodie/config/HoodieWriteConfig.java |  5 +++++
 .../com/uber/hoodie/io/HoodieCommitArchiveLog.java     | 18 ++++++++++++++----
 3 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
index 95e0c9b..dfd69c5 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
@@ -45,6 +45,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained";
   public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
   public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
+  public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch";
   // Upsert uses this file size to compact new data onto existing files..
   public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
   // By default, treat any file <= 100MB as a small file.
@@ -104,6 +105,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
   private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
   private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
+  private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = String.valueOf(10);
   public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = "hoodie.compaction.daybased.target"
       + ".partitions";
   // 500GB of target IO per compaction (both read and write)
@@ -240,6 +242,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withCommitsArchivalBatchSize(int batchSize) {
+      props.setProperty(COMMITS_ARCHIVAL_BATCH_SIZE_PROP, String.valueOf(batchSize));
+      return this;
+    }
+
     public HoodieCompactionConfig build() {
       HoodieCompactionConfig config = new HoodieCompactionConfig(props);
       setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP,
@@ -281,6 +288,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
           COMPACTION_REVERSE_LOG_READ_ENABLED_PROP, DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED);
       setDefaultOnCondition(props, !props.containsKey(TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP),
           TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
+      setDefaultOnCondition(props, !props.containsKey(COMMITS_ARCHIVAL_BATCH_SIZE_PROP),
+          COMMITS_ARCHIVAL_BATCH_SIZE_PROP, DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE);
 
       HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP));
 
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index 7156623..115dd51 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -249,6 +249,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
         .parseInt(props.getProperty(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP));
   }
 
+  public int getCommitArchivalBatchSize() {
+    return Integer
+        .parseInt(props.getProperty(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE_PROP));
+  }
+
   /**
    * index properties
    **/
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
index eb836d0..ccf303a 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
@@ -245,11 +245,11 @@ public class HoodieCommitArchiveLog {
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
+        if (records.size() >= this.config.getCommitArchivalBatchSize()) {
+          writeToFile(wrapperSchema, records);
+        }
       }
-      Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
-      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
-      HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
-      this.writer = writer.appendBlock(block);
+      writeToFile(wrapperSchema, records);
     } catch (Exception e) {
       throw new HoodieCommitException("Failed to archive commits", e);
     }
@@ -259,6 +259,16 @@ public class HoodieCommitArchiveLog {
     return archiveFilePath;
   }
 
+  private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
+    if (records.size() > 0) {
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
+      HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
+      this.writer = writer.appendBlock(block);
+      records.clear();
+    }
+  }
+
   private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline,
       HoodieInstant hoodieInstant) throws IOException {
     HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();

Reply via email to