the-other-tim-brown commented on code in PR #13602:
URL: https://github.com/apache/hudi/pull/13602#discussion_r2228735026


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -496,41 +503,82 @@ private HoodieData<String> repartitioningIfNeeded(
     return keys;
   }
 
-  private HoodieFileGroupReader<IndexedRecord> buildFileGroupReader(List<String> sortedKeys,
-                                                                    FileSlice fileSlice,
-                                                                    boolean isFullKey) {
+  private ClosableIterator<IndexedRecord> readSliceWithFilter(List<String> sortedKeys, FileSlice fileSlice, boolean isFullKey) throws IOException {
     Option<HoodieInstant> latestMetadataInstant =
         metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
     String latestMetadataInstantTime =
         latestMetadataInstant.map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
-    Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
     // Only those log files which have a corresponding completed instant on the dataset should be read
     // This is because the metadata table is updated before the dataset instants are committed.
     Set<String> validInstantTimestamps = getValidInstantTimestamps();
-    InstantRange instantRange = InstantRange.builder()
+    Option<InstantRange> instantRange = Option.of(InstantRange.builder()
         .rangeType(InstantRange.RangeType.EXACT_MATCH)
-        .explicitInstants(validInstantTimestamps).build();
-    
+        .explicitInstants(validInstantTimestamps).build());
+
+    // If reuse is enabled and full scan is allowed for the partition, we can reuse the file readers for base files and the reader context for the log files.
+    Map<StoragePath, HoodieAvroFileReader> baseFileReaders = 
Collections.emptyMap();
+    
FileGroupRecordBufferInitializer.ReusableFileGroupRecordBufferInitializer<IndexedRecord>
 recordBufferInitializer = null;
+    if (reuse && isFullScanAllowedForPartition(fileSlice.getPartitionPath())) {
+      Pair<HoodieAvroFileReader, 
FileGroupRecordBufferInitializer.ReusableFileGroupRecordBufferInitializer<IndexedRecord>>
 readers =
+          reusableFileReaders.computeIfAbsent(fileSlice.getFileGroupId(), fgId 
-> {
+            try {
+              HoodieAvroFileReader baseFileReader = null;
+              if (fileSlice.getBaseFile().isPresent()) {
+                baseFileReader = (HoodieAvroFileReader) 
HoodieIOFactory.getIOFactory(storage).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+                    .getFileReader(metadataConfig, 
fileSlice.getBaseFile().get().getStoragePath(), 
metadataMetaClient.getTableConfig().getBaseFileFormat(), Option.empty());
+              }
+              return Pair.of(baseFileReader, 
buildReusableRecordBufferInitializer(fileSlice, latestMetadataInstantTime, 
instantRange));
+            } catch (IOException ex) {
+              throw new HoodieIOException("Error opening readers for metadata 
table partition " + fileSlice.getPartitionPath(), ex);
+            }
+          });
+      if (fileSlice.getBaseFile().isPresent()) {
+        baseFileReaders = 
Collections.singletonMap(fileSlice.getBaseFile().get().getStoragePath(), 
readers.getLeft());
+      }
+
+      ValidationUtils.checkArgument(isFullKey, "For Metadata Table Reuse, key filter should be based on full keys");
+      recordBufferInitializer = readers.getRight();
+    }
+
     HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(
         storageConf,
         metadataMetaClient.getTableConfig(),
-        Option.of(instantRange),
+        instantRange,
         Option.of(
             isFullKey
                 ? transformKeysToPredicate(sortedKeys)
-                : transformKeyPrefixesToPredicate(sortedKeys)));
-    return HoodieFileGroupReader.<IndexedRecord>newBuilder()
+                : transformKeyPrefixesToPredicate(sortedKeys)),
+        baseFileReaders);
+
+    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>newBuilder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metadataMetaClient)
         .withLatestCommitTime(latestMetadataInstantTime)
         .withFileSlice(fileSlice)
-        .withDataSchema(schema)
-        .withRequestedSchema(schema)
+        .withDataSchema(SCHEMA)
+        .withRequestedSchema(SCHEMA)
         .withProps(buildFileGroupReaderProperties(metadataConfig))
-        .withStart(0)
-        .withLength(Long.MAX_VALUE)
-        .withShouldUseRecordPosition(false)
+        .withRecordBufferInitializer(recordBufferInitializer)
         .build();
+
+    return new CloseableIteratorWithReuse<>(fileGroupReader.getClosableIterator(), reuse);

Review Comment:
   I was able to restructure the code a bit so that this is just an implementation detail of the record buffer and its initializer; we no longer need to introduce a new class here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to