Davis-Zhang-Onehouse commented on code in PR #13489:
URL: https://github.com/apache/hudi/pull/13489#discussion_r2219862676


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -360,22 +602,99 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> lookupKeys(String parti
         .withLength(Long.MAX_VALUE)
         .withShouldUseRecordPosition(false)
         .build();
-         ClosableIterator<IndexedRecord> it = fileGroupReader.getClosableIterator()) {
-      Map<String, HoodieRecord<HoodieMetadataPayload>> records = new HashMap<>();
-      while (it.hasNext()) {
-        GenericRecord metadataRecord = (GenericRecord) it.next();
-        HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of(metadataRecord));
-        String rowKey = payload.key != null ? payload.key : metadataRecord.get(KEY_FIELD_NAME).toString();
-        HoodieKey hoodieKey = new HoodieKey(rowKey, partitionName);
-        records.put(rowKey, new HoodieAvroRecord<>(hoodieKey, payload));
-      }
-      return records;
+  }
+
+  private HoodiePairData<String, HoodieRecord<HoodieMetadataPayload>> lookupRecordsWithMapping(
+      String partitionName,
+      List<String> sortedKeys,
+      FileSlice fileSlice,
+      Boolean isFullKey,
+      Option<SerializableFunction<String, String>> keyEncoder) {
+    Map<String, HoodieRecord<HoodieMetadataPayload>> map = new HashMap<>();
+    try (ClosableIterator<Pair<String, HoodieRecord<HoodieMetadataPayload>>> iterator =
+             lookupRecordsWithMappingIter(partitionName, sortedKeys, fileSlice, isFullKey, keyEncoder)) {
+      iterator.forEachRemaining(entry -> map.put(entry.getKey(), entry.getValue()));
+    }
+    return HoodieDataUtils.eagerMapKV(map);
+  }
+
+  private HoodieListData<HoodieRecord<HoodieMetadataPayload>> lookupRecordsWithoutMapping(
+      String partitionName,
+      List<String> sortedKeys,
+      FileSlice fileSlice,
+      Boolean isFullKey,
+      Option<SerializableFunction<String, String>> keyEncoder) {
+    List<HoodieRecord<HoodieMetadataPayload>> res = new ArrayList<>();
+    try (ClosableIterator<HoodieRecord<HoodieMetadataPayload>> iterator =
+             lookupRecordsWithoutMappingIter(partitionName, sortedKeys, fileSlice, isFullKey, keyEncoder)) {
+      iterator.forEachRemaining(entry -> res.add(entry));
+    }
+    return HoodieListData.eager(res);
+  }
+
+  private ClosableIterator<Pair<String, HoodieRecord<HoodieMetadataPayload>>> lookupRecordsWithMappingIter(
+      String partitionName,
+      List<String> sortedKeys,
+      FileSlice fileSlice,
+      Boolean isFullKey,
+      Option<SerializableFunction<String, String>> keyEncoder) {
+    return lookupRecords(sortedKeys, fileSlice, isFullKey, metadataRecord -> {
+      HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of(metadataRecord));
+      String rowKey = payload.key != null ? payload.key : metadataRecord.get(KEY_FIELD_NAME).toString();
+      HoodieKey hoodieKey = new HoodieKey(rowKey, partitionName);
+      return Pair.of(rowKey, new HoodieAvroRecord<>(hoodieKey, payload));
+    }, keyEncoder);
+  }
+
+  private ClosableIterator<HoodieRecord<HoodieMetadataPayload>> lookupRecordsWithoutMappingIter(
+      String partitionName,
+      List<String> keys,
+      FileSlice fileSlice,
+      Boolean isFullKey,
+      Option<SerializableFunction<String, String>> keyEncoder) {
+    return lookupRecords(keys, fileSlice, isFullKey,
+        metadataRecord -> {
+          HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of(metadataRecord));
+          return new HoodieAvroRecord<>(new HoodieKey(payload.key, partitionName), payload);
+        }, keyEncoder);
+  }
+
+  /**
+   * Lookup records and produce a lazy iterator of mapped HoodieRecords.
+   */
+  private <T> ClosableIterator<T> lookupRecords(
+      List<String> sortedKeys,
+      FileSlice fileSlice,
+      Boolean isFullKey,
+      RecordLookupTransformer<T> transformer,
+      Option<SerializableFunction<String, String>> keyEncoder) {
+    // If no keys to lookup, we must return early, otherwise, the hfile lookup will return all records.
+    if (sortedKeys.isEmpty()) {
+      return new EmptyIterator<>();
+    }
+    try {
+      HoodieFileGroupReader<IndexedRecord> fileGroupReader = buildFileGroupReader(sortedKeys, fileSlice, isFullKey, keyEncoder);
+      ClosableIterator<IndexedRecord> rawIterator = fileGroupReader.getClosableIterator();
+
+      return new CloseableMappingIterator<>(rawIterator, record -> {
+        GenericRecord metadataRecord = (GenericRecord) record;
+        try {
+          return transformer.apply(metadataRecord);
+        } catch (IOException e) {
+          throw new HoodieIOException("Error processing record with key " + new HoodieMetadataPayload(Option.of(metadataRecord)).key, e);
+        }
+      });
     } catch (IOException e) {
-      throw new HoodieIOException(
-          "Error merging records from metadata table for " + keys.size() + " keys : ", e);
+      throw new HoodieIOException("Error merging records from metadata table for " + sortedKeys.size() + " keys", e);
     }
   }
 
+  // Functional interface for generic payload transformation
+  @FunctionalInterface
+  private interface RecordLookupTransformer<T> {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to