codope commented on code in PR #11162: URL: https://github.com/apache/hudi/pull/11162#discussion_r1630642711
########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java: ########## @@ -851,6 +862,167 @@ private Map<String, String> reverseLookupSecondaryKeys(String partitionName, Lis return recordKeyMap; } + @Override + protected Map<String, List<HoodieRecord<HoodieMetadataPayload>>> getSecondaryIndexRecords(List<String> keys, String partitionName) { + if (keys.isEmpty()) { + return Collections.emptyMap(); + } + + // Load the file slices for the partition. Each file slice is a shard which saves a portion of the keys. + List<FileSlice> partitionFileSlices = partitionFileSliceMap.computeIfAbsent(partitionName, + k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, metadataFileSystemView, partitionName)); + final int numFileSlices = partitionFileSlices.size(); + ValidationUtils.checkState(numFileSlices > 0, "Number of file slices for partition " + partitionName + " should be > 0"); + + engineContext.setJobStatus(this.getClass().getSimpleName(), "Lookup keys from each file slice"); + HoodieData<FileSlice> partitionRDD = engineContext.parallelize(partitionFileSlices); + // Define the seqOp function (merges elements within a partition) + Functions.Function2<Map<String, List<HoodieRecord<HoodieMetadataPayload>>>, FileSlice, Map<String, List<HoodieRecord<HoodieMetadataPayload>>>> seqOp = + (accumulator, partition) -> { + Map<String, List<HoodieRecord<HoodieMetadataPayload>>> currentFileSliceResult = lookupSecondaryKeysFromFileSlice(partitionName, keys, partition); + currentFileSliceResult.forEach((secondaryKey, secondaryRecords) -> accumulator.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords) -> { + newRecords.addAll(oldRecords); + return newRecords; + })); + return accumulator; + }; + // Define the combOp function (merges elements across partitions) + Functions.Function2<Map<String, List<HoodieRecord<HoodieMetadataPayload>>>, Map<String, List<HoodieRecord<HoodieMetadataPayload>>>, Map<String, 
List<HoodieRecord<HoodieMetadataPayload>>>> combOp = + (map1, map2) -> { + map2.forEach((secondaryKey, secondaryRecords) -> map1.merge(secondaryKey, secondaryRecords, (oldRecords, newRecords) -> { + newRecords.addAll(oldRecords); + return newRecords; + })); + return map1; + }; + // Use aggregate to merge results within and across partitions + // Define the zero value (initial value) + Map<String, List<HoodieRecord<HoodieMetadataPayload>>> zeroValue = new HashMap<>(); + return engineContext.aggregate(partitionRDD, zeroValue, seqOp, combOp); + } + + /** + * Lookup list of keys from a single file slice. + * + * @param partitionName Name of the partition + * @param secondaryKeys The list of secondary keys to lookup + * @param fileSlice The file slice to read + * @return A {@code Map} of secondary-key to list of {@code HoodieRecord} for the secondary-keys which were found in the file slice + */ + private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> lookupSecondaryKeysFromFileSlice(String partitionName, List<String> secondaryKeys, FileSlice fileSlice) { + Map<String, HashMap<String, HoodieRecord>> logRecordsMap = new HashMap<>(); + + Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice); + try { + List<Long> timings = new ArrayList<>(1); + HoodieSeekingFileReader<?> baseFileReader = readers.getKey(); + HoodieMetadataLogRecordReader logRecordScanner = readers.getRight(); + if (baseFileReader == null && logRecordScanner == null) { + return Collections.emptyMap(); + } + + // Sort it here once so that we don't need to sort individually for base file and for each individual log files. 
+ Set<String> secondaryKeySet = new HashSet<>(secondaryKeys.size()); + List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys); + Collections.sort(sortedSecondaryKeys); + secondaryKeySet.addAll(sortedSecondaryKeys); + + logRecordScanner.getRecords().forEach(record -> { + HoodieMetadataPayload payload = record.getData(); + String recordKey = payload.getRecordKeyFromSecondaryIndex(); + if (secondaryKeySet.contains(recordKey)) { + String secondaryKey = payload.getRecordKeyFromSecondaryIndex(); + logRecordsMap.computeIfAbsent(secondaryKey, k -> new HashMap<>()).put(recordKey, record); + } + }); + + return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader, sortedSecondaryKeys, logRecordsMap, timings, partitionName); + } catch (IOException ioe) { + throw new HoodieIOException("Error merging records from metadata table for " + secondaryKeys.size() + " key : ", ioe); + } finally { + if (!reuse) { + closeReader(readers); + } + } + } + + private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> readNonUniqueRecordsAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader, + List<String> sortedKeys, + Map<String, HashMap<String, HoodieRecord>> logRecordsMap, + List<Long> timings, + String partitionName) throws IOException { + HoodieTimer timer = HoodieTimer.start(); + + Map<String, List<HoodieRecord<HoodieMetadataPayload>>> resultMap = new HashMap<>(); + if (reader == null) { + // No base file at all + timings.add(timer.endTimer()); + logRecordsMap.forEach((secondaryKey, logRecords) -> { + List<HoodieRecord<HoodieMetadataPayload>> recordList = new ArrayList<>(); + logRecords.values().forEach(record -> { + recordList.add((HoodieRecord<HoodieMetadataPayload>)record); + }); + resultMap.put(secondaryKey, recordList); + }); + return resultMap; + } + + HoodieTimer readTimer = HoodieTimer.start(); + + Map<String, List<HoodieRecord<HoodieMetadataPayload>>> baseFileRecordsMap = + fetchBaseFileAllRecordsByKeys(reader, sortedKeys, true, partitionName); + 
+ logRecordsMap.forEach((secondaryKey, logRecords) -> { + if (!baseFileRecordsMap.containsKey(secondaryKey)) { + List<HoodieRecord<HoodieMetadataPayload>> recordList = logRecords + .values() + .stream() + .map(record -> (HoodieRecord<HoodieMetadataPayload>) record) + .collect(Collectors.toList()); + + resultMap.put(secondaryKey, recordList); + } else { + List<HoodieRecord<HoodieMetadataPayload>> baseFileRecords = baseFileRecordsMap.get(secondaryKey); + List<HoodieRecord<HoodieMetadataPayload>> resultRecords = new ArrayList<>(); + + baseFileRecords.forEach(prevRecord -> { + HoodieMetadataPayload prevPayload = prevRecord.getData(); + String recordKey = prevPayload.getRecordKeyFromSecondaryIndex(); + + if (!logRecords.containsKey(recordKey)) { + resultRecords.add(prevRecord); + } else { + // Merge the records + HoodieRecord<HoodieMetadataPayload> newRecord = logRecords.get(recordKey); + HoodieMetadataPayload newPayload = newRecord.getData(); + + assert recordKey.equals(newPayload.getRecordKeyFromSecondaryIndex()); Review Comment: Sorry, I had added that for debugging purposes. It is fixed now. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org