bhasudha commented on a change in pull request #689: [HUDI-25] Optimize
HoodieInputFormat.listStatus for faster Hive Incremental queries
URL: https://github.com/apache/incubator-hudi/pull/689#discussion_r294632849
##########
File path:
hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
##########
@@ -218,4 +215,109 @@ protected static HoodieTableMetaClient getTableMetaClient(FileSystem fs, Path da
LOG.info("Reading hoodie metadata from path " + baseDir.toString());
return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
}
+
+ /**
+ * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+ * partitions and then filtering based on the commits of interest, this logic first extracts the
+ * partitions touched by the desired commits and then lists only those partitions.
+ */
+ private List<FileStatus> listStatusForIncrementalMode(JobConf job,
+ HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+ String tableName = tableMetaClient.getTableConfig().getTableName();
+ HoodieTimeline timeline = tableMetaClient.getActiveTimeline().getCommitsTimeline()
+ .filterCompletedInstants();
+ String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
+ // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+ Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
+ LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+ List<HoodieInstant> commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits)
+ .getInstants().collect(Collectors.toList());
+ // Extract partitions touched by the commitsToCheck
+ Set<String> partitionsToList = new HashSet<>();
+ for (int i = 0; i < commitsToCheck.size(); i++) {
+ HoodieInstant commit = commitsToCheck.get(i);
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+ .fromBytes(timeline.getInstantDetails(commit).get(),
+ HoodieCommitMetadata.class);
+ partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet());
+ }
+ if (partitionsToList.isEmpty()) {
+ return null;
+ }
+ String incrementalInputPaths = partitionsToList.stream()
+ .map(s -> tableMetaClient.getBasePath() + Path.SEPARATOR + s)
+ .filter(s -> {
+ /*
+ * Ensure to return only results from the original input path that has incremental changes
+ * This check is needed for the following corner case - When the caller invokes
+ * HoodieInputFormat.listStatus multiple times (with small batches of Hive partitions each
+ * time. Ex. Hive fetch task calls listStatus for every partition once) we do not want to
+ * accidentally return all incremental changes for the entire table in every listStatus()
+ * call. This will create redundant splits. Instead we only want to return the incremental
+ * changes (if so any) in that batch of input paths.
+ *
+ * NOTE on Hive queries that are executed using Fetch task:
Review comment:
Sent a PR to update the docs on this:
https://github.com/apache/incubator-hudi/pull/742
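
For readers following the hunk above, the idea in the Javadoc and in the corner-case comment can be sketched in isolation. This is only an illustration under stated assumptions, not the code in this PR: the class `IncrementalPartitionSketch`, the helper `partitionsToList`, and the map of commit time to touched partitions are hypothetical stand-ins for `HoodieCommitMetadata.getPartitionToWriteStats()`, and the prefix match against the input paths is an assumed filter, since the actual filter body is cut off in the quoted diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical sketch: plain strings stand in for Hudi and Hadoop types.
class IncrementalPartitionSketch {

  // Collect the partitions touched by the commits of interest, then keep only
  // those that fall under one of the caller's input paths (assumed filter).
  static List<String> partitionsToList(String basePath,
      Map<String, Set<String>> partitionsByCommit,
      List<String> commitsToCheck,
      List<String> inputPaths) {
    // TreeSet gives a deterministic order for the result.
    Set<String> touched = new TreeSet<>();
    for (String commit : commitsToCheck) {
      touched.addAll(partitionsByCommit.getOrDefault(commit, Collections.emptySet()));
    }
    return touched.stream()
        .map(partition -> basePath + "/" + partition)
        .filter(full -> inputPaths.stream()
            .anyMatch(in -> full.equals(in) || full.startsWith(in + "/")))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, Set<String>> byCommit = new HashMap<>();
    byCommit.put("c1", new HashSet<>(Arrays.asList("2019/06/01")));
    byCommit.put("c2", new HashSet<>(Arrays.asList("2019/06/01", "2019/06/02")));
    byCommit.put("c3", new HashSet<>(Arrays.asList("2019/06/03")));

    List<String> commits = new ArrayList<>(Arrays.asList("c2", "c3"));

    // A caller that passes a single partition gets back only that partition.
    System.out.println(partitionsToList("/data/tbl", byCommit, commits,
        Arrays.asList("/data/tbl/2019/06/02")));

    // A caller that passes the table base path gets every touched partition.
    System.out.println(partitionsToList("/data/tbl", byCommit, commits,
        Arrays.asList("/data/tbl")));
  }
}
```

The first call mirrors the fetch-task case from the comment: each `listStatus` invocation carries a small batch of input paths, so the filter keeps the result scoped to that batch and avoids producing the same splits on every call.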