yihua commented on code in PR #7642:
URL: https://github.com/apache/hudi/pull/7642#discussion_r1087306340
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -66,42 +80,94 @@ public static SparkHoodieBloomIndexHelper getInstance() {
   public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
       Map<String, Long> recordsPerPartition) {
-    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
-        HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
-            .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
     int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
-    int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
-    LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
-        + config.getBloomIndexParallelism() + "}");
+    int configuredBloomIndexParallelism = config.getBloomIndexParallelism();
+    // NOTE: Target parallelism could be overridden by the config
+    int targetParallelism =
+        configuredBloomIndexParallelism > 0 ? configuredBloomIndexParallelism : inputParallelism;
+
+    LOG.info(String.format("Input parallelism: %d, Index parallelism: %d", inputParallelism, targetParallelism));
+
+    JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD = HoodieJavaRDD.getJavaRDD(fileComparisonPairs);
     JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+
     if (config.getBloomIndexUseMetadata()
         && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
             .contains(BLOOM_FILTERS.getPartitionPath())) {
-      // Step 1: Sort by file id
-      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
-          fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
+      SerializableConfiguration hadoopConf = new SerializableConfiguration(hoodieTable.getHadoopConf());
+
+      HoodieTableFileSystemView baseFileOnlyView =
+          getBaseFileOnlyView(hoodieTable, partitionToFileInfo.keySet());
+
+      Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast =
+          ((HoodieSparkEngineContext) context).getJavaSparkContext().broadcast(baseFileOnlyView);
+
+      // When leveraging MT we're aiming for the following goals:
+      //   - (G1) All requests to MT are made in batch (i.e., we're trying to fetch all the values
+      //          for the corresponding keys at once)
+      //   - (G2) Each task reads no more than just _one_ file-group from the MT Bloom Filters
+      //          partition
+      //
+      // To achieve G2, the following invariant has to be maintained: Spark partitions have to be
+      // affine w/ the Metadata Table's file-groups, meaning that each Spark partition holds records
+      // belonging to one and only one file-group in the MT Bloom Filters partition. To provide for
+      // that we need to make sure
+      //   - Spark's [[Partitioner]] employs the same hashing function as the Metadata Table (as
+      //     well as being applied to the same keys as the MT one)
+      //   - The # of Spark partitions is a multiple of the # of file-groups
+      //
+      // The last provision is necessary so that for every key it's the case that
+      //
+      //    (hash(key) % N) % M = hash(key) % M, whenever N % M = 0
+      //
+      // Let's take an example of N = 8 and M = 4 (the default # of file-groups in the Bloom Filter
+      // partition).
+      // In that case the Spark partitions for which `hash(key) % N` is either 0
+      // or 4 will map to the same (first) file-group in MT
+      //
+      // To achieve G1, we drastically reduce the # of RDD partitions actually reading from MT by
+      // setting the target parallelism as a (low-factor) multiple of the # of file-groups in MT
+      int targetMetadataParallelism =
+          config.getMetadataConfig().getBloomFilterIndexFileGroupCount() * config.getBloomIndexMetadataFetchingParallelismFactor();
+
+      AffineBloomIndexFileGroupPartitioner partitioner =
+          new AffineBloomIndexFileGroupPartitioner(baseFileOnlyViewBroadcast, targetMetadataParallelism);
+
+      keyLookupResultRDD =
+          // First, we need to repartition and sort records using [[AffineBloomIndexFileGroupPartitioner]]
+          // to make sure every Spark task accesses no more than just a single file-group in MT
+          // (this allows us to achieve G2).
+          //
+          // NOTE: Sorting records w/in individual partitions is required to make sure that we cluster
+          // together keys co-located w/in the MT files (sorted by keys)
+          fileComparisonsRDD.repartitionAndSortWithinPartitions(partitioner)

Review Comment:
   Got it. Basically, `fileComparisonsRDD` contains entries with the same file ID but different record keys, and we want to cluster the entries with the same file ID together; the ordering between different file IDs does not really matter here.
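   To make the affinity invariant concrete, here is a minimal sketch of a partitioner maintaining it. This is illustrative only, not the PR's `AffineBloomIndexFileGroupPartitioner`: the class name, the constructor, and the plain `hashCode`-based hash are assumptions standing in for the hashing the Metadata Table actually applies to record keys.

   ```java
   import org.apache.spark.Partitioner;

   // Hypothetical sketch (not the PR's AffineBloomIndexFileGroupPartitioner): choose
   // N Spark partitions as a multiple of the M metadata-table file-groups, so that
   // (hash(key) % N) % M == hash(key) % M and each Spark partition is affine with
   // exactly one MT file-group.
   public class AffineFileGroupPartitionerSketch extends Partitioner {
     private final int numFileGroups;       // M: # of file-groups in the MT Bloom Filters partition
     private final int numSparkPartitions;  // N = M * factor, hence N % M == 0

     public AffineFileGroupPartitionerSketch(int numFileGroups, int parallelismFactor) {
       this.numFileGroups = numFileGroups;
       this.numSparkPartitions = numFileGroups * parallelismFactor;
     }

     @Override
     public int numPartitions() {
       return numSparkPartitions;
     }

     @Override
     public int getPartition(Object key) {
       // Stand-in for the MT's hashing function, applied to the same key the MT hashes
       int hash = key.hashCode() & Integer.MAX_VALUE;  // force non-negative
       int sparkPartition = hash % numSparkPartitions; // hash(key) % N
       // Affinity invariant: because N % M == 0, this Spark partition only ever sees
       // keys belonging to MT file-group (hash % M)
       assert sparkPartition % numFileGroups == hash % numFileGroups;
       return sparkPartition;
     }
   }
   ```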
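   And a hypothetical end-to-end usage of that sketch, mirroring what the review comment describes: after `repartitionAndSortWithinPartitions`, entries sharing a file ID land in the same partition next to each other (sorted by key), while the ordering across different file IDs is irrelevant. The local Spark context, the `String` keys, and the `fg-*` ids are made up for illustration (the real RDD is keyed by `HoodieFileGroupId`).

   ```java
   import java.util.Arrays;
   import org.apache.spark.api.java.JavaPairRDD;
   import org.apache.spark.api.java.JavaSparkContext;
   import scala.Tuple2;

   public class RepartitionSketch {
     public static void main(String[] args) {
       JavaSparkContext jsc = new JavaSparkContext("local[2]", "affine-partitioner-sketch");
       // (fileGroupId, recordKey) pairs, analogous to fileComparisonsRDD
       JavaPairRDD<String, String> fileComparisons = jsc.parallelizePairs(Arrays.asList(
           new Tuple2<>("fg-1", "key-b"),
           new Tuple2<>("fg-2", "key-a"),
           new Tuple2<>("fg-1", "key-a")));
       // M = 4 file-groups, factor 2 => N = 8 Spark partitions; entries with the same
       // file ID are clustered together and sorted within each partition
       JavaPairRDD<String, String> clustered = fileComparisons
           .repartitionAndSortWithinPartitions(new AffineFileGroupPartitionerSketch(4, 2));
       clustered.foreachPartition(it -> it.forEachRemaining(System.out::println));
       jsc.stop();
     }
   }
   ```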