codope commented on code in PR #11162:
URL: https://github.com/apache/hudi/pull/11162#discussion_r1630642711


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -851,6 +862,167 @@ private Map<String, String> reverseLookupSecondaryKeys(String partitionName, Lis
     return recordKeyMap;
   }
 
+  @Override
+  protected Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getSecondaryIndexRecords(List<String> keys, String partitionName) {
+    if (keys.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    // Load the file slices for the partition. Each file slice is a shard which saves a portion of the keys.
+    List<FileSlice> partitionFileSlices = partitionFileSliceMap.computeIfAbsent(partitionName,
+        k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, metadataFileSystemView, partitionName));
+    final int numFileSlices = partitionFileSlices.size();
+    ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for partition " + partitionName + " should be > 0");
+
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Lookup keys from each file slice");
+    HoodieData<FileSlice> partitionRDD = engineContext.parallelize(partitionFileSlices);
+    // Define the seqOp function (merges elements within a partition)
+    Functions.Function2<Map<String, List<HoodieRecord<HoodieMetadataPayload>>>, FileSlice, Map<String, List<HoodieRecord<HoodieMetadataPayload>>>> seqOp =
+        (accumulator, partition) -> {
+          Map<String, List<HoodieRecord<HoodieMetadataPayload>>> currentFileSliceResult = lookupSecondaryKeysFromFileSlice(partitionName, keys, partition);
+          currentFileSliceResult.forEach((secondaryKey, secondaryRecords) -> accumulator.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords) -> {
+            newRecords.addAll(oldRecords);
+            return newRecords;
+          }));
+          return accumulator;
+        };
+    // Define the combOp function (merges elements across partitions)
+    Functions.Function2<Map<String, List<HoodieRecord<HoodieMetadataPayload>>>, Map<String, List<HoodieRecord<HoodieMetadataPayload>>>, Map<String, List<HoodieRecord<HoodieMetadataPayload>>>> combOp =
+        (map1, map2) -> {
+          map2.forEach((secondaryKey, secondaryRecords) -> map1.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords) -> {
+            newRecords.addAll(oldRecords);
+            return newRecords;
+          }));
+          return map1;
+        };
+    // Use aggregate to merge results within and across partitions
+    // Define the zero value (initial value)
+    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> zeroValue = new HashMap<>();
+    return engineContext.aggregate(partitionRDD, zeroValue, seqOp, combOp);
+  }
+
+  /**
+   * Lookup list of keys from a single file slice.
+   *
+   * @param partitionName Name of the partition
+   * @param secondaryKeys The list of secondary keys to lookup
+   * @param fileSlice     The file slice to read
+   * @return A {@code Map} of secondary-key to list of {@code HoodieRecord} for the secondary-keys which were found in the file slice
+   */
+  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> lookupSecondaryKeysFromFileSlice(String partitionName, List<String> secondaryKeys, FileSlice fileSlice) {
+    Map<String, HashMap<String, HoodieRecord>> logRecordsMap = new HashMap<>();
+
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice);
+    try {
+      List<Long> timings = new ArrayList<>(1);
+      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
+      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
+      if (baseFileReader == null && logRecordScanner == null) {
+        return Collections.emptyMap();
+      }
+
+      // Sort it here once so that we don't need to sort individually for base file and for each individual log files.
+      Set<String> secondaryKeySet = new HashSet<>(secondaryKeys.size());
+      List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
+      Collections.sort(sortedSecondaryKeys);
+      secondaryKeySet.addAll(sortedSecondaryKeys);
+
+      logRecordScanner.getRecords().forEach(record -> {
+        HoodieMetadataPayload payload = record.getData();
+        String recordKey = payload.getRecordKeyFromSecondaryIndex();
+        if (secondaryKeySet.contains(recordKey)) {
+          String secondaryKey = payload.getRecordKeyFromSecondaryIndex();
+          logRecordsMap.computeIfAbsent(secondaryKey, k -> new HashMap<>()).put(recordKey, record);
+        }
+      });
+
+      return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader, sortedSecondaryKeys, logRecordsMap, timings, partitionName);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error merging records from metadata table for  " + secondaryKeys.size() + " key : ", ioe);
+    } finally {
+      if (!reuse) {
+        closeReader(readers);
+      }
+    }
+  }
+
+  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> readNonUniqueRecordsAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader,
+                                                                                                            List<String> sortedKeys,
+                                                                                                            Map<String, HashMap<String, HoodieRecord>> logRecordsMap,
+                                                                                                            List<Long> timings,
+                                                                                                            String partitionName) throws IOException {
+    HoodieTimer timer = HoodieTimer.start();
+
+    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> resultMap = new HashMap<>();
+    if (reader == null) {
+      // No base file at all
+      timings.add(timer.endTimer());
+      logRecordsMap.forEach((secondaryKey, logRecords) -> {
+        List<HoodieRecord<HoodieMetadataPayload>> recordList = new ArrayList<>();
+        logRecords.values().forEach(record -> {
+          recordList.add((HoodieRecord<HoodieMetadataPayload>) record);
+        });
+        resultMap.put(secondaryKey, recordList);
+      });
+      return resultMap;
+    }
+
+    HoodieTimer readTimer = HoodieTimer.start();
+
+    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> baseFileRecordsMap =
+        fetchBaseFileAllRecordsByKeys(reader, sortedKeys, true, partitionName);
+    logRecordsMap.forEach((secondaryKey, logRecords) -> {
+      if (!baseFileRecordsMap.containsKey(secondaryKey)) {
+        List<HoodieRecord<HoodieMetadataPayload>> recordList = logRecords
+            .values()
+            .stream()
+            .map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
+            .collect(Collectors.toList());
+
+        resultMap.put(secondaryKey, recordList);
+      } else {
+        List<HoodieRecord<HoodieMetadataPayload>> baseFileRecords = baseFileRecordsMap.get(secondaryKey);
+        List<HoodieRecord<HoodieMetadataPayload>> resultRecords = new ArrayList<>();
+
+        baseFileRecords.forEach(prevRecord -> {
+          HoodieMetadataPayload prevPayload = prevRecord.getData();
+          String recordKey = prevPayload.getRecordKeyFromSecondaryIndex();
+
+          if (!logRecords.containsKey(recordKey)) {
+            resultRecords.add(prevRecord);
+          } else {
+            // Merge the records
+            HoodieRecord<HoodieMetadataPayload> newRecord = logRecords.get(recordKey);
+            HoodieMetadataPayload newPayload = newRecord.getData();
+
+            assert recordKey.equals(newPayload.getRecordKeyFromSecondaryIndex());

Review Comment:
   Sorry, I had added it for debugging purposes. Fixed now. Thanks.
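
   For readers following the diff above: getSecondaryIndexRecords() folds the per-file-slice lookup results into a single map via the seqOp/combOp pair passed to engineContext.aggregate(). The standalone sketch below is only an illustration, not part of the PR; the class name, the secondary keys, and the plain String record keys are made-up stand-ins for the Hudi types. It shows just that map-merge pattern with plain Java collections.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SecondaryKeyMergeSketch {

      // combOp-style merge: fold map2 into map1, concatenating the record lists per secondary key.
      static Map<String, List<String>> mergeMaps(Map<String, List<String>> map1, Map<String, List<String>> map2) {
        map2.forEach((secondaryKey, records) -> map1.merge(secondaryKey, records, (oldRecords, newRecords) -> {
          newRecords.addAll(oldRecords);
          return newRecords;
        }));
        return map1;
      }

      public static void main(String[] args) {
        // Pretend these are the lookup results from two file slices (record keys grouped by secondary key).
        Map<String, List<String>> slice1 = new HashMap<>();
        slice1.put("sec-key-a", new ArrayList<>(Arrays.asList("rk1", "rk2")));
        Map<String, List<String>> slice2 = new HashMap<>();
        slice2.put("sec-key-a", new ArrayList<>(Arrays.asList("rk3")));
        slice2.put("sec-key-b", new ArrayList<>(Arrays.asList("rk4")));

        // Start from an empty zero value, then fold each slice result in, mirroring the aggregate call.
        Map<String, List<String>> result = new HashMap<>();
        result = mergeMaps(result, slice1);
        result = mergeMaps(result, slice2);
        System.out.println(result); // e.g. {sec-key-a=[rk3, rk1, rk2], sec-key-b=[rk4]} (HashMap order not guaranteed)
      }
    }

   Note that the merging lambda mutates and returns the incoming list, which is why each slice result is built as a fresh ArrayList in the sketch.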


