This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 86b5fcd  Cache RDD to avoid recomputing data ingestion. Return result RDD after updating index so that this step is not skipped by chained actions on the same RDD
86b5fcd is described below

commit 86b5fcdd332f8d7f1c079dbc54756173ca4b7c94
Author: venkatr <venk...@uber.com>
AuthorDate: Wed Jul 24 17:55:38 2019 -0700

    Cache RDD to avoid recomputing data ingestion. Return result RDD after updating index so that this step is not skipped by chained actions on the same RDD
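
For context: every Spark action on an un-persisted RDD recomputes the RDD's
entire lineage, so chained actions after the index update would repeat the
ingestion work. A minimal standalone sketch of that behavior, with an
illustrative dataset and storage level rather than anything taken from the
patch below:

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.storage.StorageLevel;

    public class PersistSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setAppName("persist-sketch").setMaster("local[2]"));
        // Stand-in for the ingestion work that produces the write statuses.
        JavaRDD<String> writeStatusRDD = jsc.parallelize(Arrays.asList("a", "b", "c"))
            .map(s -> {
              System.out.println("ingesting " + s); // runs once per element per computation
              return s.toUpperCase();
            });
        // Without this persist, BOTH actions below would re-run the ingestion map.
        writeStatusRDD = writeStatusRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
        writeStatusRDD.count();   // first action: computes and caches the partitions
        writeStatusRDD.collect(); // chained action: served from the cache
        jsc.stop();
      }
    }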
---
 .../java/com/uber/hoodie/HoodieWriteClient.java    |  7 ++-
 .../com/uber/hoodie/config/HoodieIndexConfig.java  | 29 ++++++++++++
 .../com/uber/hoodie/config/HoodieWriteConfig.java  |  4 ++
 .../java/com/uber/hoodie/index/HoodieIndex.java    |  4 ++
 .../com/uber/hoodie/index/hbase/HBaseIndex.java    | 31 +++++++------
 .../java/com/uber/hoodie/io/HoodieMergeHandle.java |  1 +
 .../java/com/uber/hoodie/index/TestHbaseIndex.java | 51 ++++++++++++++++++++++
 7 files changed, 113 insertions(+), 14 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index 3c8b36c..0d4b415 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -466,10 +466,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
 
   private JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD,
       HoodieTable<T> table, String commitTime) {
+    // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
+    // RDD actions that are performed after updating the index.
+    writeStatusRDD = writeStatusRDD.persist(config.getWriteStatusStorageLevel());
     // Update the index back
     JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, table);
     // Trigger the insert and collect statuses
-    statuses = statuses.persist(config.getWriteStatusStorageLevel());
     commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType());
     return statuses;
   }
@@ -974,6 +976,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   public void close() {
     // Stop timeline-server if running
     super.close();
+    // Calling this here releases any resources used by the index, so make sure to finish any related operations
+    // before this point.
+    this.index.close();
   }
 
   /**
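
A hedged usage sketch of the ordering constraint close() now carries (the
snippet assumes a live JavaSparkContext jsc, a built HoodieWriteConfig config,
and a JavaRDD<HoodieRecord> inputRecords; none of these names come from the
patch): finish every index-touching step, including the commit, before
releasing index resources.

    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
    String commitTime = writeClient.startCommit();
    // upsert tags and updates the index internally
    JavaRDD<WriteStatus> statuses = writeClient.upsert(inputRecords, commitTime);
    writeClient.commit(commitTime, statuses); // index-related work ends here...
    writeClient.close();                      // ...then index resources are released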
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
index 073ed51..9cbc734 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
@@ -57,6 +57,15 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
   public static final String BLOOM_INDEX_KEYS_PER_BUCKET_PROP = "hoodie.bloom.index.keys.per.bucket";
   public static final String DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET = "10000000";
 
+  // ***** HBase Index Configs *****
+  public static final String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
+  public static final String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
+  public static final String HBASE_ZK_ZNODEPARENT = "hoodie.index.hbase.zknode.path";
+  public static final String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
+  public static final String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size";
+  public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size";
+  public static final String DEFAULT_HBASE_BATCH_SIZE = "100";
+
 
   public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL =
       "hoodie.bloom.index.input.storage" + ".level";
@@ -109,6 +118,26 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder hbaseZkQuorum(String zkString) {
+      props.setProperty(HBASE_ZKQUORUM_PROP, zkString);
+      return this;
+    }
+
+    public Builder hbaseZkPort(int port) {
+      props.setProperty(HBASE_ZKPORT_PROP, String.valueOf(port));
+      return this;
+    }
+
+    public Builder hbaseZkZnodeParent(String zkZnodeParent) {
+      props.setProperty(HBASE_ZK_ZNODEPARENT, zkZnodeParent);
+      return this;
+    }
+
+    public Builder hbaseTableName(String tableName) {
+      props.setProperty(HBASE_TABLENAME_PROP, tableName);
+      return this;
+    }
+
     public Builder bloomIndexParallelism(int parallelism) {
       props.setProperty(BLOOM_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
       return this;
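
A hedged sketch of wiring the new HBase settings through these builders
(quorum, port, znode path, table name, and base path are deployment
placeholders; the surrounding plumbing follows the existing HoodieWriteConfig
builder pattern):

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table") // placeholder base path
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.HBASE)
            .hbaseZkQuorum("zk1,zk2,zk3")
            .hbaseZkPort(2181)
            .hbaseZkZnodeParent("/hbase-secure") // new; omit to use the cluster default
            .hbaseTableName("hoodie_index")
            .build())
        .build();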
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index e78c358..63ba10e 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -306,6 +306,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_ZKPORT_PROP));
   }
 
+  public String getHBaseZkZnodeParent() {
+    return props.getProperty(HoodieIndexConfig.HBASE_ZK_ZNODEPARENT);
+  }
+
   public String getHbaseTableName() {
     return props.getProperty(HoodieHBaseIndexConfig.HBASE_TABLENAME_PROP);
   }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java
index 7f4eb63..e15d07b 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java
@@ -115,6 +115,10 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
    */
   public abstract boolean isImplicitWithStorage();
 
+  /**
+   * Each index type should implement its own logic to release any resources acquired during the process.
+   */
+  public void close() {}
 
   public enum IndexType {
     HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM
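
A minimal sketch of the contract this default establishes (the classes below
are illustrative stand-ins, not Hudi types): the base class ships a no-op
close() so index types without external state are untouched, while
resource-holding implementations override it.

    // Illustrative stand-ins, not Hudi classes.
    class SketchIndex {
      public void close() {} // default: nothing acquired, nothing to release
    }

    class PooledSketchIndex extends SketchIndex {
      private final java.util.concurrent.ExecutorService pool =
          java.util.concurrent.Executors.newFixedThreadPool(4);

      @Override
      public void close() {
        pool.shutdown(); // release exactly what this implementation acquired
      }
    }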
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
index 4eb9443..2764721 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
@@ -83,6 +83,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
 
   private static Logger logger = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
+  private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
   private float qpsFraction;
   private int maxQpsPerRegionServer;
   /**
@@ -106,6 +107,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
     this.qpsFraction = config.getHbaseIndexQPSFraction();
     this.maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
     this.putBatchSizeCalculator = new HbasePutBatchSizeCalculator();
+    this.hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
   }
 
   @VisibleForTesting
@@ -132,6 +134,10 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
     Configuration hbaseConfig = HBaseConfiguration.create();
     String quorum = config.getHbaseZkQuorum();
     hbaseConfig.set("hbase.zookeeper.quorum", quorum);
+    String zkZnodeParent = config.getHBaseZkZnodeParent();
+    if (zkZnodeParent != null) {
+      hbaseConfig.set("zookeeper.znode.parent", zkZnodeParent);
+    }
     String port = String.valueOf(config.getHbaseZkPort());
     hbaseConfig.set("hbase.zookeeper.property.clientPort", port);
     try {
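
For background, a hedged standalone sketch of why the znode parent matters when
building the client Configuration (quorum, port, and parent path are
placeholders): HBase clients look under "/hbase" in ZooKeeper unless told
otherwise, so a cluster registered under another path is unreachable without
this setting.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class ZnodeParentSketch {
      public static void main(String[] args) throws Exception {
        Configuration hbaseConfig = HBaseConfiguration.create();
        hbaseConfig.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
        hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181");
        // Without this, the client assumes the default parent znode "/hbase".
        hbaseConfig.set("zookeeper.znode.parent", "/hbase-secure");
        try (Connection conn = ConnectionFactory.createConnection(hbaseConfig)) {
          System.out.println("connected: " + !conn.isClosed());
        }
      }
    }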
@@ -158,6 +164,13 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
     });
   }
 
+  /**
+   * Ensure that any resources used for indexing are released here.
+   */
+  public void close() {
+    this.hBaseIndexQPSResourceAllocator.releaseQPSResources();
+  }
+
   private Get generateStatement(String key) throws IOException {
     return new Get(Bytes.toBytes(key)).setMaxVersions(1)
         .addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)
@@ -368,21 +381,13 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
   public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
       HoodieTable<T> hoodieTable) {
     final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
-    JavaRDD<WriteStatus> writeStatusResultRDD;
     setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-    logger.info("multiPutBatchSize: before puts" + multiPutBatchSize);
+    logger.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
     JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(
         updateLocationFunction(), true);
-    // Forcing a spark action so HBase puts are triggered before releasing resources
-    if (this.config.getHBaseIndexShouldComputeQPSDynamically()) {
-      logger.info("writestatus count: " + writeStatusJavaRDD.count());
-      writeStatusResultRDD = writeStatusRDD;
-    } else {
-      writeStatusResultRDD = writeStatusJavaRDD;
-    }
-    // Release QPS resources as HBAse puts are done at this point
-    hBaseIndexQPSResourceAllocator.releaseQPSResources();
-    return writeStatusResultRDD;
+    // caching the index updated status RDD
+    writeStatusJavaRDD = writeStatusJavaRDD.persist(config.getWriteStatusStorageLevel());
+    return writeStatusJavaRDD;
   }
 
   private void setPutBatchSize(JavaRDD<WriteStatus> writeStatusRDD,
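
The removed count() used to force the HBase puts so QPS resources could be
released inside this method; with it gone, nothing executes until the caller
fires an action, which is why the release moved to close(). A minimal
standalone sketch of that laziness (dataset and printouts are illustrative):

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class LazinessSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setAppName("laziness").setMaster("local[2]"));
        JavaRDD<Integer> in = jsc.parallelize(Arrays.asList(1, 2, 3), 1);
        // Stand-in for updateLocationFunction(): nothing here executes yet.
        JavaRDD<Integer> out = in.mapPartitionsWithIndex((idx, it) -> {
          System.out.println("puts running for partition " + idx);
          return it;
        }, true);
        System.out.println("updateLocation returned; no puts have run yet");
        out.count(); // the caller's action is what actually triggers the puts
        jsc.stop();
      }
    }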
@@ -430,7 +435,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
     final JavaPairRDD<Long, Integer> insertOnlyWriteStatusRDD =
         writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
             .mapToPair(w -> new Tuple2<>(w.getStat().getNumInserts(), 1));
-    return insertOnlyWriteStatusRDD.reduce((w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2));
+    return insertOnlyWriteStatusRDD.fold(new Tuple2<>(0L, 0), (w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2));
   }
 
   public static class HbasePutBatchSizeCalculator implements Serializable {
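
The reduce-to-fold switch matters when no WriteStatus has inserts:
JavaRDD.reduce throws on an empty RDD, while fold returns its zero element,
which is exactly what the new testsHBasePutAccessParallelismWithNoInserts
below asserts. A hedged standalone sketch (dataset is illustrative):

    import java.util.Collections;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class FoldVsReduceSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setAppName("fold-vs-reduce").setMaster("local[2]"));
        // Mirrors the filtered RDD when no WriteStatus has inserts: zero elements.
        JavaPairRDD<Long, Integer> empty = JavaPairRDD.fromJavaRDD(
            jsc.parallelize(Collections.<Tuple2<Long, Integer>>emptyList()));
        // empty.reduce(...) would throw UnsupportedOperationException("empty collection").
        Tuple2<Long, Integer> totals = empty.fold(new Tuple2<>(0L, 0),
            (w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2));
        System.out.println(totals); // (0,0): zero puts, zero parallelism
        jsc.stop();
      }
    }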
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
index bd6650e..8ebf31e 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
@@ -149,6 +149,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
    * Extract old file path, initialize StorageWriter and WriteStatus
    */
   private void init(String fileId, String partitionPath, HoodieDataFile dataFileToBeMerged) {
+    logger.info("partitionPath: " + partitionPath + ", fileId to be merged: " + fileId);
     this.writtenRecordKeys = new HashSet<>();
     writeStatus.setStat(new HoodieWriteStat());
     try {
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
index f2c5d5c..5ff7220 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
@@ -186,6 +186,40 @@ public class TestHbaseIndex {
   }
 
   @Test
+  public void testTagLocationAndDuplicateUpdate() throws Exception {
+    String newCommitTime = "001";
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    // Load to memory
+    HoodieWriteConfig config = getConfig();
+    HBaseIndex index = new HBaseIndex(config);
+    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    writeClient.startCommit();
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+
+    JavaRDD<WriteStatus> writeStatuses = writeClient.upsert(writeRecords, newCommitTime);
+    JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+    // Duplicate upsert and ensure correctness is maintained
+    writeClient.upsert(writeRecords, newCommitTime);
+    assertNoWriteErrors(writeStatuses.collect());
+
+    // Now commit this & update location of records inserted and validate no errors
+    writeClient.commit(newCommitTime, writeStatuses);
+    // Now tagLocation for these records, hbaseIndex should tag them correctly
+    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+    assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 10);
+    assertTrue(javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count() == 10);
+    assertTrue(javaRDD.filter(
+        record -> (record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime()
+            .equals(newCommitTime))).distinct().count() == 10);
+  }
+
+  @Test
   public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
 
     HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
@@ -360,6 +394,23 @@ public class TestHbaseIndex {
   }
 
   @Test
+  public void testsHBasePutAccessParallelismWithNoInserts() {
+    HoodieWriteConfig config = getConfig();
+    HBaseIndex index = new HBaseIndex(config);
+    final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
+        Arrays.asList(
+            getSampleWriteStatus(0, 2),
+            getSampleWriteStatus(0, 1)),
+        10);
+    final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
+    final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
+    final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
+    Assert.assertEquals(10, writeStatusRDD.getNumPartitions());
+    Assert.assertEquals(0, hbasePutAccessParallelism);
+    Assert.assertEquals(0, hbaseNumPuts);
+  }
+
+  @Test
   public void testsHBaseIndexDefaultQPSResourceAllocator() {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
