This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
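The commit described below ([HUDI-6762]) deletes the MetadataRecordsGenerationParams holder class and instead passes its individual fields (data table meta client, enabled MDT partition types, bloom filter type and parallelism, column-stats index settings) directly to the HoodieTableMetadataUtil record-generation methods; it also converts many log statements from string concatenation to SLF4J parameterized placeholders. As a rough illustration of the parameter-threading pattern only, the following minimal, self-contained Java sketch uses hypothetical names (IndexSettingsHolder, buildBloomFilterRecords), not the actual Hudi classes or signatures:

```java
import java.util.Arrays;
import java.util.List;

public class ParameterThreadingSketch {

  // Before: a holder object bundles every setting, even for callers that need only a few.
  static final class IndexSettingsHolder {
    final String bloomFilterType;
    final int bloomIndexParallelism;
    final boolean columnStatsEnabled;
    final int columnStatsParallelism;
    final List<String> columnStatsTargetColumns;

    IndexSettingsHolder(String bloomFilterType, int bloomIndexParallelism,
                        boolean columnStatsEnabled, int columnStatsParallelism,
                        List<String> columnStatsTargetColumns) {
      this.bloomFilterType = bloomFilterType;
      this.bloomIndexParallelism = bloomIndexParallelism;
      this.columnStatsEnabled = columnStatsEnabled;
      this.columnStatsParallelism = columnStatsParallelism;
      this.columnStatsTargetColumns = columnStatsTargetColumns;
    }
  }

  // After: the utility method declares exactly the inputs it uses, so each call site
  // shows which configuration values actually influence record generation.
  static String buildBloomFilterRecords(String bloomFilterType, int bloomIndexParallelism) {
    int parallelism = Math.max(Math.min(4, bloomIndexParallelism), 1);
    return "bloom records: type=" + bloomFilterType + ", parallelism=" + parallelism;
  }

  public static void main(String[] args) {
    // Old style: build the holder and let the callee pick out the fields it needs.
    IndexSettingsHolder holder = new IndexSettingsHolder(
        "DYNAMIC_V0", 200, true, 10, Arrays.asList("ts", "uuid"));
    System.out.println(buildBloomFilterRecords(holder.bloomFilterType, holder.bloomIndexParallelism));

    // New style: pass the individual values directly; no holder class is required.
    System.out.println(buildBloomFilterRecords("DYNAMIC_V0", 200));
  }
}
```

The trade-off shown in the sketch mirrors the diff: method signatures get longer, but the intermediate parameters class can be removed entirely and unused settings no longer travel through call sites that never read them.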
The following commit(s) were added to refs/heads/master by this push: new 7c12decc86c [HUDI-6762] Removed usages of MetadataRecordsGenerationParams (#10962) 7c12decc86c is described below commit 7c12decc86ccfbdc8bd06fe64d3e1c507cbbfbf6 Author: Vova Kolmakov <wombatu...@gmail.com> AuthorDate: Tue Apr 16 00:05:57 2024 +0700 [HUDI-6762] Removed usages of MetadataRecordsGenerationParams (#10962) Co-authored-by: Vova Kolmakov <kolmakov.vladi...@huawei-partners.com> --- .../metadata/HoodieBackedTableMetadataWriter.java | 158 ++++++------ .../hudi/metadata/HoodieTableMetadataUtil.java | 281 ++++++++++++--------- .../metadata/MetadataRecordsGenerationParams.java | 89 ------- 3 files changed, 234 insertions(+), 294 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 891cc88b9da..dea317e60b7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -262,7 +262,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler. if (!dataWriteConfig.isMetadataAsyncIndex()) { Set<String> completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions(); - LOG.info("Async metadata indexing disabled and following partitions already initialized: " + completedPartitions); + LOG.info("Async metadata indexing disabled and following partitions already initialized: {}", completedPartitions); // TODO: fix the filter to check for exact partition name, e.g. completedPartitions could have func_index_datestr, // but now the user is trying to initialize the func_index_dayhour partition. this.enabledPartitionTypes.stream() @@ -345,12 +345,6 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM LOG.warn("Metadata Table will need to be re-initialized as no instants were found"); return true; } - - final String latestMetadataInstantTimestamp = latestMetadataInstant.get().getTimestamp(); - if (latestMetadataInstantTimestamp.startsWith(SOLO_COMMIT_TIMESTAMP)) { // the initialization timestamp is SOLO_COMMIT_TIMESTAMP + offset - return false; - } - return false; } @@ -411,8 +405,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM for (MetadataPartitionType partitionType : partitionsToInit) { // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. 
String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime); - - LOG.info("Initializing MDT partition " + partitionType.name() + " at instant " + commitTimeForPartition); + String partitionTypeName = partitionType.name(); + LOG.info("Initializing MDT partition {} at instant {}", partitionTypeName, commitTimeForPartition); Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair; try { @@ -438,37 +432,41 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM fileGroupCountAndRecordsPair = initializeFunctionalIndexPartition(functionalIndexPartitionsToInit.iterator().next()); break; default: - throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType); + throw new HoodieMetadataException(String.format("Unsupported MDT partition type: %s", partitionType)); } } catch (Exception e) { String metricKey = partitionType.getPartitionPath() + "_" + HoodieMetadataMetrics.BOOTSTRAP_ERR_STR; metrics.ifPresent(m -> m.setMetric(metricKey, 1)); - LOG.error("Bootstrap on " + partitionType.getPartitionPath() + " partition failed for " - + metadataMetaClient.getBasePath(), e); - throw new HoodieMetadataException(partitionType.getPartitionPath() - + " bootstrap failed for " + metadataMetaClient.getBasePath(), e); + String errMsg = String.format("Bootstrap on %s partition failed for %s", + partitionType.getPartitionPath(), metadataMetaClient.getBasePathV2()); + LOG.error(errMsg, e); + throw new HoodieMetadataException(errMsg, e); } - LOG.info(String.format("Initializing %s index with %d mappings and %d file groups.", partitionType.name(), fileGroupCountAndRecordsPair.getKey(), - fileGroupCountAndRecordsPair.getValue().count())); + if (LOG.isInfoEnabled()) { + LOG.info("Initializing {} index with {} mappings and {} file groups.", partitionTypeName, fileGroupCountAndRecordsPair.getKey(), + fileGroupCountAndRecordsPair.getValue().count()); + } HoodieTimer partitionInitTimer = HoodieTimer.start(); // Generate the file groups final int fileGroupCount = fileGroupCountAndRecordsPair.getKey(); - ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType.name() + " should be > 0"); + ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionTypeName + " should be > 0"); initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount); // Perform the commit using bulkCommit HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue(); bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount); metadataMetaClient.reloadActiveTimeline(); - String partitionPath = (partitionType == FUNCTIONAL_INDEX) ? dataWriteConfig.getFunctionalIndexConfig().getIndexName() : partitionType.getPartitionPath(); + String partitionPath = partitionType == FUNCTIONAL_INDEX + ? 
dataWriteConfig.getFunctionalIndexConfig().getIndexName() + : partitionType.getPartitionPath(); dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, true); // initialize the metadata reader again so the MDT partition can be read after initialization initMetadataReader(); long totalInitTime = partitionInitTimer.endTimer(); - LOG.info(String.format("Initializing %s index in metadata table took " + totalInitTime + " in ms", partitionType.name())); + LOG.info("Initializing {} index in metadata table took {} in ms", partitionTypeName, totalInitTime); } return true; @@ -502,7 +500,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) { HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( - engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); + engineContext, Collections.emptyMap(), partitionToFilesMap, dataMetaClient, dataWriteConfig.isMetadataColumnStatsIndexEnabled(), + dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex()); final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount(); return Pair.of(fileGroupCount, records); @@ -510,7 +509,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionToFilesMap) { HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( - engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); + engineContext, Collections.emptyMap(), partitionToFilesMap, createInstantTime, dataMetaClient, + dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.getBloomFilterType()); final int fileGroupCount = dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount(); return Pair.of(fileGroupCount, records); @@ -568,8 +568,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM .map(basefile -> Pair.of(partition, basefile)).collect(Collectors.toList())); } - LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in " - + partitions.size() + " partitions"); + LOG.info("Initializing record index from {} base files in {} partitions", partitionBaseFilePairs.size(), partitions.size()); // Collect record keys from the files in parallel HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles( @@ -590,7 +589,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM dataWriteConfig.getRecordIndexMaxFileGroupCount(), dataWriteConfig.getRecordIndexGrowthFactor(), dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes()); - LOG.info(String.format("Initializing record index with %d mappings and %d file groups.", recordCount, fileGroupCount)); + LOG.info("Initializing record index with {} mappings and {} file groups.", recordCount, fileGroupCount); return Pair.of(fileGroupCount, records); } @@ -632,8 +631,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM if (!pendingDataInstant.isEmpty()) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1)); - LOG.warn("Cannot initialize metadata table as 
operation(s) are in progress on the dataset: " - + Arrays.toString(pendingDataInstant.toArray())); + LOG.warn("Cannot initialize metadata table as operation(s) are in progress on the dataset: {}", + Arrays.toString(pendingDataInstant.toArray())); return true; } return false; @@ -672,7 +671,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism(); SerializableConfiguration conf = new SerializableConfiguration(dataMetaClient.getHadoopConf()); final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex(); - final String datasetBasePath = dataMetaClient.getBasePath(); + final String datasetBasePath = dataMetaClient.getBasePathV2().toString(); SerializablePath serializableBasePath = new SerializablePath(new CachingPath(datasetBasePath)); while (!pathsToList.isEmpty()) { @@ -694,7 +693,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM if (!dirFilterRegex.isEmpty()) { final String relativePath = dirInfo.getRelativePath(); if (!relativePath.isEmpty() && relativePath.matches(dirFilterRegex)) { - LOG.info("Ignoring directory " + relativePath + " which matches the filter regex " + dirFilterRegex); + LOG.info("Ignoring directory {} which matches the filter regex {}", relativePath, dirFilterRegex); continue; } } @@ -750,7 +749,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM try { final FileStatus[] existingFiles = fs.listStatus(partitionPath); if (existingFiles.length > 0) { - LOG.warn("Deleting all existing files found in MDT partition " + partitionName); + LOG.warn("Deleting all existing files found in MDT partition {}", partitionName); fs.delete(partitionPath, true); ValidationUtils.checkState(!fs.exists(partitionPath), "Failed to delete MDT partition " + partitionName); } @@ -795,7 +794,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM writer.appendBlock(block); writer.close(); } catch (InterruptedException e) { - throw new HoodieException("Failed to created fileGroup " + fileGroupFileId + " for partition " + partitionName, e); + throw new HoodieException(String.format("Failed to created fileGroup %s for partition %s", fileGroupFileId, partitionName), e); } }, fileGroupFileIds.size()); } @@ -805,10 +804,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM String partitionPath = partitionType.getPartitionPath(); // first update table config dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, false); - LOG.warn("Deleting Metadata Table partition: " + partitionPath); + LOG.warn("Deleting Metadata Table partition: {}", partitionPath); dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath(), partitionPath), true); // delete corresponding pending indexing instant file in the timeline - LOG.warn("Deleting pending indexing instant from the timeline for partition: " + partitionPath); + LOG.warn("Deleting pending indexing instant from the timeline for partition: {}", partitionPath); deletePendingIndexingInstant(dataMetaClient, partitionPath); } closeInternal(); @@ -830,7 +829,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM metaClient.getActiveTimeline().deleteInstantFileIfExists(getIndexInflightInstant(instant.getTimestamp())); } } catch (IOException e) { - LOG.error("Failed to delete the instant file corresponding to " + instant); + 
LOG.error("Failed to delete the instant file corresponding to {}", instant); } }); } @@ -850,18 +849,6 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM } } - private MetadataRecordsGenerationParams getRecordsGenerationParams() { - return new MetadataRecordsGenerationParams( - dataMetaClient, - enabledPartitionTypes, - dataWriteConfig.getBloomFilterType(), - dataWriteConfig.getMetadataBloomFilterIndexParallelism(), - dataWriteConfig.isMetadataColumnStatsIndexEnabled(), - dataWriteConfig.getColumnStatsIndexParallelism(), - dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), - dataWriteConfig.getColumnsEnabledForBloomFilterIndex()); - } - /** * Interface to assist in converting commit metadata to List of HoodieRecords to be written to metadata table. * Updates of different commit metadata uses the same method to convert to HoodieRecords and hence. @@ -913,12 +900,13 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM List<MetadataPartitionType> partitionTypes = new ArrayList<>(); indexPartitionInfos.forEach(indexPartitionInfo -> { String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath(); - LOG.info(String.format("Creating a new metadata index for partition '%s' under path %s upto instant %s", - relativePartitionPath, metadataWriteConfig.getBasePath(), indexUptoInstantTime)); + LOG.info("Creating a new metadata index for partition '{}' under path {} upto instant {}", + relativePartitionPath, metadataWriteConfig.getBasePath(), indexUptoInstantTime); // return early and populate enabledPartitionTypes correctly (check in initialCommit) - MetadataPartitionType partitionType = relativePartitionPath.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX) ? FUNCTIONAL_INDEX : - MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT)); + MetadataPartitionType partitionType = relativePartitionPath.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX) + ? FUNCTIONAL_INDEX + : MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT)); if (!enabledPartitionTypes.contains(partitionType)) { throw new HoodieIndexException(String.format("Indexing for metadata partition: %s is not enabled", partitionType)); } @@ -944,7 +932,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM processAndCommit(instantTime, () -> { Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap = HoodieTableMetadataUtil.convertMetadataToRecords( - engineContext, dataWriteConfig, commitMetadata, instantTime, getRecordsGenerationParams()); + engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, + enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), + dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(), + dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex()); // Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code // to the HoodieTableMetadataUtil class in hudi-common. 
@@ -962,7 +953,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM processAndCommit(instantTime, () -> { Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap = HoodieTableMetadataUtil.convertMetadataToRecords( - engineContext, dataWriteConfig, commitMetadata, instantTime, getRecordsGenerationParams()); + engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient, + enabledPartitionTypes, dataWriteConfig.getBloomFilterType(), + dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(), + dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex()); HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(records, commitMetadata); partitionToRecordMap.put(RECORD_INDEX, records.union(additionalUpdates)); updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap); @@ -983,7 +977,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM try { functionalIndexRecords = getFunctionalIndexUpdates(commitMetadata, partition, instantTime); } catch (Exception e) { - throw new HoodieMetadataException("Failed to get functional index updates for partition " + partition, e); + throw new HoodieMetadataException(String.format("Failed to get functional index updates for partition %s", partition), e); } partitionToRecordMap.put(FUNCTIONAL_INDEX, functionalIndexRecords); }); @@ -1025,7 +1019,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM @Override public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, - cleanMetadata, getRecordsGenerationParams(), instantTime)); + cleanMetadata, instantTime, dataMetaClient, enabledPartitionTypes, + dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(), + dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex())); closeInternal(); } @@ -1040,22 +1036,22 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM dataMetaClient.reloadActiveTimeline(); // Fetch the commit to restore to (savepointed commit time) - HoodieInstant restoreInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime); + HoodieInstant restoreInstant = new HoodieInstant(REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime); HoodieInstant requested = HoodieTimeline.getRestoreRequestedInstant(restoreInstant); HoodieRestorePlan restorePlan = null; try { restorePlan = TimelineMetadataUtils.deserializeAvroMetadata( dataMetaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), HoodieRestorePlan.class); } catch (IOException e) { - throw new HoodieIOException("Deserialization of restore plan failed whose restore instant time is " + instantTime + " in data table", e); + throw new HoodieIOException(String.format("Deserialization of restore plan failed whose restore instant time is %s in data table", instantTime), e); } final String restoreToInstantTime = restorePlan.getSavepointToRestoreTimestamp(); - LOG.info("Triggering restore to " + restoreToInstantTime + " in metadata table"); + LOG.info("Triggering restore to {} in metadata table", restoreToInstantTime); // fetch the earliest commit to retain and ensure the base file prior to the time to restore is present 
List<HoodieFileGroup> filesGroups = metadata.getMetadataFileSystemView().getAllFileGroups(FILES.getPartitionPath()).collect(Collectors.toList()); - boolean cannotRestore = filesGroups.stream().map(fileGroup -> fileGroup.getAllFileSlices().map(fileSlice -> fileSlice.getBaseInstantTime()).anyMatch( + boolean cannotRestore = filesGroups.stream().map(fileGroup -> fileGroup.getAllFileSlices().map(FileSlice::getBaseInstantTime).anyMatch( instantTime1 -> HoodieTimeline.compareTimestamps(instantTime1, LESSER_THAN_OR_EQUALS, restoreToInstantTime))).anyMatch(canRestore -> !canRestore); if (cannotRestore) { throw new HoodieMetadataException(String.format("Can't restore to %s since there is no base file in MDT lesser than the commit to restore to. " @@ -1117,13 +1113,13 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // The deltacommit that will be rolled back HoodieInstant deltaCommitInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime); if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) { - LOG.info("Rolling back MDT deltacommit " + commitToRollbackInstantTime); + LOG.info("Rolling back MDT deltacommit {}", commitToRollbackInstantTime); if (!getWriteClient().rollback(commitToRollbackInstantTime, instantTime)) { - throw new HoodieMetadataException("Failed to rollback deltacommit at " + commitToRollbackInstantTime); + throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s", commitToRollbackInstantTime)); } } else { - LOG.info(String.format("Ignoring rollback of instant %s at %s. The commit to rollback is not found in MDT", - commitToRollbackInstantTime, instantTime)); + LOG.info("Ignoring rollback of instant {} at {}. The commit to rollback is not found in MDT", + commitToRollbackInstantTime, instantTime); } closeInternal(); } @@ -1136,12 +1132,13 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // The commit being rolled back should not be earlier than the latest compaction on the MDT because the latest file slice does not change after all. // Compaction on MDT only occurs when all actions are completed on the dataset. // Hence, this case implies a rollback of completed commit which should actually be handled using restore. - if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) { + if (compactionInstant.getAction().equals(COMMIT_ACTION)) { final String compactionInstantTime = compactionInstant.getTimestamp(); - if (commitToRollbackInstantTime.length() == compactionInstantTime.length() && HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, compactionInstantTime)) { - throw new HoodieMetadataException(String.format("Commit being rolled back %s is earlier than the latest compaction %s. " - + "There are %d deltacommits after this compaction: %s", commitToRollbackInstantTime, compactionInstantTime, - deltacommitsSinceCompaction.countInstants(), deltacommitsSinceCompaction.getInstants())); + if (commitToRollbackInstantTime.length() == compactionInstantTime.length() && LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, compactionInstantTime)) { + throw new HoodieMetadataException( + String.format("Commit being rolled back %s is earlier than the latest compaction %s. 
There are %d deltacommits after this compaction: %s", + commitToRollbackInstantTime, compactionInstantTime, deltacommitsSinceCompaction.countInstants(), deltacommitsSinceCompaction.getInstants()) + ); } } } @@ -1187,7 +1184,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) { // if this is a new commit being applied to metadata for the first time - LOG.info("New commit at " + instantTime + " being applied to MDT."); + LOG.info("New commit at {} being applied to MDT.", instantTime); } else { // this code path refers to a re-attempted commit that: // 1. got committed to metadata table, but failed in datatable. @@ -1199,12 +1196,12 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // already part of completed commit. So, we have to manually rollback the completed instant and proceed. Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)) .lastInstant(); - LOG.info(String.format("%s completed commit at %s being applied to MDT.", - alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime)); + LOG.info("{} completed commit at {} being applied to MDT.", + alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime); // Rollback the previous commit if (!writeClient.rollback(instantTime)) { - throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT"); + throw new HoodieMetadataException(String.format("Failed to rollback deltacommit at %s from MDT", instantTime)); } metadataMetaClient.reloadActiveTimeline(); } @@ -1271,7 +1268,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM fileSlices = getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient, Option.ofNullable(fsView), partitionName); } final int fileGroupCount = fileSlices.size(); - ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionName + " should be >0"); + ValidationUtils.checkArgument(fileGroupCount > 0, String.format("FileGroup count for MDT partition %s should be >0", partitionName)); List<FileSlice> finalFileSlices = fileSlices; HoodieData<HoodieRecord> rddSinglePartitionRecords = records.map(r -> { @@ -1319,7 +1316,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // Do timeline validation before scheduling compaction/logCompaction operations. if (validateCompactionScheduling()) { String latestDeltacommitTime = lastInstant.get().getTimestamp(); - LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + ", running compaction operations."); + LOG.info("Latest deltacommit time found is {}, running compaction operations.", latestDeltacommitTime); compactIfNecessary(writeClient, latestDeltacommitTime); } writeClient.archive(); @@ -1368,17 +1365,17 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // and again w/ C6, we will re-attempt compaction at which point latest delta commit is C4 in MDT. // and so we try compaction w/ instant C4001. So, we can avoid compaction if we already have compaction w/ same instant time. 
if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) { - LOG.info(String.format("Compaction with same %s time is already present in the timeline.", compactionInstantTime)); + LOG.info("Compaction with same {} time is already present in the timeline.", compactionInstantTime); } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) { - LOG.info("Compaction is scheduled for timestamp " + compactionInstantTime); + LOG.info("Compaction is scheduled for timestamp {}", compactionInstantTime); writeClient.compact(compactionInstantTime); } else if (metadataWriteConfig.isLogCompactionEnabled()) { // Schedule and execute log compaction with new instant time. final String logCompactionInstantTime = metadataMetaClient.createNewInstantTime(false); if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime)) { - LOG.info(String.format("Log compaction with same %s time is already present in the timeline.", logCompactionInstantTime)); + LOG.info("Log compaction with same {} time is already present in the timeline.", logCompactionInstantTime); } else if (writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, Option.empty())) { - LOG.info("Log compaction is scheduled for timestamp " + logCompactionInstantTime); + LOG.info("Log compaction is scheduled for timestamp {}", logCompactionInstantTime); writeClient.logCompact(logCompactionInstantTime); } } @@ -1417,8 +1414,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM Option<HoodieInstant> pendingCompactionInstant = metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant(); if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) { - LOG.warn(String.format("Not scheduling compaction or logCompaction, since a pending compaction instant %s or logCompaction %s instant is present", - pendingCompactionInstant, pendingLogCompactionInstant)); + LOG.warn("Not scheduling compaction or logCompaction, since a pending compaction instant {} or logCompaction {} instant is present", + pendingCompactionInstant, pendingLogCompactionInstant); return false; } } @@ -1487,8 +1484,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // newLocation should have the same fileID as currentLocation. The instantTimes differ as newLocation's // instantTime refers to the current commit which was completed. if (!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId())) { - final String msg = String.format("Detected update in location of record with key %s from %s " - + " to %s. The fileID should not change.", + final String msg = String.format("Detected update in location of record with key %s from %s to %s. The fileID should not change.", recordDelegate, recordDelegate.getCurrentLocation().get(), newLocation.get()); LOG.error(msg); throw new HoodieMetadataException(msg); @@ -1617,7 +1613,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM // Regular HUDI data file (base file or log file) String dataFileCommitTime = FSUtils.getCommitTime(status.getPath().getName()); // Limit the file listings to files which were created before the maxInstant time. 
- if (HoodieTimeline.compareTimestamps(dataFileCommitTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, maxInstantTime)) { + if (HoodieTimeline.compareTimestamps(dataFileCommitTime, LESSER_THAN_OR_EQUALS, maxInstantTime)) { filenameToSizeMap.put(status.getPath().getName(), status.getLen()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 8852b52a1be..d8270ed17d4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -80,6 +80,7 @@ import org.apache.hudi.common.util.collection.Tuple3; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -153,6 +154,9 @@ public class HoodieTableMetadataUtil { public static final String PARTITION_NAME_RECORD_INDEX = "record_index"; public static final String PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX = "func_index_"; + private HoodieTableMetadataUtil() { + } + public static final Set<Class<?>> COLUMN_STATS_RECORD_SUPPORTED_TYPES = new HashSet<>(Arrays.asList( IntWrapper.class, BooleanWrapper.class, DateWrapper.class, DoubleWrapper.class, FloatWrapper.class, LongWrapper.class, @@ -162,7 +166,7 @@ public class HoodieTableMetadataUtil { // are reserved for future operations on the MDT. private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; // corresponds to "010"; // we have max of 4 partitions (FILES, COL_STATS, BLOOM, RLI) - private static final List<String> VALID_PARTITION_INITIALIZATION_TIME_SUFFIXES = Arrays.asList("010","011","012","013"); + private static final List<String> VALID_PARTITION_INITIALIZATION_TIME_SUFFIXES = Arrays.asList("010", "011", "012", "013"); /** * Returns whether the files partition of metadata table is ready for read. 
@@ -205,7 +209,7 @@ public class HoodieTableMetadataUtil { // For each column (field) we have to index update corresponding column stats // with the values from this record targetFields.forEach(field -> { - ColumnStats colStats = allColumnStats.computeIfAbsent(field.name(), (ignored) -> new ColumnStats()); + ColumnStats colStats = allColumnStats.computeIfAbsent(field.name(), ignored -> new ColumnStats()); Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(recordSchema, field.name()); Object fieldValue; @@ -235,7 +239,7 @@ public class HoodieTableMetadataUtil { }); Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, HoodieColumnRangeMetadata<Comparable>>> collector = - Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), Function.identity()); + Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity()); return (Map<String, HoodieColumnRangeMetadata<Comparable>>) targetFields.stream() .map(field -> { @@ -274,14 +278,14 @@ public class HoodieTableMetadataUtil { public static Option<String> getColumnStatsValueAsString(Object statsValue) { if (statsValue == null) { - LOG.info("Invalid column stats value: " + statsValue); + LOG.info("Invalid column stats value: {}", statsValue); return Option.empty(); } Class<?> statsValueClass = statsValue.getClass(); if (COLUMN_STATS_RECORD_SUPPORTED_TYPES.contains(statsValueClass)) { return Option.of(String.valueOf(((IndexedRecord) statsValue).get(0))); } else { - throw new RuntimeException("Unsupported type: " + statsValueClass.getSimpleName()); + throw new HoodieNotSupportedException("Unsupported type: " + statsValueClass.getSimpleName()); } } @@ -329,28 +333,44 @@ public class HoodieTableMetadataUtil { /** * Convert commit action to metadata records for the enabled partition types. 
* - * @param commitMetadata - Commit action metadata - * @param hoodieConfig - Hudi configs - * @param instantTime - Action instant time - * @param recordsGenerationParams - Parameters for the record generation + * @param context - Engine context to use + * @param hoodieConfig - Hudi configs + * @param commitMetadata - Commit action metadata + * @param instantTime - Action instant time + * @param dataMetaClient - HoodieTableMetaClient for data + * @param enabledPartitionTypes - List of enabled MDT partitions + * @param bloomFilterType - Type of generated bloom filter records + * @param bloomIndexParallelism - Parallelism for bloom filter record generation + * @param isColumnStatsIndexEnabled - Is column stats index enabled + * @param columnStatsIndexParallelism - Parallelism for column stats index records generation + * @param targetColumnsForColumnStatsIndex - List of columns for column stats index * @return Map of partition to metadata records for the commit action */ - public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords( - HoodieEngineContext context, HoodieConfig hoodieConfig, HoodieCommitMetadata commitMetadata, - String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) { + public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext context, + HoodieConfig hoodieConfig, + HoodieCommitMetadata commitMetadata, + String instantTime, + HoodieTableMetaClient dataMetaClient, + List<MetadataPartitionType> enabledPartitionTypes, + String bloomFilterType, + int bloomIndexParallelism, + boolean isColumnStatsIndexEnabled, + int columnStatsIndexParallelism, + List<String> targetColumnsForColumnStatsIndex) { final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>(); final HoodieData<HoodieRecord> filesPartitionRecordsRDD = context.parallelize( convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD); - if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) { + if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { final HoodieData<HoodieRecord> metadataBloomFilterRecords = convertMetadataToBloomFilterRecords( - context, hoodieConfig, commitMetadata, instantTime, recordsGenerationParams); + context, hoodieConfig, commitMetadata, instantTime, dataMetaClient, bloomFilterType, bloomIndexParallelism); partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecords); } - if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) { - final HoodieData<HoodieRecord> metadataColumnStatsRDD = convertMetadataToColumnStatsRecords(commitMetadata, context, recordsGenerationParams); + if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { + final HoodieData<HoodieRecord> metadataColumnStatsRDD = convertMetadataToColumnStatsRecords(commitMetadata, context, + dataMetaClient, isColumnStatsIndexEnabled, columnStatsIndexParallelism, targetColumnsForColumnStatsIndex); partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); } return partitionToRecordsMap; @@ -387,7 +407,7 @@ public class HoodieTableMetadataUtil { String pathWithPartition = stat.getPath(); if (pathWithPartition == null) { // Empty partition - LOG.warn("Unable to find path in write stat to update metadata table " + stat); + 
LOG.warn("Unable to find path in write stat to update metadata table {}", stat); return map; } @@ -401,9 +421,7 @@ public class HoodieTableMetadataUtil { Map<String, Long> cdcPathAndSizes = stat.getCdcStats(); if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) { - cdcPathAndSizes.entrySet().forEach(cdcEntry -> { - map.put(FSUtils.getFileName(cdcEntry.getKey(), partitionStatName), cdcEntry.getValue()); - }); + cdcPathAndSizes.forEach((key, value) -> map.put(FSUtils.getFileName(key, partitionStatName), value)); } return map; }, @@ -417,8 +435,8 @@ public class HoodieTableMetadataUtil { records.addAll(updatedPartitionFilesRecords); - LOG.info(String.format("Updating at %s from Commit/%s. #partitions_updated=%d, #files_added=%d", instantTime, commitMetadata.getOperationType(), - records.size(), newFileCount.value())); + LOG.info("Updating at {} from Commit/{}. #partitions_updated={}, #files_added={}", instantTime, commitMetadata.getOperationType(), + records.size(), newFileCount.value()); return records; } @@ -447,21 +465,28 @@ public class HoodieTableMetadataUtil { * Convert commit action metadata to bloom filter records. * * @param context - Engine context to use + * @param hoodieConfig - Hudi configs * @param commitMetadata - Commit action metadata * @param instantTime - Action instant time - * @param recordsGenerationParams - Parameters for bloom filter record generation + * @param dataMetaClient - HoodieTableMetaClient for data + * @param bloomFilterType - Type of generated bloom filter records + * @param bloomIndexParallelism - Parallelism for bloom filter record generation * @return HoodieData of metadata table records */ - public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords( - HoodieEngineContext context, HoodieConfig hoodieConfig, HoodieCommitMetadata commitMetadata, - String instantTime, MetadataRecordsGenerationParams recordsGenerationParams) { + public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieEngineContext context, + HoodieConfig hoodieConfig, + HoodieCommitMetadata commitMetadata, + String instantTime, + HoodieTableMetaClient dataMetaClient, + String bloomFilterType, + int bloomIndexParallelism) { final List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() - .flatMap(entry -> entry.stream()).collect(Collectors.toList()); + .flatMap(Collection::stream).collect(Collectors.toList()); if (allWriteStats.isEmpty()) { return context.emptyHoodieData(); } - final int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getBloomIndexParallelism()), 1); + final int parallelism = Math.max(Math.min(allWriteStats.size(), bloomIndexParallelism), 1); HoodieData<HoodieWriteStat> allWriteStatsRDD = context.parallelize(allWriteStats, parallelism); return allWriteStatsRDD.flatMap(hoodieWriteStat -> { final String partition = hoodieWriteStat.getPartitionPath(); @@ -474,7 +499,7 @@ public class HoodieTableMetadataUtil { String pathWithPartition = hoodieWriteStat.getPath(); if (pathWithPartition == null) { // Empty partition - LOG.error("Failed to find path in write stat to update metadata table " + hoodieWriteStat); + LOG.error("Failed to find path in write stat to update metadata table {}", hoodieWriteStat); return Collections.emptyListIterator(); } @@ -483,28 +508,26 @@ public class HoodieTableMetadataUtil { return Collections.emptyListIterator(); } - final Path writeFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition); + 
final Path writeFilePath = new Path(dataMetaClient.getBasePathV2(), pathWithPartition); try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader( - hoodieConfig, recordsGenerationParams.getDataMetaClient().getHadoopConf(), writeFilePath)) { + hoodieConfig, dataMetaClient.getHadoopConf(), writeFilePath)) { try { final BloomFilter fileBloomFilter = fileReader.readBloomFilter(); if (fileBloomFilter == null) { - LOG.error("Failed to read bloom filter for " + writeFilePath); + LOG.error("Failed to read bloom filter for {}", writeFilePath); return Collections.emptyListIterator(); } ByteBuffer bloomByteBuffer = ByteBuffer.wrap(getUTF8Bytes(fileBloomFilter.serializeToString())); HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord( - partition, fileName, instantTime, recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false); + partition, fileName, instantTime, bloomFilterType, bloomByteBuffer, false); return Collections.singletonList(record).iterator(); } catch (Exception e) { - LOG.error("Failed to read bloom filter for " + writeFilePath); + LOG.error("Failed to read bloom filter for {}", writeFilePath); return Collections.emptyListIterator(); - } finally { - fileReader.close(); } } catch (IOException e) { - LOG.error("Failed to get bloom filter for file: " + writeFilePath + ", write stat: " + hoodieWriteStat); + LOG.error("Failed to get bloom filter for file: {}, write stat: {}", writeFilePath, hoodieWriteStat); } return Collections.emptyListIterator(); }); @@ -515,22 +538,28 @@ public class HoodieTableMetadataUtil { */ public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadataToRecords(HoodieEngineContext engineContext, HoodieCleanMetadata cleanMetadata, - MetadataRecordsGenerationParams recordsGenerationParams, - String instantTime) { + String instantTime, + HoodieTableMetaClient dataMetaClient, + List<MetadataPartitionType> enabledPartitionTypes, + int bloomIndexParallelism, + boolean isColumnStatsIndexEnabled, + int columnStatsIndexParallelism, + List<String> targetColumnsForColumnStatsIndex) { final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>(); final HoodieData<HoodieRecord> filesPartitionRecordsRDD = engineContext.parallelize( convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1); partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecordsRDD); - if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS)) { + if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) { final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD = - convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, recordsGenerationParams); + convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, bloomIndexParallelism); partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, metadataBloomFilterRecordsRDD); } - if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) { + if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) { final HoodieData<HoodieRecord> metadataColumnStatsRDD = - convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, recordsGenerationParams); + convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, + dataMetaClient, isColumnStatsIndexEnabled, columnStatsIndexParallelism, targetColumnsForColumnStatsIndex); 
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD); } @@ -566,8 +595,8 @@ public class HoodieTableMetadataUtil { // if there are partitions to be deleted, add them to delete list records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true)); } - LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size() - + ", #files_deleted=" + fileDeleteCount[0] + ", #partitions_deleted=" + deletedPartitions.size()); + LOG.info("Updating at {} from Clean. #partitions_updated={}, #files_deleted={}, #partitions_deleted={}", + instantTime, records.size(), fileDeleteCount[0], deletedPartitions.size()); return records; } @@ -600,8 +629,8 @@ public class HoodieTableMetadataUtil { records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true)); } - LOG.info("Re-adding missing records at " + instantTime + " during Restore. #partitions_updated=" + records.size() - + ", #files_added=" + filesAddedCount[0] + ", #files_deleted=" + fileDeleteCount[0] + ", #partitions_deleted=" + deletedPartitions.size()); + LOG.info("Re-adding missing records at {} during Restore. #partitions_updated={}, #files_added={}, #files_deleted={}, #partitions_deleted={}", + instantTime, records.size(), filesAddedCount[0], fileDeleteCount[0], deletedPartitions.size()); return Collections.singletonMap(MetadataPartitionType.FILES, engineContext.parallelize(records, 1)); } @@ -611,13 +640,13 @@ public class HoodieTableMetadataUtil { * @param cleanMetadata - Clean action metadata * @param engineContext - Engine context * @param instantTime - Clean action instant time - * @param recordsGenerationParams - Parameters for bloom filter record generation + * @param bloomIndexParallelism - Parallelism for bloom filter record generation * @return List of bloom filter index records for the clean metadata */ public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata, HoodieEngineContext engineContext, String instantTime, - MetadataRecordsGenerationParams recordsGenerationParams) { + int bloomIndexParallelism) { List<Pair<String, String>> deleteFileList = new ArrayList<>(); cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { // Files deleted from a partition @@ -630,7 +659,7 @@ public class HoodieTableMetadataUtil { }); }); - final int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1); + final int parallelism = Math.max(Math.min(deleteFileList.size(), bloomIndexParallelism), 1); HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList, parallelism); return deleteFileListRDD.map(deleteFileInfoPair -> HoodieMetadataPayload.createBloomFilterMetadataRecord( deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), instantTime, StringUtils.EMPTY_STRING, @@ -640,14 +669,20 @@ public class HoodieTableMetadataUtil { /** * Convert clean metadata to column stats index records. 
* - * @param cleanMetadata - Clean action metadata - * @param engineContext - Engine context - * @param recordsGenerationParams - Parameters for bloom filter record generation + * @param cleanMetadata - Clean action metadata + * @param engineContext - Engine context + * @param dataMetaClient - HoodieTableMetaClient for data + * @param isColumnStatsIndexEnabled - Is column stats index enabled + * @param columnStatsIndexParallelism - Parallelism for column stats index records generation + * @param targetColumnsForColumnStatsIndex - List of columns for column stats index * @return List of column stats index records for the clean metadata */ public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata, HoodieEngineContext engineContext, - MetadataRecordsGenerationParams recordsGenerationParams) { + HoodieTableMetaClient dataMetaClient, + boolean isColumnStatsIndexEnabled, + int columnStatsIndexParallelism, + List<String> targetColumnsForColumnStatsIndex) { List<Pair<String, String>> deleteFileList = new ArrayList<>(); cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { // Files deleted from a partition @@ -655,25 +690,23 @@ public class HoodieTableMetadataUtil { deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry))); }); - HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); - List<String> columnsToIndex = - getColumnsToIndex(recordsGenerationParams, - Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient))); + getColumnsToIndex(isColumnStatsIndexEnabled, targetColumnsForColumnStatsIndex, + Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient))); if (columnsToIndex.isEmpty()) { // In case there are no columns to index, bail return engineContext.emptyHoodieData(); } - int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); + int parallelism = Math.max(Math.min(deleteFileList.size(), columnStatsIndexParallelism), 1); return engineContext.parallelize(deleteFileList, parallelism) .flatMap(deleteFileInfoPair -> { String partitionPath = deleteFileInfoPair.getLeft(); String filePath = deleteFileInfoPair.getRight(); if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension()) || ExternalFilePathUtil.isExternallyCreatedFile(filePath)) { - return getColumnStatsRecords(partitionPath, filePath, dataTableMetaClient, columnsToIndex, true).iterator(); + return getColumnStatsRecords(partitionPath, filePath, dataMetaClient, columnsToIndex, true).iterator(); } return Collections.emptyListIterator(); }); @@ -719,7 +752,7 @@ public class HoodieTableMetadataUtil { } }); } catch (IOException e) { - throw new HoodieMetadataException("Parsing rollback plan for " + rollbackInstant.toString() + " failed "); + throw new HoodieMetadataException("Parsing rollback plan for " + rollbackInstant + " failed "); } } @@ -810,8 +843,8 @@ public class HoodieTableMetadataUtil { records.add(record); }); - LOG.info("Found at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size() - + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]); + LOG.info("Found at {} from {}. 
#partitions_updated={}, #files_deleted={}, #files_appended={}", + instantTime, operation, records.size(), fileChangeCount[0], fileChangeCount[1]); return records; } @@ -841,19 +874,21 @@ public class HoodieTableMetadataUtil { public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEngineContext engineContext, Map<String, List<String>> partitionToDeletedFiles, Map<String, Map<String, Long>> partitionToAppendedFiles, - MetadataRecordsGenerationParams recordsGenerationParams, - String instantTime) { + String instantTime, + HoodieTableMetaClient dataMetaClient, + int bloomIndexParallelism, + String bloomFilterType) { // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles); // Create records MDT - int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1); + int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), bloomIndexParallelism), 1); return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> { final String partitionName = partitionFileFlagTuple.f0; final String filename = partitionFileFlagTuple.f1; final boolean isDeleted = partitionFileFlagTuple.f2; if (!FSUtils.isBaseFile(new Path(filename))) { - LOG.warn(String.format("Ignoring file %s as it is not a base file", filename)); + LOG.warn("Ignoring file {} as it is not a base file", filename); return Stream.<HoodieRecord>empty().iterator(); } @@ -861,18 +896,18 @@ public class HoodieTableMetadataUtil { ByteBuffer bloomFilterBuffer = ByteBuffer.allocate(0); if (!isDeleted) { final String pathWithPartition = partitionName + "/" + filename; - final Path addedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition); - bloomFilterBuffer = readBloomFilter(recordsGenerationParams.getDataMetaClient().getHadoopConf(), addedFilePath); + final Path addedFilePath = new Path(dataMetaClient.getBasePathV2(), pathWithPartition); + bloomFilterBuffer = readBloomFilter(dataMetaClient.getHadoopConf(), addedFilePath); // If reading the bloom filter failed then do not add a record for this file if (bloomFilterBuffer == null) { - LOG.error("Failed to read bloom filter from " + addedFilePath); + LOG.error("Failed to read bloom filter from {}", addedFilePath); return Stream.<HoodieRecord>empty().iterator(); } } return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord( - partitionName, filename, instantTime, recordsGenerationParams.getBloomFilterType(), bloomFilterBuffer, partitionFileFlagTuple.f2)) + partitionName, filename, instantTime, bloomFilterType, bloomFilterBuffer, partitionFileFlagTuple.f2)) .iterator(); }); } @@ -883,35 +918,37 @@ public class HoodieTableMetadataUtil { public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEngineContext engineContext, Map<String, List<String>> partitionToDeletedFiles, Map<String, Map<String, Long>> partitionToAppendedFiles, - MetadataRecordsGenerationParams recordsGenerationParams) { + HoodieTableMetaClient dataMetaClient, + boolean isColumnStatsIndexEnabled, + int columnStatsIndexParallelism, + List<String> targetColumnsForColumnStatsIndex) { // Find the columns to index - HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); final List<String> columnsToIndex 
= - getColumnsToIndex(recordsGenerationParams, - Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient))); + getColumnsToIndex(isColumnStatsIndexEnabled, targetColumnsForColumnStatsIndex, + Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient))); if (columnsToIndex.isEmpty()) { // In case there are no columns to index, bail return engineContext.emptyHoodieData(); } - LOG.info(String.format("Indexing %d columns for column stats index", columnsToIndex.size())); + LOG.info("Indexing {} columns for column stats index", columnsToIndex.size()); // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = fetchPartitionFileInfoTriplets(partitionToDeletedFiles, partitionToAppendedFiles); // Create records MDT - int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); + int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), columnStatsIndexParallelism), 1); return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> { final String partitionName = partitionFileFlagTuple.f0; final String filename = partitionFileFlagTuple.f1; final boolean isDeleted = partitionFileFlagTuple.f2; if (!FSUtils.isBaseFile(new Path(filename)) || !filename.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - LOG.warn(String.format("Ignoring file %s as it is not a PARQUET file", filename)); + LOG.warn("Ignoring file {} as it is not a PARQUET file", filename); return Stream.<HoodieRecord>empty().iterator(); } final String filePathWithPartition = partitionName + "/" + filename; - return getColumnStatsRecords(partitionName, filePathWithPartition, dataTableMetaClient, columnsToIndex, isDeleted).iterator(); + return getColumnStatsRecords(partitionName, filePathWithPartition, dataMetaClient, columnsToIndex, isDeleted).iterator(); }); } @@ -973,7 +1010,7 @@ public class HoodieTableMetadataUtil { */ public static List<FileSlice> getPartitionLatestMergedFileSlices( HoodieTableMetaClient metaClient, HoodieTableFileSystemView fsView, String partition) { - LOG.info("Loading latest merged file slices for metadata table partition " + partition); + LOG.info("Loading latest merged file slices for metadata table partition {}", partition); return getPartitionFileSlices(metaClient, Option.of(fsView), partition, true); } @@ -988,7 +1025,7 @@ public class HoodieTableMetadataUtil { */ public static List<FileSlice> getPartitionLatestFileSlices(HoodieTableMetaClient metaClient, Option<HoodieTableFileSystemView> fsView, String partition) { - LOG.info("Loading latest file slices for metadata table partition " + partition); + LOG.info("Loading latest file slices for metadata table partition {}", partition); return getPartitionFileSlices(metaClient, fsView, partition, false); } @@ -1063,7 +1100,10 @@ public class HoodieTableMetadataUtil { public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata, HoodieEngineContext engineContext, - MetadataRecordsGenerationParams recordsGenerationParams) { + HoodieTableMetaClient dataMetaClient, + boolean isColumnStatsIndexEnabled, + int columnStatsIndexParallelism, + List<String> targetColumnsForColumnStatsIndex) { List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() .flatMap(Collection::stream).collect(Collectors.toList()); @@ -1079,14 +1119,13 @@ public class 
HoodieTableMetadataUtil { ? Option.empty() : Option.of(new Schema.Parser().parse(writerSchemaStr))); - HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient(); - HoodieTableConfig tableConfig = dataTableMetaClient.getTableConfig(); + HoodieTableConfig tableConfig = dataMetaClient.getTableConfig(); // NOTE: Writer schema added to commit metadata will not contain Hudi's metadata fields Option<Schema> tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema); - List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams, + List<String> columnsToIndex = getColumnsToIndex(isColumnStatsIndexEnabled, targetColumnsForColumnStatsIndex, Lazy.eagerly(tableSchema)); if (columnsToIndex.isEmpty()) { @@ -1094,10 +1133,10 @@ public class HoodieTableMetadataUtil { return engineContext.emptyHoodieData(); } - int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1); + int parallelism = Math.max(Math.min(allWriteStats.size(), columnStatsIndexParallelism), 1); return engineContext.parallelize(allWriteStats, parallelism) .flatMap(writeStat -> - translateWriteStatToColumnStats(writeStat, dataTableMetaClient, columnsToIndex).iterator()); + translateWriteStatToColumnStats(writeStat, dataMetaClient, columnsToIndex).iterator()); } catch (Exception e) { throw new HoodieException("Failed to generate column stats records for metadata table", e); } @@ -1106,13 +1145,13 @@ public class HoodieTableMetadataUtil { /** * Get the list of columns for the table for column stats indexing */ - private static List<String> getColumnsToIndex(MetadataRecordsGenerationParams recordsGenParams, + private static List<String> getColumnsToIndex(boolean isColumnStatsIndexEnabled, + List<String> targetColumnsForColumnStatsIndex, Lazy<Option<Schema>> lazyWriterSchemaOpt) { - checkState(recordsGenParams.isColumnStatsIndexEnabled()); + checkState(isColumnStatsIndexEnabled); - List<String> targetColumns = recordsGenParams.getTargetColumnsForColumnStatsIndex(); - if (!targetColumns.isEmpty()) { - return targetColumns; + if (!targetColumnsForColumnStatsIndex.isEmpty()) { + return targetColumnsForColumnStatsIndex; } Option<Schema> writerSchemaOpt = lazyWriterSchemaOpt.get(); @@ -1164,19 +1203,17 @@ public class HoodieTableMetadataUtil { List<String> columnsToIndex) { try { if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePath); - List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = + Path fullFilePath = new Path(datasetMetaClient.getBasePathV2(), filePath); + return new ParquetUtils().readRangeFromParquetMetadata(datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex); - - return columnRangeMetadataList; } - LOG.warn("Column range index not supported for: " + filePath); + LOG.warn("Column range index not supported for: {}", filePath); return Collections.emptyList(); } catch (Exception e) { // NOTE: In case reading column range metadata from individual file failed, // we simply fall back, in lieu of failing the whole task - LOG.error("Failed to fetch column range metadata for: " + filePath); + LOG.error("Failed to fetch column range metadata for: {}", filePath); return Collections.emptyList(); } } @@ -1224,13 +1261,13 @@ public class HoodieTableMetadataUtil { TableSchemaResolver schemaResolver = new TableSchemaResolver(dataTableMetaClient); return 
Option.of(schemaResolver.getTableAvroSchema()); } catch (Exception e) { - throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePath(), e); + throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePathV2(), e); } } /** * Given a schema, coerces provided value to instance of {@link Comparable<?>} such that - * it could subsequently used in column stats + * it could subsequently be used in column stats * * NOTE: This method has to stay compatible with the semantic of * {@link ParquetUtils#readRangeFromParquetMetadata} as they are used in tandem @@ -1330,10 +1367,8 @@ public class HoodieTableMetadataUtil { // instant which we have a log block for. final String earliestInstantTime = validInstantTimestamps.isEmpty() ? SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps); datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream() - .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, earliestInstantTime)) - .forEach(instant -> { - validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline)); - }); + .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, earliestInstantTime)) + .forEach(instant -> validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline))); // add restore and rollback instants from MDT. metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants() @@ -1401,7 +1436,7 @@ public class HoodieTableMetadataUtil { timeline.readRollbackInfoAsBytes(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, instant.getTimestamp())).get(), HoodieRollbackPlan.class); commitsToRollback = Collections.singletonList(rollbackPlan.getInstantToRollback().getCommitTime()); - LOG.warn("Had to fetch rollback info from requested instant since completed file is empty " + instant.toString()); + LOG.warn("Had to fetch rollback info from requested instant since completed file is empty {}", instant); } return commitsToRollback; } @@ -1411,9 +1446,8 @@ public class HoodieTableMetadataUtil { // Restore is made up of several rollbacks HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata( timeline.getInstantDetails(instant).get()); - restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { - rms.forEach(rm -> rollbackedCommits.addAll(rm.getCommitsRollback())); - }); + restoreMetadata.getHoodieRestoreMetadata().values() + .forEach(rms -> rms.forEach(rm -> rollbackedCommits.addAll(rm.getCommitsRollback()))); } return rollbackedCommits; } catch (IOException e) { @@ -1447,7 +1481,7 @@ public class HoodieTableMetadataUtil { if (backup) { final Path metadataBackupPath = new Path(metadataTablePath.getParent(), ".metadata_" + dataMetaClient.createNewInstantTime(false)); - LOG.info("Backing up metadata directory to " + metadataBackupPath + " before deletion"); + LOG.info("Backing up metadata directory to {} before deletion", metadataBackupPath); try { if (fs.rename(metadataTablePath, metadataBackupPath)) { return metadataBackupPath.toString(); @@ -1458,7 +1492,7 @@ public class HoodieTableMetadataUtil { } } - LOG.info("Deleting metadata table from " + metadataTablePath); + LOG.info("Deleting metadata table from {}", metadataTablePath); try { fs.delete(metadataTablePath, true); } catch (Exception e) { @@ -1486,7 +1520,7 @@ public class 
HoodieTableMetadataUtil { return deleteMetadataTable(dataMetaClient, context, backup); } - final Path metadataTablePartitionPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()), partitionPath); + final Path metadataTablePartitionPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2()), partitionPath); FileSystem fs = HadoopFSUtils.getFs(metadataTablePartitionPath.toString(), context.getHadoopConf().get()); dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, false); try { @@ -1495,7 +1529,7 @@ public class HoodieTableMetadataUtil { } } catch (FileNotFoundException e) { // Ignoring exception as metadata table already does not exist - LOG.debug("Metadata table partition " + partitionPath + " not found at path " + metadataTablePartitionPath); + LOG.debug("Metadata table partition {} not found at path {}", partitionPath, metadataTablePartitionPath); return null; } catch (Exception e) { throw new HoodieMetadataException(String.format("Failed to check existence of MDT partition %s at path %s: ", partitionPath, metadataTablePartitionPath), e); @@ -1504,7 +1538,7 @@ public class HoodieTableMetadataUtil { if (backup) { final Path metadataPartitionBackupPath = new Path(metadataTablePartitionPath.getParent().getParent(), String.format(".metadata_%s_%s", partitionPath, dataMetaClient.createNewInstantTime(false))); - LOG.info(String.format("Backing up MDT partition %s to %s before deletion", partitionPath, metadataPartitionBackupPath)); + LOG.info("Backing up MDT partition {} to {} before deletion", partitionPath, metadataPartitionBackupPath); try { if (fs.rename(metadataTablePartitionPath, metadataPartitionBackupPath)) { return metadataPartitionBackupPath.toString(); @@ -1514,7 +1548,7 @@ public class HoodieTableMetadataUtil { LOG.error(String.format("Failed to backup MDT partition %s using rename", partitionPath), e); } } else { - LOG.info("Deleting metadata table partition from " + metadataTablePartitionPath); + LOG.info("Deleting metadata table partition from {}", metadataTablePartitionPath); try { fs.delete(metadataTablePartitionPath, true); } catch (Exception e) { @@ -1621,10 +1655,10 @@ public class HoodieTableMetadataUtil { } } - LOG.info(String.format("Estimated file group count for MDT partition %s is %d " - + "[recordCount=%d, avgRecordSize=%d, minFileGroupCount=%d, maxFileGroupCount=%d, growthFactor=%f, " - + "maxFileGroupSizeBytes=%d]", partitionType.name(), fileGroupCount, recordCount, averageRecordSize, minFileGroupCount, - maxFileGroupCount, growthFactor, maxFileGroupSizeBytes)); + LOG.info("Estimated file group count for MDT partition {} is {} " + + "[recordCount={}, avgRecordSize={}, minFileGroupCount={}, maxFileGroupCount={}, growthFactor={}, " + + "maxFileGroupSizeBytes={}]", partitionType.name(), fileGroupCount, recordCount, averageRecordSize, minFileGroupCount, + maxFileGroupCount, growthFactor, maxFileGroupSizeBytes); return fileGroupCount; } @@ -1648,10 +1682,7 @@ public class HoodieTableMetadataUtil { } // Does any enabled partition being enabled need to track the written records - if (config.enableRecordIndex()) { - return true; - } - return false; + return config.enableRecordIndex(); } /** @@ -1768,12 +1799,14 @@ public class HoodieTableMetadataUtil { .withReaderSchema(HoodieAvroUtils.getRecordKeySchema()) .withLatestInstantTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse("")) .withReverseReader(false) 
- .withMaxMemorySizeInBytes(configuration.get().getLongBytes(MAX_MEMORY_FOR_COMPACTION.key(), DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)) + .withMaxMemorySizeInBytes(configuration.get() + .getLongBytes(MAX_MEMORY_FOR_COMPACTION.key(), DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)) .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath()) .withPartition(fileSlice.getPartitionPath()) .withOptimizedLogBlocksScan(configuration.get().getBoolean("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN, false)) .withDiskMapType(configuration.get().getEnum(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue())) - .withBitCaskDiskMapCompressionEnabled(configuration.get().getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())) + .withBitCaskDiskMapCompressionEnabled(configuration.get() + .getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())) .withRecordMerger(HoodieRecordUtils.createRecordMerger( metaClient.getBasePathV2().toString(), engineType, diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java deleted file mode 100644 index 72a8bf4cd26..00000000000 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi.metadata; - -import org.apache.hudi.common.table.HoodieTableMetaClient; - -import java.io.Serializable; -import java.util.List; - -/** - * Encapsulates all parameters required to generate metadata index for enabled index types. 
- * - * @deprecated this component currently duplicates configuration coming from the {@code HoodieWriteConfig} - * which is problematic; instead we should break this component down and use source of truth - * for each respective data-point directly ({@code HoodieWriteConfig}, {@code HoodieTableMetaClient}, etc) - */ -@Deprecated -public class MetadataRecordsGenerationParams implements Serializable { - - private final HoodieTableMetaClient dataMetaClient; - private final List<MetadataPartitionType> enabledPartitionTypes; - private final String bloomFilterType; - private final int bloomIndexParallelism; - private final boolean isColumnStatsIndexEnabled; - private final int columnStatsIndexParallelism; - private final List<String> targetColumnsForColumnStatsIndex; - private final List<String> targetColumnsForBloomFilterIndex; - - MetadataRecordsGenerationParams(HoodieTableMetaClient dataMetaClient, List<MetadataPartitionType> enabledPartitionTypes, String bloomFilterType, int bloomIndexParallelism, - boolean isColumnStatsIndexEnabled, int columnStatsIndexParallelism, List<String> targetColumnsForColumnStatsIndex, List<String> targetColumnsForBloomFilterIndex) { - this.dataMetaClient = dataMetaClient; - this.enabledPartitionTypes = enabledPartitionTypes; - this.bloomFilterType = bloomFilterType; - this.bloomIndexParallelism = bloomIndexParallelism; - this.isColumnStatsIndexEnabled = isColumnStatsIndexEnabled; - this.columnStatsIndexParallelism = columnStatsIndexParallelism; - this.targetColumnsForColumnStatsIndex = targetColumnsForColumnStatsIndex; - this.targetColumnsForBloomFilterIndex = targetColumnsForBloomFilterIndex; - } - - public HoodieTableMetaClient getDataMetaClient() { - return dataMetaClient; - } - - public List<MetadataPartitionType> getEnabledPartitionTypes() { - return enabledPartitionTypes; - } - - public String getBloomFilterType() { - return bloomFilterType; - } - - public boolean isColumnStatsIndexEnabled() { - return isColumnStatsIndexEnabled; - } - - public int getBloomIndexParallelism() { - return bloomIndexParallelism; - } - - public int getColumnStatsIndexParallelism() { - return columnStatsIndexParallelism; - } - - public List<String> getTargetColumnsForColumnStatsIndex() { - return targetColumnsForColumnStatsIndex; - } - - public List<String> getSecondaryKeysForBloomFilterIndex() { - return targetColumnsForBloomFilterIndex; - } -}
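
The core of the change is visible in getColumnsToIndex: instead of reading everything from the removed params object, the helper now takes its inputs directly (the enable flag, the configured target columns, and a lazily resolved schema). Below is a minimal sketch of that precedence logic using simplified stand-in types (Supplier/Optional and plain column lists instead of Lazy/Option and Avro schemas); it is an illustration only, not code from this commit.

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class ColumnsToIndexSketch {

  static List<String> getColumnsToIndex(boolean isColumnStatsIndexEnabled,
                                        List<String> targetColumnsForColumnStatsIndex,
                                        Supplier<Optional<List<String>>> lazySchemaFields) {
    if (!isColumnStatsIndexEnabled) {
      // Stand-in for ValidationUtils.checkState(isColumnStatsIndexEnabled).
      throw new IllegalStateException("Column stats index is not enabled");
    }
    // Explicitly configured target columns take precedence, as in the refactored helper.
    if (!targetColumnsForColumnStatsIndex.isEmpty()) {
      return targetColumnsForColumnStatsIndex;
    }
    // Schema resolution is deferred; the empty-list fallback here is a simplification.
    return lazySchemaFields.get().orElse(Collections.emptyList());
  }

  public static void main(String[] args) {
    System.out.println(getColumnsToIndex(true, List.of("rider", "fare"), Optional::empty));
    System.out.println(getColumnsToIndex(true, List.of(), () -> Optional.of(List.of("ts"))));
  }
}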
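
Both conversion paths clamp their parallelism the same way before calling engineContext.parallelize, using Math.max(Math.min(workItemCount, configuredParallelism), 1): never more tasks than work items, never fewer than one. A tiny standalone sketch of that clamp, with illustrative values:

public class ParallelismClampSketch {

  static int clampParallelism(int workItemCount, int configuredParallelism) {
    return Math.max(Math.min(workItemCount, configuredParallelism), 1);
  }

  public static void main(String[] args) {
    System.out.println(clampParallelism(3, 200));    // 3   -> never more tasks than files
    System.out.println(clampParallelism(0, 200));    // 1   -> parallelize still gets a valid value
    System.out.println(clampParallelism(5000, 200)); // 200 -> capped by the configured parallelism
  }
}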
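
The bloom filter conversion walks (partition, filename, isDeleted) triplets, skips files that are not base files, skips added files whose bloom filter cannot be read, and emits one metadata record for everything else. The sketch below re-expresses that control flow with plain java.util.stream and hypothetical record types in place of the Hudi engine context, Tuple3 and HoodieMetadataPayload; names and the ".parquet" check are illustrative stand-ins.

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

public class BloomFilterRecordsSketch {

  record FileFlag(String partition, String filename, boolean deleted) { }

  record MetadataRecord(String partition, String filename, boolean deleted, ByteBuffer bloom) { }

  static Stream<MetadataRecord> toRecords(List<FileFlag> files) {
    return files.stream().flatMap(f -> {
      if (!f.filename().endsWith(".parquet")) {
        // Stand-in for the base-file check: log files and other artifacts are ignored.
        return Stream.<MetadataRecord>empty();
      }
      ByteBuffer bloom = ByteBuffer.allocate(0);
      if (!f.deleted()) {
        Optional<ByteBuffer> read = readBloomFilter(f);
        if (read.isEmpty()) {
          // If the bloom filter cannot be read, no record is produced for this file.
          return Stream.<MetadataRecord>empty();
        }
        bloom = read.get();
      }
      return Stream.<MetadataRecord>of(new MetadataRecord(f.partition(), f.filename(), f.deleted(), bloom));
    });
  }

  private static Optional<ByteBuffer> readBloomFilter(FileFlag f) {
    // Placeholder: the real code reads the bloom filter bytes from the base file footer.
    return Optional.of(ByteBuffer.allocate(0));
  }

  public static void main(String[] args) {
    List<FileFlag> changes = List.of(
        new FileFlag("2024/04/15", "f1_0-1-2_001.parquet", false),
        new FileFlag("2024/04/15", ".f1_001.log.1", false),
        new FileFlag("2024/04/15", "f2_0-3-4_000.parquet", true));
    toRecords(changes).forEach(System.out::println);
  }
}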
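
The logging changes in both files replace string concatenation and String.format with SLF4J placeholders, so the message is only assembled when the level is enabled. A small sketch of the pattern follows; it assumes only slf4j-api on the classpath, and the isInfoEnabled guard shown for expensive arguments is general SLF4J practice rather than something specific to this patch.

import java.util.function.LongSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingSketch {

  private static final Logger LOG = LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

  static void logMappings(String partitionName, long mappingCount) {
    // Before: LOG.info(String.format("Initializing %s index with %d mappings.", partitionName, mappingCount));
    // After: formatting is deferred until the logger confirms INFO is enabled.
    LOG.info("Initializing {} index with {} mappings.", partitionName, mappingCount);
  }

  static void logWithExpensiveArgument(String partitionName, LongSupplier expensiveCount) {
    // Placeholders avoid building the message string, but not computing the arguments
    // themselves; an explicit level check covers that case.
    if (LOG.isInfoEnabled()) {
      LOG.info("Initializing {} index with {} mappings.", partitionName, expensiveCount.getAsLong());
    }
  }

  public static void main(String[] args) {
    logMappings("COLUMN_STATS", 42);
    logWithExpensiveArgument("FILES", () -> 7L);
  }
}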