hudi-bot opened a new issue, #14538: URL: https://github.com/apache/hudi/issues/14538
The main goal here is to provide a much simpler index, without advanced optimizations like auto-tuned parallelism/skew handling, but with a better out-of-box experience for small workloads.

## JIRA info

- Link: https://issues.apache.org/jira/browse/HUDI-686
- Type: Task
- Epic: https://issues.apache.org/jira/browse/HUDI-1822
- Affects version(s): 0.9.0
- Attachment(s):
  - 19/Mar/20 17:16; vinoth; Screen Shot 2020-03-19 at 10.15.10 AM.png; https://issues.apache.org/jira/secure/attachment/12997144/Screen+Shot+2020-03-19+at+10.15.10+AM.png
  - 19/Mar/20 17:16; vinoth; Screen Shot 2020-03-19 at 10.15.10 AM.png; https://issues.apache.org/jira/secure/attachment/12997143/Screen+Shot+2020-03-19+at+10.15.10+AM.png
  - 19/Mar/20 17:15; vinoth; Screen Shot 2020-03-19 at 10.15.10 AM.png; https://issues.apache.org/jira/secure/attachment/12997142/Screen+Shot+2020-03-19+at+10.15.10+AM.png
  - 19/Mar/20 17:17; vinoth; image-2020-03-19-10-17-43-048.png; https://issues.apache.org/jira/secure/attachment/12997145/image-2020-03-19-10-17-43-048.png

---

## Comments

**19/Mar/20 08:40 — vinoth:**

Have an implementation here: https://github.com/vinothchandar/incubator-hudi/tree/hudi-686-bloomindex-v2 which seems to work functionally (tested by making unit tests hit this). Needs more perf testing and a productionized implementation. Locally, I am still seeing performance that is about the same as the current BloomIndex; running on a cluster at load may yield different results. cc [~shivnarayan] [~vbalaji]

---

**19/Mar/20 16:57 — lamber-ken:**

[~vinoth] thanks for bringing up this new idea. Here are some concerns to consider:

1. `candidates` may cause an OOM. Although we can increase the number of partitions to solve that, it may hurt the user's experience, because the user has to think about it.

   > List<Pair<HoodieRecord<T>, String>> candidates = new ArrayList<>();

2. `fileIDToBloomFilter` is an external map that spills content to disk; we need to think about the serialization/deserialization performance.

   > this.fileIDToBloomFilter = new ExternalSpillableMap<>(1000000000L ...)
   > BloomFilter filter = fileIDToBloomFilter.get(partitionFileIdPair.getRight());

https://github.com/vinothchandar/incubator-hudi/blob/hudi-686-bloomindex-v2/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexV2.java

```java
@Override
protected List<Pair<HoodieRecord<T>, String>> computeNext() {
  List<Pair<HoodieRecord<T>, String>> candidates = new ArrayList<>();
  if (inputItr.hasNext()) {
    HoodieRecord<T> record = inputItr.next();
    try {
      initIfNeeded(record.getPartitionPath());
    } catch (IOException e) {
      throw new HoodieIOException("Error reading index metadata for " + record.getPartitionPath(), e);
    }
    indexFileFilter
        .getMatchingFilesAndPartition(record.getPartitionPath(), record.getRecordKey())
        .forEach(partitionFileIdPair -> {
          BloomFilter filter = fileIDToBloomFilter.get(partitionFileIdPair.getRight());
          if (filter.mightContain(record.getRecordKey())) {
            candidates.add(Pair.of(record, partitionFileIdPair.getRight()));
          }
        });
    if (candidates.isEmpty()) {
      candidates.add(Pair.of(record, ""));
    }
  }
  return candidates;
}
```

---

**19/Mar/20 17:09 — vinoth:**

`candidates` can be as big as N * size of HoodieRecord, where N is the number of files in a partition? That's not as bad, right?

---

**19/Mar/20 17:18 — vinoth:**

Running a local microbenchmark, I actually found that the extra shuffling in the V2 implementation (it shuffles the input two times, vs. caching once and shuffling just the keys in the current impl) actually makes it a tad slower.

**BloomIndexV2:** !Screen Shot 2020-03-19 at 10.15.10 AM.png!

**BloomIndex:** !image-2020-03-19-10-17-43-048.png!

---

**19/Mar/20 21:13 — vinoth:**

Timing the individual stages, roughly, here is how it looks.
Not sure how much more we can optimize this further, since the time spent is mostly inside parquet: the metadata read cost, i.e. reading the footers, dominates the first stage.

```java
System.err.format("LazyRangeBloomChecker: %d, %d, %d, %d, %d, %d \n",
    totalCount, totalMatches, totalTimeNs, totalMetadataReadTimeNs, totalRangeCheckTimeNs, totalBloomCheckTimeNs);
```

```
LazyRangeBloomChecker: 18632, 5068, 481673381, 439685698, 5872344, 26426499
LazyRangeBloomChecker: 29312, 0, 397373925, 361189515, 12336753, 3205152
LazyRangeBloomChecker: 36422, 0, 395838972, 364965143, 6870027, 3088563
LazyRangeBloomChecker: 32698, 21252, 502987672, 374374961, 15190330, 94478078
LazyRangeBloomChecker: 36633, 0, 420441840, 388992165, 7971801, 5222196
LazyRangeBloomChecker: 35919, 35919, 547982738, 382770288, 17042127, 130529090
LazyRangeBloomChecker: 26448, 26448, 673972735, 497887634, 12918131, 150188682
LazyRangeBloomChecker: 29827, 25338, 739789660, 568953445, 14633164, 140977007
LazyRangeBloomChecker: 40694, 40694, 611867636, 364297491, 20609305, 206717514
LazyRangeBloomChecker: 41515, 41515, 754657982, 379440879, 18670251, 337857948
LazyRangeBloomChecker: 46672, 46672, 761187684, 364060859, 18887398, 359483525
LazyRangeBloomChecker: 26931, 2360, 296764733, 275044606, 3439711, 11417543
LazyRangeBloomChecker: 41863, 20714, 831527864, 656157121, 13784027, 143710665
LazyRangeBloomChecker: 36429, 0, 181597122, 157965082, 5342164, 3072219
LazyRangeBloomChecker: 45618, 0, 180005379, 154248797, 6254112, 3332647
LazyRangeBloomChecker: 60916, 60916, 730395000, 244153313, 24926738, 439724359
```

The reading of the actual keys themselves dominates the second stage.

```java
System.err.println("LazyKeyChecker: " + totalTimeNs + "," + totalCount + "," + totalReadTimeNs);
```

```
LazyKeyChecker: 32576530,2119,30998522
LazyKeyChecker: 39189497,3415,36666074
LazyKeyChecker: 36683534,3726,33878272
LazyKeyChecker: 293554458,38523,264821882
LazyKeyChecker: 297414709,39263,268215304
LazyKeyChecker: 212946950,65474,169525572
LazyKeyChecker: 1047598045,65998,1003946915
LazyKeyChecker: 1048062757,66734,1003969635
LazyKeyChecker: 1041348181,74948,992863777
```

---

**19/Mar/20 21:20 — vinoth:**

[~vbalaji] [~shivnarayan] Please review this information closely. In short, we can support an indexing option that eliminates memory caching, but I'm not sure it will outperform the current BloomIndex. Is it worth cleaning this implementation up and checking it in?

---

**21/Mar/20 15:22 — vinoth:**

On second thought, I think it's useful to have this for workloads where the input data is large and caching could lead to spilling.

---

**23/Mar/20 01:30 — shivnarayan:**

Interesting impl, [~vinoth]. Some initial thoughts:

- Wrt `candidates`, I don't think we'd run into OOM, as it's bounded to one partition.
- May I know why we need an ExternalSpillableMap? Why can't we use a regular map? I don't see the benefit of an external spillable map if all entries can be held in memory. Here too, one executor will have to hold at most all file infos for one partition only, right? So memory is bounded here too, in my understanding.
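To make the memory bound being debated here concrete, below is a small self-contained sketch of the per-record candidate pruning step. This is not the actual Hudi code: a plain `Set` lookup stands in for `BloomFilter.mightContain` (so unlike a real bloom filter it has no false positives), and the class and method names are made up for illustration.

```java
import java.util.*;

// Hypothetical sketch of per-record candidate pruning (not the Hudi impl).
public class CandidatePruningSketch {

  // For one record key, return the file ids it must still be checked against.
  // The result is bounded by the number of files in the record's partition,
  // independent of the total input size -- the bound discussed above.
  static List<String> candidateFiles(String recordKey,
                                     Map<String, Set<String>> fileToKeys) {
    List<String> candidates = new ArrayList<>();
    for (Map.Entry<String, Set<String>> e : fileToKeys.entrySet()) {
      // Stand-in for bloomFilter.mightContain(recordKey); a real bloom
      // filter would also let through some false positives here.
      if (e.getValue().contains(recordKey)) {
        candidates.add(e.getKey());
      }
    }
    if (candidates.isEmpty()) {
      // Same convention as computeNext() above: "" marks a new record.
      candidates.add("");
    }
    return candidates;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> fileToKeys = new LinkedHashMap<>();
    fileToKeys.put("file-1", new HashSet<>(Arrays.asList("key-a", "key-b")));
    fileToKeys.put("file-2", new HashSet<>(Arrays.asList("key-b", "key-c")));

    // key-b may live in either file, so both survive pruning.
    System.out.println(candidateFiles("key-b", fileToKeys));
    // key-z matches nothing, so it is tagged as a new record.
    System.out.println(candidateFiles("key-z", fileToKeys));
  }
}
```

If this matches what the linked branch does, the per-record output stays small regardless of input size, and the OOM question reduces to how many files a single partition can hold.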
---

**23/Mar/20 19:18 — lamber-ken:**

Right, this is a nice design. Some thoughts:

- If the input data is large, we need to increase partitions; `candidates` contains all the data for each partition.
- If we increase partitions, it will cause duplicate loading of the same partition (e.g. populateFileIDs() and populateRangeAndBloomFilters()).

https://github.com/vinothchandar/incubator-hudi/blob/hudi-686-bloomindex-v2/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexV2.java

```java
@Override
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
    HoodieTable<T> hoodieTable) {
  return recordRDD
      .sortBy((record) -> String.format("%s-%s", record.getPartitionPath(), record.getRecordKey()),
          true, config.getBloomIndexV2Parallelism())
      .mapPartitions((itr) -> new LazyRangeBloomChecker(itr, hoodieTable))
      .flatMap(List::iterator)
      .sortBy(Pair::getRight, true, config.getBloomIndexV2Parallelism())
      .mapPartitions((itr) -> new LazyKeyChecker(itr, hoodieTable))
      .filter(Option::isPresent)
      .map(Option::get);
}
```

```java
private void initIfNeeded(String partitionPath) throws IOException {
  if (!Objects.equals(partitionPath, currentPartitionPath)) {
    cleanup();
    this.currentPartitionPath = partitionPath;
    populateFileIDs();
    populateRangeAndBloomFilters();
  }
}
```

---

**27/Mar/20 16:16 — vinoth:**

> if the input data is large, need to increase partitions, "candidates" contains all datas for per partition

No, `candidates` only contains candidate files per key.

> if increase partitions, it will cause duplicate loading of the same partition (e.g populateFileIDs() && populateRangeAndBloomFilters())

It will. That's why we auto-tune everything in BloomIndexV1, but then it needs some memory caching. The idea here is to make this work well for simpler cases, as an option that does not rely on memory caching.

---

**27/Mar/20 16:16 — vinoth:**

> May I know why we need external spillableMap? why can't we use regular map. I don't know the benefits of external spillable map if all entries could be held in memory. Here too, one executor will have to hold at max all file infos for one partition only right? So, memory is bounded here too in my understanding.

Just to handle the case where the bloom filters won't fit in memory.

---

**27/Mar/20 22:28 — shivnarayan:**

Yeah, I was about to respond to lamber-ken that the intention of this index is to speed up simpler cases; it's definitely not intended as one size fits all. At higher scale, this is probably not the right index to use.

---

**01/Jul/20 04:08 — vinoth:**

Assigning this to [~shivnarayan]. [~lamber-ken] sounds good?

---

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
