nsivabalan commented on a change in pull request #4352: URL: https://github.com/apache/hudi/pull/4352#discussion_r795049484
########## File path: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java ########## @@ -199,6 +260,21 @@ public Builder enable(boolean enable) { return this; } + public Builder withMetadataIndexBloomFilter(boolean enable) { + metadataConfig.setValue(ENABLE_METADATA_INDEX_BLOOM_FILTER, String.valueOf(enable)); + return this; + } + + public Builder withMetadataIndexColumnStats(boolean enable) { + metadataConfig.setValue(ENABLE_METADATA_INDEX_COLUMN_STATS, String.valueOf(enable)); + return this; + } + + public Builder withMetadataIndexForAllColumns(boolean enable) { Review comment: I don't see setter methods for the file group count. Are they missing? ########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java ########## @@ -125,30 +128,43 @@ private void initIfNeeded() { return recordsByKeys.size() == 0 ? Option.empty() : recordsByKeys.get(0).getValue(); } - protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys, String partitionName) { - Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(keys.get(0), partitionName); - try { - List<Long> timings = new ArrayList<>(); - HoodieFileReader baseFileReader = readers.getKey(); - HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight(); + @Override + protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys, + String partitionName) { + Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = getPartitionFileSlices(partitionName, keys); Review comment: I have added more comments further down, but getPartitionFileSlices should follow the same semantics as openReadersIfNeeded. Maybe we should try to cache the file slices and reuse them if possible. I see that we don't do that as of now in this patch. Prior to this patch, this call was made within openReadersIfNeeded. Or do you think there are any challenges with that? 
########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java ########## @@ -110,55 +199,97 @@ private HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFi */ public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) { Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(); - partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false))); + partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false))); - HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath()); - HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo); + HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath()); + HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST, + fileInfo); return new HoodieRecord<>(key, payload); } /** * Create and return a {@code HoodieMetadataPayload} to save list of files within a partition. 
* - * @param partition The name of the partition - * @param filesAdded Mapping of files to their sizes for files which have been added to this partition + * @param partition The name of the partition + * @param filesAdded Mapping of files to their sizes for files which have been added to this partition * @param filesDeleted List of files which have been deleted from this partition */ public static HoodieRecord<HoodieMetadataPayload> createPartitionFilesRecord(String partition, - Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) { + Option<Map<String, Long>> filesAdded, + Option<List<String>> filesDeleted) { Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(); filesAdded.ifPresent( m -> m.forEach((filename, size) -> fileInfo.put(filename, new HoodieMetadataFileInfo(size, false)))); filesDeleted.ifPresent( - m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true)))); + m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true)))); - HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.partitionPath()); - HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo); + HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.getPartitionPath()); + HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo); return new HoodieRecord<>(key, payload); } + /** + * Create bloom filter metadata record. 
+ * + * @param partitionName - Partition name + * @param baseFileName - Base file name for which the bloom filter needs to persisted + * @param timestamp - Instant timestamp responsible for this record + * @param bloomFilter - Bloom filter for the File + * @param isDeleted - Is the bloom filter no more valid + * @return Metadata payload containing the fileID and its bloom filter record + */ + public static HoodieRecord<HoodieMetadataPayload> createBloomFilterMetadataRecord(final String partitionName, + final String baseFileName, + final String timestamp, + final ByteBuffer bloomFilter, + final boolean isDeleted) { + ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR) + && FSUtils.isBaseFile(new Path(baseFileName)), + "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!"); + final String bloomFilterKey = new PartitionIndexID(partitionName).asBase64EncodedString() + .concat(new FileIndexID(baseFileName).asBase64EncodedString()); + HoodieKey key = new HoodieKey(bloomFilterKey, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()); + + // TODO: Get the bloom filter type from the file + HoodieMetadataBloomFilter metadataBloomFilter = + new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(), + timestamp, bloomFilter, isDeleted); + HoodieMetadataPayload metadataPayload = new HoodieMetadataPayload(key.getRecordKey(), + HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter); + return new HoodieRecord<>(key, metadataPayload); + } + @Override public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) { ValidationUtils.checkArgument(previousRecord.type == type, - "Cannot combine " + previousRecord.type + " with " + type); - - Map<String, HoodieMetadataFileInfo> combinedFileInfo = null; + "Cannot combine " + previousRecord.type + " with " + type); switch (type) { - case PARTITION_LIST: - case FILE_LIST: - combinedFileInfo = combineFilesystemMetadata(previousRecord); - break; + case 
METADATA_TYPE_PARTITION_LIST: + case METADATA_TYPE_FILE_LIST: + Map<String, HoodieMetadataFileInfo> combinedFileInfo = combineFilesystemMetadata(previousRecord); + return new HoodieMetadataPayload(key, type, combinedFileInfo); + case METADATA_TYPE_BLOOM_FILTER: + HoodieMetadataBloomFilter combineBloomFilterMetadata = combineBloomFilterMetadata(previousRecord); + return new HoodieMetadataPayload(key, type, combineBloomFilterMetadata); + case METADATA_TYPE_COLUMN_STATS: + return new HoodieMetadataPayload(key, type, combineColumnStatsMetadatat(previousRecord)); default: throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type); } + } + + private HoodieMetadataBloomFilter combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) { + return this.bloomFilterMetadata; + } - return new HoodieMetadataPayload(key, type, combinedFileInfo); + private HoodieColumnStats combineColumnStatsMetadatat(HoodieMetadataPayload previousRecord) { + return this.columnStatMetadata; Review comment: Just so we are on the same page: for the bloom filter partition, I understand that a file will either be added or deleted and there are no mutations as such. But for column stats, if a file is updated, we fetch the column stats from the latest file, so we should be good to completely ignore the older version of the record? ########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java ########## @@ -233,38 +249,69 @@ private void initIfNeeded() { } /** - * Returns a new pair of readers to the base and log files. + * Get the file slice details for the given key in a partition. 
+ * + * @param partitionName - Metadata partition name + * @param key - Key to get the file slice for + * @return Partition and file slice pair for the given key */ - private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersIfNeeded(String key, String partitionName) { - return partitionReaders.computeIfAbsent(partitionName, k -> { - try { - final long baseFileOpenMs; - final long logScannerOpenMs; - HoodieFileReader baseFileReader = null; - HoodieMetadataMergedLogRecordReader logRecordScanner = null; + private Pair<String, FileSlice> getPartitionFileSlice(final String partitionName, final String key) { + // Metadata is in sync till the latest completed instant on the dataset + List<FileSlice> latestFileSlices = + HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName); + + final FileSlice slice = latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, + latestFileSlices.size())); + return Pair.of(partitionName, slice); + } + + /** + * Get the latest file slices for the interested keys in a given partition. 
+ * + * @param partitionName - Partition to get the file slices from + * @param keys - Interested keys + * @return FileSlices for the keys + */ + private Map<Pair<String, FileSlice>, List<String>> getPartitionFileSlices(final String partitionName, final List<String> keys) { + // Metadata is in sync till the latest completed instant on the dataset + List<FileSlice> latestFileSlices = + HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName); + + Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = new HashMap<>(); + for (String key : keys) { + final FileSlice slice = latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, + latestFileSlices.size())); + final Pair<String, FileSlice> keyFileSlicePair = Pair.of(partitionName, slice); + partitionFileSliceToKeysMap.computeIfAbsent(keyFileSlicePair, k -> new ArrayList<>()).add(key); + } + return partitionFileSliceToKeysMap; + } - // Metadata is in sync till the latest completed instant on the dataset + /** + * Create a file reader and the record scanner for a given partition and file slice + * if readers are not already available. 
+ * + * @param partitionName - Partition name + * @param slice - The file slice to open readers for + * @return File reader and the record scanner pair for the requested file slice + */ + private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersIfNeeded(String partitionName, FileSlice slice) { + return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> { + try { HoodieTimer timer = new HoodieTimer().startTimer(); - List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName); - if (latestFileSlices.size() == 0) { - // empty partition - return Pair.of(null, null); - } - ValidationUtils.checkArgument(latestFileSlices.size() == 1, String.format("Invalid number of file slices: found=%d, required=%d", latestFileSlices.size(), 1)); - final FileSlice slice = latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, latestFileSlices.size())); // Open base file reader Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer); - baseFileReader = baseFileReaderOpenTimePair.getKey(); - baseFileOpenMs = baseFileReaderOpenTimePair.getValue(); + HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey(); + final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue(); // Open the log record scanner using the log files from the latest file slice - Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair = getLogRecordScanner(slice, - partitionName); - logRecordScanner = logRecordScannerOpenTimePair.getKey(); - logScannerOpenMs = logRecordScannerOpenTimePair.getValue(); + Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair = getLogRecordScanner(slice, partitionName); + HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey(); + final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue(); - 
metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + logScannerOpenMs)); + metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, Review comment: minor. can we fix the name of the metric (HoodieMetadataMetrics.SCAN_STR). this metric represents time to just open the readers. not entire scan time. ########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java ########## @@ -94,13 +136,60 @@ public HoodieMetadataPayload(Option<GenericRecord> record) { filesystemMetadata.put(k.toString(), new HoodieMetadataFileInfo((Long) v.get("size"), (Boolean) v.get("isDeleted"))); }); } + + if (type == METADATA_TYPE_BLOOM_FILTER) { + final GenericRecord metadataRecord = (GenericRecord) record.get().get(SCHEMA_FIELD_ID_BLOOM_FILTER); + if (metadataRecord == null) { + throw new HoodieMetadataException("Valid " + SCHEMA_FIELD_ID_BLOOM_FILTER + " record expected for type: " + METADATA_TYPE_BLOOM_FILTER); + } + bloomFilterMetadata = new HoodieMetadataBloomFilter( + (String) metadataRecord.get(BLOOM_FILTER_FIELD_TYPE), + (String) metadataRecord.get(BLOOM_FILTER_FIELD_TIMESTAMP), + (ByteBuffer) metadataRecord.get(BLOOM_FILTER_FIELD_BLOOM_FILTER), + (Boolean) metadataRecord.get(BLOOM_FILTER_FIELD_IS_DELETED) + ); + } + + if (type == METADATA_TYPE_COLUMN_STATS) { + GenericRecord v = (GenericRecord) record.get().get(SCHEMA_FIELD_ID_COLUMN_STATS); + if (v == null) { + throw new HoodieMetadataException("Valid " + SCHEMA_FIELD_ID_COLUMN_STATS + " record expected for type: " + METADATA_TYPE_COLUMN_STATS); + } + columnStatMetadata = new HoodieColumnStats( + (String) v.get(COLUMN_STATS_FIELD_RESOURCE_NAME), + (String) v.get(COLUMN_STATS_FIELD_MIN_VALUE), + (String) v.get(COLUMN_STATS_FIELD_MAX_VALUE), + (Long) v.get(COLUMN_STATS_FIELD_NULL_COUNT), + (Long) v.get(COLUMN_STATS_FIELD_VALUE_COUNT), + (Long) v.get(COLUMN_STATS_FIELD_TOTAL_SIZE), + (Long) 
v.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE), + (Boolean) v.get(COLUMN_STATS_FIELD_IS_DELETED) + ); + } } } private HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFileInfo> filesystemMetadata) { + this(key, type, filesystemMetadata, null, null); + } + + private HoodieMetadataPayload(String key, int type, HoodieMetadataBloomFilter metadataBloomFilter) { + this(key, type, null, metadataBloomFilter, null); + } + + private HoodieMetadataPayload(String key, int type, HoodieColumnStats columnStats) { Review comment: Do you think we should name this HoodieMetadataColumnStats, to be in line with HoodieMetadataBloomFilter? ########## File path: hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java ########## @@ -134,22 +139,41 @@ public BloomFilter readBloomFilter() { } @Override - public Set<String> filterRowKeys(Set candidateRowKeys) { - // Current implementation reads all records and filters them. In certain cases, it many be better to: - // 1. Scan a limited subset of keys (min/max range of candidateRowKeys) - // 2. Lookup keys individually (if the size of candidateRowKeys is much less than the total keys in file) - try { - List<Pair<String, R>> allRecords = readAllRecords(); - Set<String> rowKeys = new HashSet<>(); - allRecords.forEach(t -> { - if (candidateRowKeys.contains(t.getFirst())) { - rowKeys.add(t.getFirst()); - } - }); - return rowKeys; - } catch (IOException e) { - throw new HoodieIOException("Failed to read row keys from " + path, e); + public Set<String> filterRowKeys(Set<String> candidateRowKeys) { + return candidateRowKeys.stream().filter(k -> { + try { + return isKeyAvailable(k); Review comment: I see that prior to this patch we were reading all records and then looking up the candidateRowKeys. Here we are doing a point lookup for one key at a time. Is this based on some perf tuning? I will sync up with you face to face to understand more. 
Can you add a line in java doc that this method expects candidate keys to be sorted. So, in future if someone develops more code using this, would know the contract from this method. ########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java ########## @@ -125,30 +128,43 @@ private void initIfNeeded() { return recordsByKeys.size() == 0 ? Option.empty() : recordsByKeys.get(0).getValue(); } - protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys, String partitionName) { - Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(keys.get(0), partitionName); - try { - List<Long> timings = new ArrayList<>(); - HoodieFileReader baseFileReader = readers.getKey(); - HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight(); + @Override + protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys, + String partitionName) { + Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = getPartitionFileSlices(partitionName, keys); + List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>(); + AtomicInteger fileSlicesKeysCount = new AtomicInteger(); + partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> { + Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(partitionName, + partitionFileSlicePair.getRight()); + try { + List<Long> timings = new ArrayList<>(); + HoodieFileReader baseFileReader = readers.getKey(); + HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight(); - if (baseFileReader == null && logRecordScanner == null) { - return Collections.emptyList(); - } + if (baseFileReader == null && logRecordScanner == null) { + return; + } - // local map to assist in merging with base file records - Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> 
logRecords = readLogRecords(logRecordScanner, keys, timings); - List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = readFromBaseAndMergeWithLogRecords( - baseFileReader, keys, logRecords, timings, partitionName); - LOG.info(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", keys.size(), timings)); - return result; - } catch (IOException ioe) { - throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe); - } finally { - if (!reuse) { - close(partitionName); + // local map to assist in merging with base file records + Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordScanner, + fileSliceKeys, timings); + result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, fileSliceKeys, logRecords, + timings, partitionName)); + LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", + fileSliceKeys.size(), timings)); + fileSlicesKeysCount.addAndGet(fileSliceKeys.size()); + } catch (IOException ioe) { + throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe); + } finally { + if (!reuse) { + close(Pair.of(partitionFileSlicePair.getLeft(), partitionFileSlicePair.getRight().getFileId())); + } } - } + }); + + ValidationUtils.checkState(keys.size() == fileSlicesKeysCount.get()); Review comment: Is there a chance that a key has been deleted (let's say its file was deleted) and getRecordsByKeys() still gets called with that invalid/deleted key? Wouldn't readFromBaseAndMergeWithLogRecords return nothing for it, so that the number of incoming keys differs from the size of the result list? Or do we at least return Option.empty for such keys, so that the total number of entries still matches here? 
########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java ########## @@ -233,38 +249,69 @@ private void initIfNeeded() { } /** - * Returns a new pair of readers to the base and log files. + * Get the file slice details for the given key in a partition. + * + * @param partitionName - Metadata partition name + * @param key - Key to get the file slice for + * @return Partition and file slice pair for the given key */ - private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersIfNeeded(String key, String partitionName) { - return partitionReaders.computeIfAbsent(partitionName, k -> { - try { - final long baseFileOpenMs; - final long logScannerOpenMs; - HoodieFileReader baseFileReader = null; - HoodieMetadataMergedLogRecordReader logRecordScanner = null; + private Pair<String, FileSlice> getPartitionFileSlice(final String partitionName, final String key) { + // Metadata is in sync till the latest completed instant on the dataset + List<FileSlice> latestFileSlices = + HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName); + + final FileSlice slice = latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, + latestFileSlices.size())); + return Pair.of(partitionName, slice); + } + + /** + * Get the latest file slices for the interested keys in a given partition. + * + * @param partitionName - Partition to get the file slices from + * @param keys - Interested keys + * @return FileSlices for the keys + */ + private Map<Pair<String, FileSlice>, List<String>> getPartitionFileSlices(final String partitionName, final List<String> keys) { Review comment: can we find a right name for this method. this is essentially mapping keys to file slices. but the return value is mapped the reverse way. but getPartitionFileSlices() does not convey that. 
########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java ########## @@ -79,14 +98,53 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont } } + /** + * Convert commit action to metadata records for the enabled partition types. + * + * @param commitMetadata - Commit action metadata + * @param dataMetaClient - Meta client for the data table + * @param isMetaIndexColumnStatsForAllColumns - Do all columns need meta indexing? + * @param instantTime - Action instant time + * @return Map of partition to metadata records for the commit action + */ + public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords( + HoodieEngineContext context, List<MetadataPartitionType> enabledPartitionTypes, + HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient, + boolean isMetaIndexColumnStatsForAllColumns, String instantTime) { + final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>(); + final HoodieData<HoodieRecord> filesPartitionRecordsRDD = context.parallelize( + convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 1); + partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD); + + if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { + final List<HoodieRecord> metadataBloomFilterRecords = convertMetadataToBloomFilterRecords(commitMetadata, + dataMetaClient, instantTime); + if (!metadataBloomFilterRecords.isEmpty()) { + final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD = context.parallelize(metadataBloomFilterRecords, 1); + partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); + } + } + + if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { + final List<HoodieRecord> metadataColumnStats = convertMetadataToColumnStatsRecords(commitMetadata, context, Review comment: same comment as 
above. ########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java ########## @@ -110,55 +199,97 @@ private HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFi */ public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) { Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(); - partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false))); + partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false))); - HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath()); - HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo); + HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath()); + HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST, + fileInfo); return new HoodieRecord<>(key, payload); } /** * Create and return a {@code HoodieMetadataPayload} to save list of files within a partition. 
* - * @param partition The name of the partition - * @param filesAdded Mapping of files to their sizes for files which have been added to this partition + * @param partition The name of the partition + * @param filesAdded Mapping of files to their sizes for files which have been added to this partition * @param filesDeleted List of files which have been deleted from this partition */ public static HoodieRecord<HoodieMetadataPayload> createPartitionFilesRecord(String partition, - Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) { + Option<Map<String, Long>> filesAdded, + Option<List<String>> filesDeleted) { Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(); filesAdded.ifPresent( m -> m.forEach((filename, size) -> fileInfo.put(filename, new HoodieMetadataFileInfo(size, false)))); filesDeleted.ifPresent( - m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true)))); + m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true)))); - HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.partitionPath()); - HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo); + HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.getPartitionPath()); + HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo); return new HoodieRecord<>(key, payload); } + /** + * Create bloom filter metadata record. 
+ * + * @param partitionName - Partition name + * @param baseFileName - Base file name for which the bloom filter needs to persisted + * @param timestamp - Instant timestamp responsible for this record + * @param bloomFilter - Bloom filter for the File + * @param isDeleted - Is the bloom filter no more valid + * @return Metadata payload containing the fileID and its bloom filter record + */ + public static HoodieRecord<HoodieMetadataPayload> createBloomFilterMetadataRecord(final String partitionName, + final String baseFileName, + final String timestamp, + final ByteBuffer bloomFilter, + final boolean isDeleted) { + ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR) + && FSUtils.isBaseFile(new Path(baseFileName)), + "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!"); + final String bloomFilterKey = new PartitionIndexID(partitionName).asBase64EncodedString() + .concat(new FileIndexID(baseFileName).asBase64EncodedString()); + HoodieKey key = new HoodieKey(bloomFilterKey, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()); + + // TODO: Get the bloom filter type from the file + HoodieMetadataBloomFilter metadataBloomFilter = + new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(), Review comment: if someone has explicitly set SIMPLE as bloom type for the data table, this might crash right. probably good to fix in this patch only. 
########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java ########## @@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon .getAllFilesInPartitions(partitions); } + @Override + public Option<ByteBuffer> getBloomFilter(final String partitionName, final String fileName) + throws HoodieMetadataException { + if (!isBloomFilterIndexEnabled) { + LOG.error("Metadata bloom filter index is disabled!"); + return Option.empty(); + } + + final Pair<String, String> partitionFileName = Pair.of(partitionName, fileName); + Map<Pair<String, String>, ByteBuffer> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName)); + if (bloomFilters.isEmpty()) { + LOG.error("Meta index: missing bloom filter for partition: " + partitionName + ", file: " + fileName); + return Option.empty(); + } + + ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName)); + return Option.of(bloomFilters.get(partitionFileName)); + } + + @Override + public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList) + throws HoodieMetadataException { + if (!isBloomFilterIndexEnabled) { + LOG.error("Metadata bloom filter index is disabled!"); + return Collections.emptyMap(); + } + if (partitionNameFileNameList.isEmpty()) { + return Collections.emptyMap(); + } + + HoodieTimer timer = new HoodieTimer().startTimer(); + Set<String> partitionIDFileIDSortedStrings = new TreeSet<>(); + Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>(); + partitionNameFileNameList.forEach(partitionNameFileNamePair -> { + final String bloomKey = new PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString() + .concat(new FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString()); + partitionIDFileIDSortedStrings.add(bloomKey); + fileToKeyMap.put(bloomKey, partitionNameFileNamePair); + } + ); + + List<String> 
partitionIDFileIDStrings = new ArrayList<>(partitionIDFileIDSortedStrings); + List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> hoodieRecordList = + getRecordsByKeys(partitionIDFileIDStrings, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()); + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, + (timer.endTimer() / partitionIDFileIDStrings.size()))); + + Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new HashMap<>(); + for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : hoodieRecordList) { + if (entry.getRight().isPresent()) { + final Option<HoodieMetadataBloomFilter> bloomFilterMetadata = + entry.getRight().get().getData().getBloomFilterMetadata(); + if (bloomFilterMetadata.isPresent()) { + if (!bloomFilterMetadata.get().getIsDeleted()) { + ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft())); + partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), bloomFilterMetadata.get().getBloomFilter()); + } + } else { + LOG.error("Meta index bloom filter missing for: " + fileToKeyMap.get(entry.getLeft())); + } + } + } + return partitionFileToBloomFilterMap; + } + + @Override + public Map<Pair<String, String>, HoodieColumnStats> getColumnStats(final List<Pair<String, String>> partitionNameFileNameList, final String columnName) + throws HoodieMetadataException { + if (!isColumnStatsIndexEnabled) { + LOG.error("Metadata column stats index is disabled!"); + return Collections.emptyMap(); + } + + Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new HashMap<>(); + TreeSet<String> sortedKeys = new TreeSet<>(); + final String columnIndexStr = new ColumnIndexID(columnName).asBase64EncodedString(); + for (Pair<String, String> partitionNameFileNamePair : partitionNameFileNameList) { + final String columnStatIndexKey = columnIndexStr + .concat(new PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()) 
Review comment: Same comment as above. I guess we have a static method in HoodieMetadataPayload to construct the key. Can we use the same? ########## File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java ########## @@ -30,16 +28,21 @@ private final String columnName; private final T minValue; private final T maxValue; - private final long numNulls; - private final PrimitiveStringifier stringifier; + private final long nullCount; + private final long valueCount; + private final long totalSize; + private final long totalUncompressedSize; - public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long numNulls, final PrimitiveStringifier stringifier) { + public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, Review comment: Should we rename this class to HoodieColumnStatsMetadata or something? ########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java ########## @@ -233,38 +249,69 @@ private void initIfNeeded() { } /** - * Returns a new pair of readers to the base and log files. + * Get the file slice details for the given key in a partition. 
+ * + * @param partitionName - Metadata partition name + * @param key - Key to get the file slice for + * @return Partition and file slice pair for the given key */ - private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersIfNeeded(String key, String partitionName) { - return partitionReaders.computeIfAbsent(partitionName, k -> { - try { - final long baseFileOpenMs; - final long logScannerOpenMs; - HoodieFileReader baseFileReader = null; - HoodieMetadataMergedLogRecordReader logRecordScanner = null; + private Pair<String, FileSlice> getPartitionFileSlice(final String partitionName, final String key) { + // Metadata is in sync till the latest completed instant on the dataset + List<FileSlice> latestFileSlices = + HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName); + + final FileSlice slice = latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, + latestFileSlices.size())); + return Pair.of(partitionName, slice); + } + + /** + * Get the latest file slices for the interested keys in a given partition. + * + * @param partitionName - Partition to get the file slices from + * @param keys - Interested keys + * @return FileSlices for the keys + */ + private Map<Pair<String, FileSlice>, List<String>> getPartitionFileSlices(final String partitionName, final List<String> keys) { + // Metadata is in sync till the latest completed instant on the dataset + List<FileSlice> latestFileSlices = Review comment: Something to follow up on after this patch, maybe. I see we call getPartitionLatestMergedFileSlices and getPartitionFileSlices in a few places, and the calls could be repeated as well. Can we cache the return value based on the latest instant? If the latest instant has not changed, then latestFileSlice is not going to change, right? So we might as well use the cached copy if we have one. 
########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java ########## @@ -79,14 +98,53 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont } } + /** + * Convert commit action to metadata records for the enabled partition types. + * + * @param commitMetadata - Commit action metadata + * @param dataMetaClient - Meta client for the data table + * @param isMetaIndexColumnStatsForAllColumns - Do all columns need meta indexing? + * @param instantTime - Action instant time + * @return Map of partition to metadata records for the commit action + */ + public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords( + HoodieEngineContext context, List<MetadataPartitionType> enabledPartitionTypes, + HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient, + boolean isMetaIndexColumnStatsForAllColumns, String instantTime) { + final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>(); + final HoodieData<HoodieRecord> filesPartitionRecordsRDD = context.parallelize( + convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 1); + partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD); + + if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { + final List<HoodieRecord> metadataBloomFilterRecords = convertMetadataToBloomFilterRecords(commitMetadata, Review comment: if this is happening in the driver, can we try to parallelize across executors. reading bloom could add some latency if there are too many files. 
########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java ########## @@ -110,55 +199,97 @@ private HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFi */ public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) { Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(); - partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false))); + partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false))); - HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath()); - HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo); + HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath()); + HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST, + fileInfo); return new HoodieRecord<>(key, payload); } /** * Create and return a {@code HoodieMetadataPayload} to save list of files within a partition. 
* - * @param partition The name of the partition - * @param filesAdded Mapping of files to their sizes for files which have been added to this partition + * @param partition The name of the partition + * @param filesAdded Mapping of files to their sizes for files which have been added to this partition * @param filesDeleted List of files which have been deleted from this partition */ public static HoodieRecord<HoodieMetadataPayload> createPartitionFilesRecord(String partition, - Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) { + Option<Map<String, Long>> filesAdded, + Option<List<String>> filesDeleted) { Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(); filesAdded.ifPresent( m -> m.forEach((filename, size) -> fileInfo.put(filename, new HoodieMetadataFileInfo(size, false)))); filesDeleted.ifPresent( - m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true)))); + m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true)))); - HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.partitionPath()); - HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo); + HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.getPartitionPath()); + HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo); return new HoodieRecord<>(key, payload); } + /** + * Create bloom filter metadata record. 
+ * + * @param partitionName - Partition name + * @param baseFileName - Base file name for which the bloom filter needs to persisted + * @param timestamp - Instant timestamp responsible for this record + * @param bloomFilter - Bloom filter for the File + * @param isDeleted - Is the bloom filter no more valid + * @return Metadata payload containing the fileID and its bloom filter record + */ + public static HoodieRecord<HoodieMetadataPayload> createBloomFilterMetadataRecord(final String partitionName, + final String baseFileName, + final String timestamp, + final ByteBuffer bloomFilter, + final boolean isDeleted) { + ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR) + && FSUtils.isBaseFile(new Path(baseFileName)), + "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!"); + final String bloomFilterKey = new PartitionIndexID(partitionName).asBase64EncodedString() Review comment: can we use the same name everywhere. may be bloomFilterIndexKey. ########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java ########## @@ -233,38 +249,69 @@ private void initIfNeeded() { } /** - * Returns a new pair of readers to the base and log files. + * Get the file slice details for the given key in a partition. + * + * @param partitionName - Metadata partition name + * @param key - Key to get the file slice for + * @return Partition and file slice pair for the given key */ - private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersIfNeeded(String key, String partitionName) { - return partitionReaders.computeIfAbsent(partitionName, k -> { - try { - final long baseFileOpenMs; - final long logScannerOpenMs; - HoodieFileReader baseFileReader = null; - HoodieMetadataMergedLogRecordReader logRecordScanner = null; + private Pair<String, FileSlice> getPartitionFileSlice(final String partitionName, final String key) { Review comment: I guess this method is not used anywhere. can you check. 
########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java ########## @@ -124,14 +182,111 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont return records; } + /** + * Convert commit action metadata to bloom filter records. + * + * @param commitMetadata - Commit action metadata + * @param dataMetaClient - Meta client for the data table + * @param instantTime - Action instant time + * @return List of metadata table records + */ + public static List<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata, + HoodieTableMetaClient dataMetaClient, + String instantTime) { + List<HoodieRecord> records = new LinkedList<>(); + commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> { Review comment: if the commit metadata has only log files, we won't return anything from this method right. can you confirm that our writes still go through? or do we have any check that return list should match num files in commit metadata or atleast not empty ? ########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java ########## @@ -319,9 +616,88 @@ private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTi return records; } + /** + * Convert rollback action metadata to bloom filter index records. + */ + private static List<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext, + HoodieTableMetaClient dataMetaClient, + Map<String, List<String>> partitionToDeletedFiles, + Map<String, Map<String, Long>> partitionToAppendedFiles, + String instantTime) { + List<HoodieRecord> records = new LinkedList<>(); + partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> { + if (!FSUtils.isBaseFile(new Path(deletedFile))) { + return; + } + + final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName; + records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord( + partition, deletedFile, instantTime, ByteBuffer.allocate(0), true)); + })); + + partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> { + final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName; + appendedFileMap.forEach((appendedFile, length) -> { + if (!FSUtils.isBaseFile(new Path(appendedFile))) { + return; + } + final String pathWithPartition = partitionName + "/" + appendedFile; + final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), pathWithPartition); + try { + HoodieFileReader<IndexedRecord> fileReader = + HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), appendedFilePath); + final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); + if (fileBloomFilter == null) { + LOG.error("Failed to read bloom filter for " + appendedFilePath); + return; + } + ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()); + HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord( + partition, appendedFile, instantTime, bloomByteBuffer, false); + records.add(record); + fileReader.close(); + } catch (IOException e) { + LOG.error("Failed to get bloom filter for file: " + appendedFilePath); + } + }); + }); + return records; + } + + /** + * Convert rollback action metadata to column stats index records. 
+ */ + private static List<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext, + HoodieTableMetaClient datasetMetaClient, + Map<String, List<String>> partitionToDeletedFiles, + Map<String, Map<String, Long>> partitionToAppendedFiles, + String instantTime) { + List<HoodieRecord> records = new LinkedList<>(); + List<String> latestColumns = getLatestColumns(datasetMetaClient); + partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> deletedFileList.forEach(deletedFile -> { Review comment: I did leave this comment for bloom filter as well. Since reading col stats could be latency sensitive if large no of files in a commit metadata, is there a possibility to parallelize the reads across diff files? ########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java ########## @@ -399,4 +775,116 @@ public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGrou return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList()); } + public static List<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata, + HoodieEngineContext engineContext, + HoodieTableMetaClient dataMetaClient, + boolean isMetaIndexColumnStatsForAllColumns, + String instantTime) { + + try { + List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(entry -> entry.stream()).collect(Collectors.toList()); + return HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, dataMetaClient, allWriteStats, + isMetaIndexColumnStatsForAllColumns); + } catch (Exception e) { + throw new HoodieException("Failed to generate column stats records for metadata table ", e); + } + } + + /** + * Create column stats from write status. 
+ * + * @param engineContext - Enging context + * @param datasetMetaClient - Dataset meta client + * @param allWriteStats - Write status to convert + * @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for indexing + */ + public static List<HoodieRecord> createColumnStatsFromWriteStats(HoodieEngineContext engineContext, + HoodieTableMetaClient datasetMetaClient, + List<HoodieWriteStat> allWriteStats, + boolean isMetaIndexColumnStatsForAllColumns) throws Exception { + if (allWriteStats.isEmpty()) { + return Collections.emptyList(); + } + + List<HoodieWriteStat> prunedWriteStats = allWriteStats.stream().filter(writeStat -> { + return !(writeStat instanceof HoodieDeltaWriteStat); + }).collect(Collectors.toList()); + if (prunedWriteStats.isEmpty()) { + return Collections.emptyList(); + } + + return engineContext.flatMap(prunedWriteStats, + writeStat -> translateWriteStatToColumnStats(writeStat, datasetMetaClient, + getLatestColumns(datasetMetaClient, isMetaIndexColumnStatsForAllColumns)), + prunedWriteStats.size()); + } + + /** + * Get the latest columns for the table for column stats indexing. + * + * @param datasetMetaClient - Data table meta client + * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing enabled for all columns + */ + private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) { + if (!isMetaIndexColumnStatsForAllColumns + || datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() < 1) { + return Collections.singletonList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp()); + } + + TableSchemaResolver schemaResolver = new TableSchemaResolver(datasetMetaClient); + // consider nested fields as well. 
if column stats is enabled only for a subset of columns, + // directly use them instead of all columns from the latest table schema + try { + return schemaResolver.getTableAvroSchema().getFields().stream() + .map(entry -> entry.name()).collect(Collectors.toList()); + } catch (Exception e) { + throw new HoodieException("Failed to get latest columns for " + datasetMetaClient.getBasePath()); + } + } + + private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient) { + return getLatestColumns(datasetMetaClient, false); + } + + public static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat, + HoodieTableMetaClient datasetMetaClient, + List<String> latestColumns) { + return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, latestColumns, false); + + } + + public static Stream<HoodieRecord> getColumnStats(final String partitionPath, final String filePathWithPartition, Review comment: Can you please revisit the access specifiers for the newly added methods? IntelliJ might highlight these in yellow. I don't think these are required to be public. 
########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java ########## @@ -399,4 +775,116 @@ public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGrou return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList()); } + public static List<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata, + HoodieEngineContext engineContext, + HoodieTableMetaClient dataMetaClient, + boolean isMetaIndexColumnStatsForAllColumns, + String instantTime) { + + try { + List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(entry -> entry.stream()).collect(Collectors.toList()); + return HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, dataMetaClient, allWriteStats, + isMetaIndexColumnStatsForAllColumns); + } catch (Exception e) { + throw new HoodieException("Failed to generate column stats records for metadata table ", e); + } + } + + /** + * Create column stats from write status. 
+ * + * @param engineContext - Enging context + * @param datasetMetaClient - Dataset meta client + * @param allWriteStats - Write status to convert + * @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for indexing + */ + public static List<HoodieRecord> createColumnStatsFromWriteStats(HoodieEngineContext engineContext, + HoodieTableMetaClient datasetMetaClient, + List<HoodieWriteStat> allWriteStats, + boolean isMetaIndexColumnStatsForAllColumns) throws Exception { + if (allWriteStats.isEmpty()) { + return Collections.emptyList(); + } + + List<HoodieWriteStat> prunedWriteStats = allWriteStats.stream().filter(writeStat -> { + return !(writeStat instanceof HoodieDeltaWriteStat); + }).collect(Collectors.toList()); + if (prunedWriteStats.isEmpty()) { + return Collections.emptyList(); + } + + return engineContext.flatMap(prunedWriteStats, + writeStat -> translateWriteStatToColumnStats(writeStat, datasetMetaClient, + getLatestColumns(datasetMetaClient, isMetaIndexColumnStatsForAllColumns)), + prunedWriteStats.size()); + } + + /** + * Get the latest columns for the table for column stats indexing. + * + * @param datasetMetaClient - Data table meta client + * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing enabled for all columns + */ + private static List<String> getLatestColumns(HoodieTableMetaClient datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) { + if (!isMetaIndexColumnStatsForAllColumns + || datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() < 1) { + return Collections.singletonList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp()); + } + + TableSchemaResolver schemaResolver = new TableSchemaResolver(datasetMetaClient); + // consider nested fields as well. if column stats is enabled only for a subset of columns, Review comment: do we have a tracking jira for this. 
########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java ########## @@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon .getAllFilesInPartitions(partitions); } + @Override + public Option<ByteBuffer> getBloomFilter(final String partitionName, final String fileName) + throws HoodieMetadataException { + if (!isBloomFilterIndexEnabled) { + LOG.error("Metadata bloom filter index is disabled!"); + return Option.empty(); + } + + final Pair<String, String> partitionFileName = Pair.of(partitionName, fileName); + Map<Pair<String, String>, ByteBuffer> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName)); + if (bloomFilters.isEmpty()) { + LOG.error("Meta index: missing bloom filter for partition: " + partitionName + ", file: " + fileName); + return Option.empty(); + } + + ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName)); + return Option.of(bloomFilters.get(partitionFileName)); + } + + @Override + public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList) + throws HoodieMetadataException { + if (!isBloomFilterIndexEnabled) { + LOG.error("Metadata bloom filter index is disabled!"); + return Collections.emptyMap(); + } + if (partitionNameFileNameList.isEmpty()) { + return Collections.emptyMap(); + } + + HoodieTimer timer = new HoodieTimer().startTimer(); + Set<String> partitionIDFileIDSortedStrings = new TreeSet<>(); + Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>(); + partitionNameFileNameList.forEach(partitionNameFileNamePair -> { + final String bloomKey = new PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString() + .concat(new FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString()); + partitionIDFileIDSortedStrings.add(bloomKey); + fileToKeyMap.put(bloomKey, partitionNameFileNamePair); + } + ); + + List<String> 
partitionIDFileIDStrings = new ArrayList<>(partitionIDFileIDSortedStrings); + List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> hoodieRecordList = + getRecordsByKeys(partitionIDFileIDStrings, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()); + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR, + (timer.endTimer() / partitionIDFileIDStrings.size()))); + + Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new HashMap<>(); + for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : hoodieRecordList) { + if (entry.getRight().isPresent()) { + final Option<HoodieMetadataBloomFilter> bloomFilterMetadata = + entry.getRight().get().getData().getBloomFilterMetadata(); + if (bloomFilterMetadata.isPresent()) { + if (!bloomFilterMetadata.get().getIsDeleted()) { + ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft())); + partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), bloomFilterMetadata.get().getBloomFilter()); + } + } else { + LOG.error("Meta index bloom filter missing for: " + fileToKeyMap.get(entry.getLeft())); + } + } + } + return partitionFileToBloomFilterMap; + } + + @Override + public Map<Pair<String, String>, HoodieColumnStats> getColumnStats(final List<Pair<String, String>> partitionNameFileNameList, final String columnName) + throws HoodieMetadataException { + if (!isColumnStatsIndexEnabled) { + LOG.error("Metadata column stats index is disabled!"); + return Collections.emptyMap(); + } + + Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new HashMap<>(); + TreeSet<String> sortedKeys = new TreeSet<>(); + final String columnIndexStr = new ColumnIndexID(columnName).asBase64EncodedString(); + for (Pair<String, String> partitionNameFileNamePair : partitionNameFileNameList) { + final String columnStatIndexKey = columnIndexStr Review comment: guess we are using "Index" everywhere. so, can we add "s" to the name. 
(columnStat**s**IndexKey) ########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java ########## @@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon .getAllFilesInPartitions(partitions); } + @Override + public Option<ByteBuffer> getBloomFilter(final String partitionName, final String fileName) + throws HoodieMetadataException { + if (!isBloomFilterIndexEnabled) { + LOG.error("Metadata bloom filter index is disabled!"); + return Option.empty(); + } + + final Pair<String, String> partitionFileName = Pair.of(partitionName, fileName); + Map<Pair<String, String>, ByteBuffer> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName)); + if (bloomFilters.isEmpty()) { + LOG.error("Meta index: missing bloom filter for partition: " + partitionName + ", file: " + fileName); + return Option.empty(); + } + + ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName)); + return Option.of(bloomFilters.get(partitionFileName)); + } + + @Override + public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList) + throws HoodieMetadataException { + if (!isBloomFilterIndexEnabled) { + LOG.error("Metadata bloom filter index is disabled!"); + return Collections.emptyMap(); + } + if (partitionNameFileNameList.isEmpty()) { + return Collections.emptyMap(); + } + + HoodieTimer timer = new HoodieTimer().startTimer(); + Set<String> partitionIDFileIDSortedStrings = new TreeSet<>(); + Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>(); + partitionNameFileNameList.forEach(partitionNameFileNamePair -> { + final String bloomKey = new PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString() Review comment: can we use static method getBloomFilterIndexKey() in HoodieMetadataPayload to construct the key where ever needed. 
########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java ########## @@ -110,55 +199,97 @@ private HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFi */ public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) { Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(); - partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false))); + partitions.forEach(partition -> fileInfo.put(partition, new HoodieMetadataFileInfo(0L, false))); - HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath()); - HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo); + HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath()); + HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST, + fileInfo); return new HoodieRecord<>(key, payload); } /** * Create and return a {@code HoodieMetadataPayload} to save list of files within a partition. 
* - * @param partition The name of the partition - * @param filesAdded Mapping of files to their sizes for files which have been added to this partition + * @param partition The name of the partition + * @param filesAdded Mapping of files to their sizes for files which have been added to this partition * @param filesDeleted List of files which have been deleted from this partition */ public static HoodieRecord<HoodieMetadataPayload> createPartitionFilesRecord(String partition, - Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) { + Option<Map<String, Long>> filesAdded, + Option<List<String>> filesDeleted) { Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(); filesAdded.ifPresent( m -> m.forEach((filename, size) -> fileInfo.put(filename, new HoodieMetadataFileInfo(size, false)))); filesDeleted.ifPresent( - m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true)))); + m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true)))); - HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.partitionPath()); - HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo); + HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.getPartitionPath()); + HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo); return new HoodieRecord<>(key, payload); } + /** + * Create bloom filter metadata record. 
+ * + * @param partitionName - Partition name + * @param baseFileName - Base file name for which the bloom filter needs to persisted + * @param timestamp - Instant timestamp responsible for this record + * @param bloomFilter - Bloom filter for the File + * @param isDeleted - Is the bloom filter no more valid + * @return Metadata payload containing the fileID and its bloom filter record + */ + public static HoodieRecord<HoodieMetadataPayload> createBloomFilterMetadataRecord(final String partitionName, + final String baseFileName, + final String timestamp, + final ByteBuffer bloomFilter, + final boolean isDeleted) { + ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR) + && FSUtils.isBaseFile(new Path(baseFileName)), + "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!"); + final String bloomFilterKey = new PartitionIndexID(partitionName).asBase64EncodedString() + .concat(new FileIndexID(baseFileName).asBase64EncodedString()); + HoodieKey key = new HoodieKey(bloomFilterKey, MetadataPartitionType.BLOOM_FILTERS.getPartitionPath()); + + // TODO: Get the bloom filter type from the file + HoodieMetadataBloomFilter metadataBloomFilter = + new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(), + timestamp, bloomFilter, isDeleted); + HoodieMetadataPayload metadataPayload = new HoodieMetadataPayload(key.getRecordKey(), + HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter); + return new HoodieRecord<>(key, metadataPayload); + } + @Override public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) { ValidationUtils.checkArgument(previousRecord.type == type, - "Cannot combine " + previousRecord.type + " with " + type); - - Map<String, HoodieMetadataFileInfo> combinedFileInfo = null; + "Cannot combine " + previousRecord.type + " with " + type); switch (type) { - case PARTITION_LIST: - case FILE_LIST: - combinedFileInfo = combineFilesystemMetadata(previousRecord); - break; + case 
METADATA_TYPE_PARTITION_LIST: + case METADATA_TYPE_FILE_LIST: + Map<String, HoodieMetadataFileInfo> combinedFileInfo = combineFilesystemMetadata(previousRecord); + return new HoodieMetadataPayload(key, type, combinedFileInfo); + case METADATA_TYPE_BLOOM_FILTER: + HoodieMetadataBloomFilter combineBloomFilterMetadata = combineBloomFilterMetadata(previousRecord); + return new HoodieMetadataPayload(key, type, combineBloomFilterMetadata); + case METADATA_TYPE_COLUMN_STATS: + return new HoodieMetadataPayload(key, type, combineColumnStatsMetadatat(previousRecord)); default: throw new HoodieMetadataException("Unknown type of HoodieMetadataPayload: " + type); } + } + + private HoodieMetadataBloomFilter combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) { + return this.bloomFilterMetadata; + } - return new HoodieMetadataPayload(key, type, combinedFileInfo); + private HoodieColumnStats combineColumnStatsMetadatat(HoodieMetadataPayload previousRecord) { + return this.columnStatMetadata; Review comment: actually my bad. shouldn't we be combining at a file level? for eg: metadata record1: file1: stats1_1 file2: stats2_1 metadata record2: file1: stats1_2 file3: stats3_1 what will be final value after combining both these records? 
########## File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java ########## @@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon .getAllFilesInPartitions(partitions); } + @Override + public Option<ByteBuffer> getBloomFilter(final String partitionName, final String fileName) + throws HoodieMetadataException { + if (!isBloomFilterIndexEnabled) { + LOG.error("Metadata bloom filter index is disabled!"); + return Option.empty(); + } + + final Pair<String, String> partitionFileName = Pair.of(partitionName, fileName); + Map<Pair<String, String>, ByteBuffer> bloomFilters = getBloomFilters(Collections.singletonList(partitionFileName)); + if (bloomFilters.isEmpty()) { + LOG.error("Meta index: missing bloom filter for partition: " + partitionName + ", file: " + fileName); + return Option.empty(); + } + + ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName)); + return Option.of(bloomFilters.get(partitionFileName)); + } + + @Override + public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final List<Pair<String, String>> partitionNameFileNameList) + throws HoodieMetadataException { + if (!isBloomFilterIndexEnabled) { + LOG.error("Metadata bloom filter index is disabled!"); + return Collections.emptyMap(); + } + if (partitionNameFileNameList.isEmpty()) { + return Collections.emptyMap(); + } + + HoodieTimer timer = new HoodieTimer().startTimer(); + Set<String> partitionIDFileIDSortedStrings = new TreeSet<>(); + Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>(); + partitionNameFileNameList.forEach(partitionNameFileNamePair -> { + final String bloomKey = new PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString() Review comment: or bloomFilterIndexKey -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org