yihua commented on code in PR #7642:
URL: https://github.com/apache/hudi/pull/7642#discussion_r1087306340


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -66,42 +80,94 @@ public static SparkHoodieBloomIndexHelper getInstance() {
   public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
       Map<String, Long> recordsPerPartition) {
-    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
-        HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
-            .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
 
     int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
-    int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
-    LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
-        + config.getBloomIndexParallelism() + "}");
+    int configuredBloomIndexParallelism = config.getBloomIndexParallelism();
 
+    // NOTE: Target parallelism could be overridden by the config
+    int targetParallelism =
+        configuredBloomIndexParallelism > 0 ? configuredBloomIndexParallelism : inputParallelism;
+
+    LOG.info(String.format("Input parallelism: %d, Index parallelism: %d", inputParallelism, targetParallelism));
+
+    JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD = HoodieJavaPairRDD.getJavaPairRDD(fileComparisonPairs);
     JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+
     if (config.getBloomIndexUseMetadata()
         && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
         .contains(BLOOM_FILTERS.getPartitionPath())) {
-      // Step 1: Sort by file id
-      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
-          fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
+      SerializableConfiguration hadoopConf = new SerializableConfiguration(hoodieTable.getHadoopConf());
+
+      HoodieTableFileSystemView baseFileOnlyView =
+          getBaseFileOnlyView(hoodieTable, partitionToFileInfo.keySet());
+
+      Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast =
+          ((HoodieSparkEngineContext) context).getJavaSparkContext().broadcast(baseFileOnlyView);
+
+      // When leveraging MT we're aiming for the following goals:
+      //    - (G1) All requests to MT are made in batch (ie we're trying to fetch all the values
+      //      for corresponding keys at once)
+      //    - (G2) Each task reads no more than just _one_ file-group from the MT Bloom Filters
+      //      partition
+      //
+      // To achieve G2, the following invariant has to be maintained: Spark partitions have to be
+      // affine w/ the Metadata Table's file-groups, meaning that each Spark partition holds records
+      // belonging to one and only one file-group in the MT Bloom Filters partition. To provide for
+      // that we need to make sure that
+      //    - The [[Partitioner]] used by Spark employs the same hashing function as the Metadata
+      //      Table (as well as being applied to the same keys as the MT one)
+      //    - The # of Spark partitions is a multiple of the # of file-groups
+      //
+      //    The last provision is necessary so that for every key it's the case that
+      //
+      //        (hash(key) % N) % M = hash(key) % M, iff N % M = 0
+      //
+      //    Let's take an example of N = 8 and M = 4 (the default # of file-groups in the Bloom
+      //    Filters partition). In that case the Spark partitions for which `hash(key) % N` is
+      //    either 0 or 4 will map to the same (first) file-group in MT
+      //
+      // To achieve G1, we drastically reduce the # of RDD partitions actually reading from MT by
+      // setting the target parallelism to a (low-factor) multiple of the # of file-groups in MT
+      int targetMetadataParallelism =
+          config.getMetadataConfig().getBloomFilterIndexFileGroupCount() * config.getBloomIndexMetadataFetchingParallelismFactor();
+
+      AffineBloomIndexFileGroupPartitioner partitioner =
+          new AffineBloomIndexFileGroupPartitioner(baseFileOnlyViewBroadcast, targetMetadataParallelism);
+
+      keyLookupResultRDD =
+          // First, we need to repartition and sort records using [[AffineBloomIndexFileGroupPartitioner]]
+          // to make sure every Spark task accesses no more than just a single file-group in MT
+          // (allows us to achieve G2).
+          //
+          // NOTE: Sorting records w/in individual partitions is required to make sure that we
+          //       cluster together keys co-located w/in the MT files (sorted by keys)
+          fileComparisonsRDD.repartitionAndSortWithinPartitions(partitioner)
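
(Aside, not part of the PR: the modular-arithmetic provision in the quoted comment can be sanity-checked with a few lines of standalone Java; `N` and `M` mirror the example values above, and the class name is made up.)

```java
// Verifies that (hash % N) % M == hash % M whenever N % M == 0,
// i.e. that Spark partitions stay affine w/ MT file-groups.
public class CongruenceCheck {
  public static void main(String[] args) {
    int m = 4;  // # of file-groups in the MT Bloom Filters partition
    int n = 8;  // # of Spark partitions, a multiple of m
    for (int hash = 0; hash < 10_000; hash++) {
      int sparkPartition = hash % n;  // partition picked by a Spark partitioner
      int fileGroup = hash % m;       // file-group picked by MT's hashing
      if (sparkPartition % m != fileGroup) {
        throw new AssertionError("Affinity broken for hash " + hash);
      }
    }
    System.out.println("Every Spark partition maps onto exactly one MT file-group");
  }
}
```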

Review Comment:
   Got it.  Basically, `fileComparisonsRDD` contains entries with the same file IDs and different record keys.  We want to cluster the entries with the same file IDs together.  The ordering between different file IDs does not really matter here.
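
   For illustration, a minimal Spark sketch of that clustering behavior (hypothetical data; a plain `HashPartitioner` stands in for the PR's `AffineBloomIndexFileGroupPartitioner`):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class RepartitionAndSortDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("demo").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // (fileId, recordKey) pairs, deliberately interleaved across file ids
      JavaPairRDD<String, String> pairs = jsc.parallelizePairs(Arrays.asList(
          new Tuple2<>("file-2", "key-9"),
          new Tuple2<>("file-1", "key-3"),
          new Tuple2<>("file-2", "key-1"),
          new Tuple2<>("file-1", "key-7")));

      // Entries sharing a file id land in the same partition and are grouped
      // together by the within-partition sort; ordering across different file
      // ids is not guaranteed, which is exactly the property discussed above.
      pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
          .mapPartitionsWithIndex((idx, it) -> {
            List<String> out = new ArrayList<>();
            it.forEachRemaining(t -> out.add("partition " + idx + ": " + t._1() + " / " + t._2()));
            return out.iterator();
          }, false)
          .collect()
          .forEach(System.out::println);
    }
  }
}
```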


