codope commented on code in PR #8758:
URL: https://github.com/apache/hudi/pull/8758#discussion_r1232192328


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -193,121 +190,126 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
                   return Collections.emptyIterator();
                 }
 
-              boolean fullKeys = false;
+                boolean fullKeys = false;
 
-              Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
-                  readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, timings);
+                Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords =
+                    readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, timings);
 
-              List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
-                  readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+                Map<String, HoodieRecord<HoodieMetadataPayload>> mergedRecords =
+                    readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
 
-              LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
-                  sortedKeyPrefixes.size(), timings));
+                LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
+                    sortedKeyPrefixes.size(), timings));
 
-              return mergedRecords.stream()
-                .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
-                .filter(Objects::nonNull)
-                .iterator();
-            } catch (IOException ioe) {
-              throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
-            } finally {
-              closeReader(readers);
-            }
-          });
+                return mergedRecords.values().iterator();
+              } catch (IOException ioe) {
+                throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
+              } finally {
+                closeReader(readers);
+              }
+            });
   }
 
   @Override
-  public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys,
-                                                                                          String partitionName) {
-    // Sort the columns so that keys are looked up in order
-    List<String> sortedKeys = new ArrayList<>(keys);
-    Collections.sort(sortedKeys);
-    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = getPartitionFileSliceToKeysMapping(partitionName, sortedKeys);
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
-    AtomicInteger fileSlicesKeysCount = new AtomicInteger();
-    partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> {
-      Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
-          getOrCreateReaders(partitionName, partitionFileSlicePair.getRight());
-      try {
-        List<Long> timings = new ArrayList<>();
-        HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-        HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
-        if (baseFileReader == null && logRecordScanner == null) {
-          return;
-        }
+  protected Map<String, HoodieRecord<HoodieMetadataPayload>> getRecordsByKeys(List<String> keys, String partitionName) {
+    if (keys.isEmpty()) {
+      return Collections.emptyMap();
+    }
 
-        boolean fullKeys = true;
-        Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
-            readLogRecords(logRecordScanner, fileSliceKeys, fullKeys, timings);
-
-        result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, fileSliceKeys, fullKeys, logRecords,
-            timings, partitionName));
-
-        LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
-            fileSliceKeys.size(), timings));
-        fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
-      } catch (IOException ioe) {
-        throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeys.size() + " key : ", ioe);
-      } finally {
-        if (!reuse) {
-          closeReader(readers);
-        }
+    Map<String, HoodieRecord<HoodieMetadataPayload>> result;
+
+    // Load the file slices for the partition. Each file slice is a shard which saves a portion of the keys.
+    List<FileSlice> partitionFileSlices = partitionFileSliceMap.computeIfAbsent(partitionName,
+        k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, metadataFileSystemView, partitionName));
+    final int numFileSlices = partitionFileSlices.size();
+    ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for partition " + partitionName + " should be > 0");
+
+    // Lookup keys from each file slice
+    if (numFileSlices == 1) {
+      // Optimization for a single slice for smaller metadata table partitions
+      result = lookupKeysFromFileSlice(partitionName, keys, partitionFileSlices.get(0));
+    } else {
+      // Parallel lookup for large sized partitions with many file slices
+      // Partition the keys by the file slice which contains it
+      ArrayList<ArrayList<String>> partitionedKeys = new ArrayList<>(numFileSlices);
+      for (int i = 0; i < numFileSlices; ++i) {
+        partitionedKeys.add(new ArrayList<>());
      }
-    });
+      keys.forEach(key -> {
+        int shardIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, numFileSlices);
+        partitionedKeys.get(shardIndex).add(key);
+      });
+
+      result = new HashMap<>(keys.size());
+      engineContext.setJobStatus(this.getClass().getSimpleName(), "Reading keys from metadata table partition " + partitionName);
+      engineContext.map(partitionedKeys, keysList -> {
+        if (keysList.isEmpty()) {
+          return Collections.<String, HoodieRecord<HoodieMetadataPayload>>emptyMap();
+        }
+        int shardIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0), numFileSlices);
+        return lookupKeysFromFileSlice(partitionName, keysList, partitionFileSlices.get(shardIndex));
Review Comment:
   Agree with you. Let's keep it open, though; maybe Siva is thinking of some different angle. We can revisit later.
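
   For readers following along, here is a minimal, self-contained sketch of the key-to-shard bucketing that the parallel-lookup branch in the diff relies on. The hash-mod `mapRecordKeyToFileGroupIndex` below is an illustrative stand-in, not the actual `HoodieTableMetadataUtil` implementation; the point is only that every key deterministically maps to exactly one file slice, so per-slice lookups can run independently.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   public class ShardedKeyLookupSketch {

     // Illustrative stand-in for HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex.
     // Assumption: the real mapping is deterministic per key; a plain hash-mod is used here.
     static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileSlices) {
       return Math.floorMod(recordKey.hashCode(), numFileSlices);
     }

     // Mirrors the partitionedKeys loop in the diff: one bucket per file slice,
     // each key routed to the single bucket selected by its shard index.
     static List<List<String>> partitionKeysByShard(List<String> keys, int numFileSlices) {
       List<List<String>> partitionedKeys = new ArrayList<>(numFileSlices);
       for (int i = 0; i < numFileSlices; ++i) {
         partitionedKeys.add(new ArrayList<>());
       }
       for (String key : keys) {
         partitionedKeys.get(mapRecordKeyToFileGroupIndex(key, numFileSlices)).add(key);
       }
       return partitionedKeys;
     }

     public static void main(String[] args) {
       // With 3 shards, each key lands in exactly one bucket; empty buckets are
       // possible, which is why the diff short-circuits on keysList.isEmpty().
       List<String> keys = Arrays.asList("col_a", "col_b", "col_c", "col_d");
       partitionKeysByShard(keys, 3).forEach(System.out::println);
     }
   }
   ```

   Because every key in a bucket maps to the same shard index, the diff can recover that index from `keysList.get(0)` inside `engineContext.map` without carrying it alongside the keys.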


