This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ee5b32f  [HUDI-652] Decouple HoodieReadClient and AbstractHoodieClient to break the inheritance chain (#1372)
ee5b32f is described below

commit ee5b32f5d4aa26e7fc58ccdae46935f063460920
Author: vinoyang <yanghua1...@gmail.com>
AuthorDate: Sat Mar 7 01:59:35 2020 +0800

    [HUDI-652] Decouple HoodieReadClient and AbstractHoodieClient to break the inheritance chain (#1372)
    
    
    * Removed timeline server support
    * Removed try-with-resource
---
 .../org/apache/hudi/client/HoodieReadClient.java   |  9 ++-
 .../apache/hudi/client/TestHoodieReadClient.java   | 63 ++++++++---------
 .../apache/hudi/table/TestMergeOnReadTable.java    | 82 +++++++++++-----------
 .../hudi/table/compact/TestAsyncCompaction.java    | 25 +++----
 .../main/java/org/apache/hudi/DataSourceUtils.java |  3 +-
 5 files changed, 88 insertions(+), 94 deletions(-)
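
For readers skimming the patch, the net effect on HoodieReadClient, condensed from the hunks below (other members and imports omitted): the class no longer extends AbstractHoodieClient, implements Serializable instead, keeps its own transient JavaSparkContext, and drops the cached commit timeline.

    public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {

      private final transient HoodieIndex<T> index;
      private HoodieTable hoodieTable;
      private transient Option<SQLContext> sqlContextOpt;
      private final transient JavaSparkContext jsc;

      public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
          Option<EmbeddedTimelineService> timelineService) {
        this.jsc = jsc;
        final String basePath = clientConfig.getBasePath();
        // Create a Hoodie table which encapsulates the commits and files visible
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
        this.hoodieTable = HoodieTable.getHoodieTable(metaClient, clientConfig, jsc);
        this.index = HoodieIndex.createIndex(clientConfig, jsc);
        this.sqlContextOpt = Option.empty();
      }
    }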

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index e08ec34..33d661b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -46,6 +45,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.types.StructType;
 
+import java.io.Serializable;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -56,7 +56,7 @@ import scala.Tuple2;
 /**
  * Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
  */
-public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
+public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {
 
   private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class);
 
@@ -65,9 +65,9 @@ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoo
    * basepath pointing to the table. Until, then just always assume a BloomIndex
    */
   private final transient HoodieIndex<T> index;
-  private final HoodieTimeline commitTimeline;
   private HoodieTable hoodieTable;
   private transient Option<SQLContext> sqlContextOpt;
+  private final transient JavaSparkContext jsc;
 
   /**
    * @param basePath path to Hoodie table
@@ -108,12 +108,11 @@ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoo
    */
   public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
       Option<EmbeddedTimelineService> timelineService) {
-    super(jsc, clientConfig, timelineService);
+    this.jsc = jsc;
     final String basePath = clientConfig.getBasePath();
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
     this.hoodieTable = HoodieTable.getHoodieTable(metaClient, clientConfig, jsc);
-    this.commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
     this.index = HoodieIndex.createIndex(clientConfig, jsc);
     this.sqlContextOpt = Option.empty();
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
index c57da14..6329e08 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
@@ -96,8 +96,8 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
    */
   private void testReadFilterExist(HoodieWriteConfig config,
       Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
-    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);
-        HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
+      HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());
       String newCommitTime = writeClient.startCommit();
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
@@ -113,37 +113,36 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
       // Verify there are no errors
       assertNoWriteErrors(statuses);
 
-      try (HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());) {
-        filteredRDD = anotherReadClient.filterExists(recordsRDD);
-        List<HoodieRecord> result = filteredRDD.collect();
-        // Check results
-        assertEquals(25, result.size());
-
-        // check path exists for written keys
-        JavaPairRDD<HoodieKey, Option<String>> keyToPathPair =
-                anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey()));
-        JavaRDD<HoodieKey> keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent())
-                .map(keyPath -> keyPath._1);
-        assertEquals(75, keysWithPaths.count());
-
-        // verify rows match inserted records
-        Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
-        assertEquals(75, rows.count());
-
-        JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
-                .map(keyPath -> keyPath._1);
-
-        try {
-          anotherReadClient.readROView(keysWithoutPaths, 1);
-        } catch (Exception e) {
-          // data frame reader throws exception for empty records. ignore the error.
-          assertEquals(e.getClass(), AnalysisException.class);
-        }
-
-        // Actual tests of getPendingCompactions method are in TestAsyncCompaction
-        // This is just testing empty list
-        assertEquals(0, anotherReadClient.getPendingCompactions().size());
+      HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());
+      filteredRDD = anotherReadClient.filterExists(recordsRDD);
+      List<HoodieRecord> result = filteredRDD.collect();
+      // Check results
+      assertEquals(25, result.size());
+
+      // check path exists for written keys
+      JavaPairRDD<HoodieKey, Option<String>> keyToPathPair =
+              anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey()));
+      JavaRDD<HoodieKey> keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent())
+              .map(keyPath -> keyPath._1);
+      assertEquals(75, keysWithPaths.count());
+
+      // verify rows match inserted records
+      Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
+      assertEquals(75, rows.count());
+
+      JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
+              .map(keyPath -> keyPath._1);
+
+      try {
+        anotherReadClient.readROView(keysWithoutPaths, 1);
+      } catch (Exception e) {
+        // data frame reader throws exception for empty records. ignore the error.
+        assertEquals(e.getClass(), AnalysisException.class);
       }
+
+      // Actual tests of getPendingCompactions method are in TestAsyncCompaction
+      // This is just testing empty list
+      assertEquals(0, anotherReadClient.getPendingCompactions().size());
     }
   }
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index ab27920..740caf2 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -759,54 +759,54 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 
       List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
       JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
-      try (HoodieReadClient readClient = new HoodieReadClient(jsc, config);) {
-        updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
 
-        // Write them to corresponding avro logfiles
-        HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
-            HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
+      HoodieReadClient readClient = new HoodieReadClient(jsc, config);
+      updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
 
-        // Verify that all data file has one log file
-        metaClient = HoodieTableMetaClient.reload(metaClient);
-        HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-        // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
-        ((SyncableFileSystemView) (table.getSliceView())).reset();
-
-        for (String partitionPath : dataGen.getPartitionPaths()) {
-          List<FileSlice> groupedLogFiles =
-              table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
-          for (FileSlice fileSlice : groupedLogFiles) {
-            assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count());
-          }
+      // Write them to corresponding avro logfiles
+      HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
+          HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
+
+      // Verify that all data file has one log file
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
+      ((SyncableFileSystemView) (table.getSliceView())).reset();
+
+      for (String partitionPath : dataGen.getPartitionPaths()) {
+        List<FileSlice> groupedLogFiles =
+            table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+        for (FileSlice fileSlice : groupedLogFiles) {
+          assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count());
         }
+      }
 
-        // Mark 2nd delta-instant as completed
-        metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
-            HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
-        metaClient.getActiveTimeline().saveAsComplete(
-            new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
+      // Mark 2nd delta-instant as completed
+      metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
+          HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
+      metaClient.getActiveTimeline().saveAsComplete(
+          new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
 
-        // Do a compaction
-        String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
-        JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);
+      // Do a compaction
+      String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
+      JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);
 
-        // Verify that recently written compacted data file has no log file
-        metaClient = HoodieTableMetaClient.reload(metaClient);
-        table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-        HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
-
-        assertTrue("Compaction commit should be > than last insert", HoodieTimeline
-            .compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime, HoodieTimeline.GREATER));
-
-        for (String partitionPath : dataGen.getPartitionPaths()) {
-          List<FileSlice> groupedLogFiles =
-              table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
-          for (FileSlice slice : groupedLogFiles) {
-            assertEquals("After compaction there should be no log files visible on a full view", 0, slice.getLogFiles().count());
-          }
-          List<WriteStatus> writeStatuses = result.collect();
-          assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
+      // Verify that recently written compacted data file has no log file
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+
+      assertTrue("Compaction commit should be > than last insert", HoodieTimeline
+          .compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime, HoodieTimeline.GREATER));
+
+      for (String partitionPath : dataGen.getPartitionPaths()) {
+        List<FileSlice> groupedLogFiles =
+            table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+        for (FileSlice slice : groupedLogFiles) {
+          assertEquals("After compaction there should be no log files visible on a full view", 0, slice.getLogFiles().count());
         }
+        List<WriteStatus> writeStatuses = result.collect();
+        assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
       }
     }
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
index e81fa99..1a366a4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
@@ -92,9 +92,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testRollbackForInflightCompaction() throws Exception {
     // Rollback inflight compaction
     HoodieWriteConfig cfg = getConfig(false);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-         HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
@@ -155,9 +154,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
 
     int numRecs = 2000;
 
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-         HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
       records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
           new ArrayList<>());
@@ -197,9 +195,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testInflightCompaction() throws Exception {
     // There is inflight compaction. Subsequent compaction run must work correctly
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-         HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
@@ -351,9 +348,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testCompactionAfterTwoDeltaCommits() throws Exception {
     // No Delta Commits after compaction request
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-         HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
@@ -373,9 +369,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testInterleavedCompaction() throws Exception {
     // Case: Two delta commits before and after compaction schedule
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-         HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index a2dfe02..6a4ad03 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -223,7 +223,8 @@ public class DataSourceUtils {
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
                                                      HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
-    try (HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService)) {
+    try {
+      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
       return client.tagLocation(incomingHoodieRecords)
           .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
     } catch (TableNotFoundException e) {
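
Since HoodieReadClient no longer extends AbstractHoodieClient, it is not AutoCloseable any more, which is why the try-with-resources blocks above become plain try blocks or are dropped. A minimal caller-side sketch (an illustration, not part of the patch), reusing the jssc, writeConfig and incomingHoodieRecords names from dropDuplicates above and passing an empty timeline service option:

    // Construct the read client directly; there is no longer a resource to close.
    HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, Option.empty());
    // Tag incoming records with their current location and keep only the new ones.
    JavaRDD<HoodieRecord> newRecords = client.tagLocation(incomingHoodieRecords)
        .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());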
