This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit dd7952ec9de7f9858ded98b49923e8978fd1df78
Author: vinoyang <yanghua1...@gmail.com>
AuthorDate: Sat Mar 7 01:59:35 2020 +0800

    [HUDI-652] Decouple HoodieReadClient and AbstractHoodieClient to break the inheritance chain (#1372)

    * Removed timeline server support

    * Removed try-with-resource
---
 .../org/apache/hudi/client/HoodieReadClient.java   |  9 ++-
 .../apache/hudi/client/TestHoodieReadClient.java   | 63 ++++++++---------
 .../apache/hudi/table/TestMergeOnReadTable.java    | 82 +++++++++++-----------
 .../hudi/table/compact/TestAsyncCompaction.java    | 25 +++----
 .../main/java/org/apache/hudi/DataSourceUtils.java |  3 +-
 5 files changed, 88 insertions(+), 94 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index e08ec34..33d661b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -46,6 +45,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.types.StructType;
 
+import java.io.Serializable;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -56,7 +56,7 @@ import scala.Tuple2;
 /**
  * Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
  */
-public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
+public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {
 
   private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class);
 
@@ -65,9 +65,9 @@ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoo
    * basepath pointing to the table. Until, then just always assume a BloomIndex
    */
   private final transient HoodieIndex<T> index;
-  private final HoodieTimeline commitTimeline;
   private HoodieTable hoodieTable;
   private transient Option<SQLContext> sqlContextOpt;
+  private final transient JavaSparkContext jsc;
 
   /**
    * @param basePath path to Hoodie table
@@ -108,12 +108,11 @@ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoo
    */
   public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
       Option<EmbeddedTimelineService> timelineService) {
-    super(jsc, clientConfig, timelineService);
+    this.jsc = jsc;
     final String basePath = clientConfig.getBasePath();
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
     this.hoodieTable = HoodieTable.getHoodieTable(metaClient, clientConfig, jsc);
-    this.commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
     this.index = HoodieIndex.createIndex(clientConfig, jsc);
     this.sqlContextOpt = Option.empty();
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
index c57da14..6329e08 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
@@ -96,8 +96,8 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
    */
   private void testReadFilterExist(HoodieWriteConfig config,
       Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
-    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);
-        HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
+      HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());
       String newCommitTime = writeClient.startCommit();
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
@@ -113,37 +113,36 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
       // Verify there are no errors
       assertNoWriteErrors(statuses);
 
-      try (HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());) {
-        filteredRDD = anotherReadClient.filterExists(recordsRDD);
-        List<HoodieRecord> result = filteredRDD.collect();
-        // Check results
-        assertEquals(25, result.size());
-
-        // check path exists for written keys
-        JavaPairRDD<HoodieKey, Option<String>> keyToPathPair =
-            anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey()));
-        JavaRDD<HoodieKey> keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent())
-            .map(keyPath -> keyPath._1);
-        assertEquals(75, keysWithPaths.count());
-
-        // verify rows match inserted records
-        Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
-        assertEquals(75, rows.count());
-
-        JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
-            .map(keyPath -> keyPath._1);
-
-        try {
-          anotherReadClient.readROView(keysWithoutPaths, 1);
-        } catch (Exception e) {
-          // data frame reader throws exception for empty records. ignore the error.
-          assertEquals(e.getClass(), AnalysisException.class);
-        }
-
-        // Actual tests of getPendingCompactions method are in TestAsyncCompaction
-        // This is just testing empty list
-        assertEquals(0, anotherReadClient.getPendingCompactions().size());
+      HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());
+      filteredRDD = anotherReadClient.filterExists(recordsRDD);
+      List<HoodieRecord> result = filteredRDD.collect();
+      // Check results
+      assertEquals(25, result.size());
+
+      // check path exists for written keys
+      JavaPairRDD<HoodieKey, Option<String>> keyToPathPair =
+          anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey()));
+      JavaRDD<HoodieKey> keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent())
+          .map(keyPath -> keyPath._1);
+      assertEquals(75, keysWithPaths.count());
+
+      // verify rows match inserted records
+      Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
+      assertEquals(75, rows.count());
+
+      JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
+          .map(keyPath -> keyPath._1);
+
+      try {
+        anotherReadClient.readROView(keysWithoutPaths, 1);
+      } catch (Exception e) {
+        // data frame reader throws exception for empty records. ignore the error.
+        assertEquals(e.getClass(), AnalysisException.class);
       }
+
+      // Actual tests of getPendingCompactions method are in TestAsyncCompaction
+      // This is just testing empty list
+      assertEquals(0, anotherReadClient.getPendingCompactions().size());
     }
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index ab27920..740caf2 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -759,54 +759,54 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
       JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
 
-      try (HoodieReadClient readClient = new HoodieReadClient(jsc, config);) {
-        updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
-        // Write them to corresponding avro logfiles
-        HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
-            HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
+      HoodieReadClient readClient = new HoodieReadClient(jsc, config);
+      updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
 
-        // Verify that all data file has one log file
-        metaClient = HoodieTableMetaClient.reload(metaClient);
-        HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-        // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
-        ((SyncableFileSystemView) (table.getSliceView())).reset();
-
-        for (String partitionPath : dataGen.getPartitionPaths()) {
-          List<FileSlice> groupedLogFiles =
-              table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
-          for (FileSlice fileSlice : groupedLogFiles) {
-            assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count());
-          }
+      // Write them to corresponding avro logfiles
+      HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
+          HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
+
+      // Verify that all data file has one log file
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
+      ((SyncableFileSystemView) (table.getSliceView())).reset();
+
+      for (String partitionPath : dataGen.getPartitionPaths()) {
+        List<FileSlice> groupedLogFiles =
+            table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+        for (FileSlice fileSlice : groupedLogFiles) {
+          assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count());
         }
+      }
 
-        // Mark 2nd delta-instant as completed
-        metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
-            HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
-        metaClient.getActiveTimeline().saveAsComplete(
-            new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
+      // Mark 2nd delta-instant as completed
+      metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
+          HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
+      metaClient.getActiveTimeline().saveAsComplete(
+          new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
 
-        // Do a compaction
-        String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
-        JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);
+      // Do a compaction
+      String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
+      JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);
 
-        // Verify that recently written compacted data file has no log file
-        metaClient = HoodieTableMetaClient.reload(metaClient);
-        table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-        HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
-
-        assertTrue("Compaction commit should be > than last insert", HoodieTimeline
-            .compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime, HoodieTimeline.GREATER));
-
-        for (String partitionPath : dataGen.getPartitionPaths()) {
-          List<FileSlice> groupedLogFiles =
-              table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
-          for (FileSlice slice : groupedLogFiles) {
-            assertEquals("After compaction there should be no log files visible on a full view", 0, slice.getLogFiles().count());
-          }
-          List<WriteStatus> writeStatuses = result.collect();
-          assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
+      // Verify that recently written compacted data file has no log file
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+      HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+
+      assertTrue("Compaction commit should be > than last insert", HoodieTimeline
+          .compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime, HoodieTimeline.GREATER));
+
+      for (String partitionPath : dataGen.getPartitionPaths()) {
+        List<FileSlice> groupedLogFiles =
+            table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+        for (FileSlice slice : groupedLogFiles) {
+          assertEquals("After compaction there should be no log files visible on a full view", 0, slice.getLogFiles().count());
        }
+        List<WriteStatus> writeStatuses = result.collect();
+        assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
      }
    }
  }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
index e81fa99..1a366a4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
@@ -92,9 +92,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testRollbackForInflightCompaction() throws Exception {
     // Rollback inflight compaction
     HoodieWriteConfig cfg = getConfig(false);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-        HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
@@ -155,9 +154,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
 
     int numRecs = 2000;
 
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-        HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
       records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records,
           cfg, true, new ArrayList<>());
@@ -197,9 +195,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testInflightCompaction() throws Exception {
     // There is inflight compaction. Subsequent compaction run must work correctly
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-        HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
@@ -351,9 +348,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testCompactionAfterTwoDeltaCommits() throws Exception {
     // No Delta Commits after compaction request
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-        HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
@@ -373,9 +369,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testInterleavedCompaction() throws Exception {
     // Case: Two delta commits before and after compaction schedule
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-        HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 1158fa2..35c5955 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -185,7 +185,8 @@ public class DataSourceUtils {
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
       HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
-    try (HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService)) {
+    try {
+      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
       return client.tagLocation(incomingHoodieRecords)
           .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
     } catch (TableNotFoundException e) {
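
Separated from the test churn, the shape of the refactor in this commit is small: HoodieReadClient stops extending AbstractHoodieClient, implements Serializable directly, and holds only the transient JavaSparkContext it actually needs. Below is a minimal, self-contained sketch of that pattern for illustration only; ReadClientSketch and SparkContextStub are made-up stand-ins, not Hudi classes.

import java.io.Serializable;

// Illustrative sketch of the decoupling pattern: implement Serializable directly
// and hold the non-serializable context as a transient field instead of
// inheriting it from an abstract client base class.
public class ReadClientSketch implements Serializable {

  // Hypothetical stand-in for JavaSparkContext, which is not serializable.
  public static class SparkContextStub {
    String appName() {
      return "demo-app";
    }
  }

  // Held directly by the read client; transient so serializing the client
  // never tries to drag the Spark context along with it.
  private final transient SparkContextStub jsc;

  public ReadClientSketch(SparkContextStub jsc) {
    this.jsc = jsc;
  }

  public String describe() {
    return "read client bound to " + jsc.appName();
  }

  public static void main(String[] args) {
    // No try-with-resources needed: nothing closeable is inherited any more.
    ReadClientSketch client = new ReadClientSketch(new SparkContextStub());
    System.out.println(client.describe());
  }
}

Because the class no longer inherits a closeable base (the piece that carried the embedded timeline server), callers shown in the diff, such as DataSourceUtils.dropDuplicates and the test classes, drop HoodieReadClient from their try-with-resources headers and simply construct it inside the block.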