[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1227127259 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaRDD; + +import scala.Tuple2; + +/** + * A {@code BulkInsertPartitioner} implementation for Metadata Table to improve performance of initialization of metadata + * table partition when a very large number of records are inserted. + * + * This partitioner requires the records to be already tagged with location. 
+ */ +public class SparkHoodieMetadataBulkInsertPartitioner implements BulkInsertPartitioner> { + final int numPartitions; + public SparkHoodieMetadataBulkInsertPartitioner(int numPartitions) { +this.numPartitions = numPartitions; + } + + private class FileGroupPartitioner extends Partitioner { + +@Override +public int getPartition(Object key) { + return ((Tuple2)key)._1; +} + +@Override +public int numPartitions() { + return numPartitions; +} + } + + // FileIDs for the various partitions + private List fileIDPfxs; + + /** + * Partition the records by their location. The number of partitions is determined by the number of MDT fileGroups being udpated rather than the + * specific value of outputSparkPartitions. + */ + @Override + public JavaRDD repartitionRecords(JavaRDD records, int outputSparkPartitions) { +Comparator> keyComparator = +(Comparator> & Serializable)(t1, t2) -> t1._2.compareTo(t2._2); + +// Partition the records by their file group +JavaRDD partitionedRDD = records +// key by . The file group index is used to partition and the record key is used to sort within the partition. +.keyBy(r -> { + int fileGroupIndex = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId()); + return new Tuple2<>(fileGroupIndex, r.getRecordKey()); +}) +.repartitionAndSortWithinPartitions(new FileGroupPartitioner(), keyComparator) +.map(t -> t._2); + +fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> { + // Due to partitioning, all record in the partition should have same fileID. So we only can get the fileID prefix from the first record. + List fileIds = new ArrayList<>(1); + if (recordItr.hasNext()) { +HoodieRecord record = recordItr.next(); +final String fileID = HoodieTableMetadataUtil.getFileGroupPrefix(record.getCurrentLocation().getFileId()); +fileIds.add(fileID); + } else { +// FileGroupPartitioner returns a fixed number of partition as part of numPartitions(). 
In the special case that recordsRDD has fewer +// records than fileGroupCount, some of these partitions (corresponding to fileGroups) will not have any data. +// But we still need to return a fileID for use within {@code BulkInsertMapFunction} +fileIds.add(""); + } + return fileIds.iterator(); +}, true).collect(); +ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() == fileIDPfxs.size(), +String.format("Generated fileIDPfxs (%d) are lesser in size than the partitions %d", fileIDPfxs.size(), partitionedRDD.getNumPartitions())); + +return partitionedRDD; + } + + @Override + public boolean arePartitionRecordsSorted() { +return true; + } + + @Override + public String getFileIdPfx(int partitionId) { +return fileIDPfxs.get(partitionId); Review Comment: This PR also adds support for automatic estimation of the shard counts for each partition, that can be enhanced. -- This is an automated message from the Apache Git Service. To respond to t
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1227126602 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaRDD; + +import scala.Tuple2; + +/** + * A {@code BulkInsertPartitioner} implementation for Metadata Table to improve performance of initialization of metadata + * table partition when a very large number of records are inserted. + * + * This partitioner requires the records to be already tagged with location. 
+ */ +public class SparkHoodieMetadataBulkInsertPartitioner implements BulkInsertPartitioner> { + final int numPartitions; + public SparkHoodieMetadataBulkInsertPartitioner(int numPartitions) { +this.numPartitions = numPartitions; + } + + private class FileGroupPartitioner extends Partitioner { + +@Override +public int getPartition(Object key) { + return ((Tuple2)key)._1; +} + +@Override +public int numPartitions() { + return numPartitions; +} + } + + // FileIDs for the various partitions + private List fileIDPfxs; + + /** + * Partition the records by their location. The number of partitions is determined by the number of MDT fileGroups being udpated rather than the + * specific value of outputSparkPartitions. + */ + @Override + public JavaRDD repartitionRecords(JavaRDD records, int outputSparkPartitions) { +Comparator> keyComparator = +(Comparator> & Serializable)(t1, t2) -> t1._2.compareTo(t2._2); + +// Partition the records by their file group +JavaRDD partitionedRDD = records +// key by . The file group index is used to partition and the record key is used to sort within the partition. +.keyBy(r -> { + int fileGroupIndex = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId()); + return new Tuple2<>(fileGroupIndex, r.getRecordKey()); +}) +.repartitionAndSortWithinPartitions(new FileGroupPartitioner(), keyComparator) +.map(t -> t._2); + +fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> { + // Due to partitioning, all record in the partition should have same fileID. So we only can get the fileID prefix from the first record. + List fileIds = new ArrayList<>(1); + if (recordItr.hasNext()) { +HoodieRecord record = recordItr.next(); +final String fileID = HoodieTableMetadataUtil.getFileGroupPrefix(record.getCurrentLocation().getFileId()); +fileIds.add(fileID); + } else { +// FileGroupPartitioner returns a fixed number of partition as part of numPartitions(). 
In the special case that recordsRDD has fewer +// records than fileGroupCount, some of these partitions (corresponding to fileGroups) will not have any data. +// But we still need to return a fileID for use within {@code BulkInsertMapFunction} +fileIds.add(""); + } + return fileIds.iterator(); +}, true).collect(); +ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() == fileIDPfxs.size(), +String.format("Generated fileIDPfxs (%d) are lesser in size than the partitions %d", fileIDPfxs.size(), partitionedRDD.getNumPartitions())); + +return partitionedRDD; + } + + @Override + public boolean arePartitionRecordsSorted() { +return true; + } + + @Override + public String getFileIdPfx(int partitionId) { +return fileIDPfxs.get(partitionId); Review Comment: The idea behind sharding in MDT is that you can create more shards rather than splitting a shard into two files. Splitting will cause one large and one small-sized file, which is not optimal.
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1219045032 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -626,8 +657,7 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder() .onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), metadataPartition.getPartitionPath())) -.withFileId(fileGroupFileId) -.overBaseCommit(instantTime) +.withFileId(fileGroupFileId).overBaseCommit(instantTime) .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION) Review Comment: Reverted -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1219043175 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java: ## @@ -338,14 +338,20 @@ protected void initMetadataTable(Option instantTime) { * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization */ private void initializeMetadataTable(Option inFlightInstantTimestamp) { -if (config.isMetadataTableEnabled()) { - HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, - context, Option.empty(), inFlightInstantTimestamp); - try { -writer.close(); - } catch (Exception e) { -throw new HoodieException("Failed to instantiate Metadata table ", e); +if (!config.isMetadataTableEnabled()) { + LOG.error("11"); + return; +} + +try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, +context, Option.empty(), inFlightInstantTimestamp)) { + if (writer.isInitialized()) { +writer.performTableServices(inFlightInstantTimestamp); + } else { +throw new HoodieException((".22")); Review Comment: Removed the debug log -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1219042633 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java: ## @@ -338,14 +338,20 @@ protected void initMetadataTable(Option instantTime) { * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization */ private void initializeMetadataTable(Option inFlightInstantTimestamp) { -if (config.isMetadataTableEnabled()) { - HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, - context, Option.empty(), inFlightInstantTimestamp); - try { -writer.close(); - } catch (Exception e) { -throw new HoodieException("Failed to instantiate Metadata table ", e); +if (!config.isMetadataTableEnabled()) { + LOG.error("11"); Review Comment: Removed the debug log -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1203532474 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -1052,51 +1091,81 @@ protected HoodieData prepRecords(Map + * Don't perform optimization if there are inflight operations on the dataset. This is for two reasons: + * - The compaction will contain the correct data as all failed operations have been rolled back. + * - Clean/compaction etc. will have the highest timestamp on the MDT and we won't be adding new operations + * with smaller timestamps to metadata table (makes for easier debugging) + * + * This adds the limitations that long-running async operations (clustering, etc.) may cause delay in such MDT + * optimizations. We will relax this after MDT code has been hardened. */ - protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) { -// finish off any pending compactions if any from previous attempt. -writeClient.runAnyPendingCompactions(); - -String latestDeltaCommitTimeInMetadataTable = metadataMetaClient.reloadActiveTimeline() -.getDeltaCommitTimeline() -.filterCompletedInstants() -.lastInstant().orElseThrow(() -> new HoodieMetadataException("No completed deltacommit in metadata table")) -.getTimestamp(); -// we need to find if there are any inflights in data table timeline before or equal to the latest delta commit in metadata table. -// Whenever you want to change this logic, please ensure all below scenarios are considered. -// a. There could be a chance that latest delta commit in MDT is committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed -// b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, latest compaction instant time in MDT represents -// any instants before that is already synced with metadata table. -// c. Do consider out of order commits. 
For eg, c4 from DT could complete before c3. and we can't trigger compaction in MDT with c4 as base instant time, until every -// instant before c4 is synced with metadata table. -List pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested() - .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants(); + @Override + public void performTableServices(Option inFlightInstantTimestamp) { +HoodieTimer metadataTableServicesTimer = HoodieTimer.start(); +boolean allTableServicesExecutedSuccessfullyOrSkipped = true; +try { + BaseHoodieWriteClient writeClient = getWriteClient(); + // Run any pending table services operations. + runPendingTableServicesOperations(writeClient); + + // Check and run clean operations. + String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline() + .filterCompletedInstants() + .lastInstant().get() + .getTimestamp(); + LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + ", running clean operations."); + cleanIfNecessary(writeClient, latestDeltacommitTime); + + // Do timeline validation before scheduling compaction/logcompaction operations. + if (!validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime)) { +return; Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1203530662 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -760,15 +815,14 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder() .onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), metadataPartition.getPartitionPath())) -.withFileId(fileGroupFileId) -.overBaseCommit(instantTime) +.withFileId(fileGroupFileId).overBaseCommit(instantTime) .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION) .withFileSize(0L) -.withSizeThreshold(metadataWriteConfig.getLogFileMaxSize()) -.withFs(dataMetaClient.getFs()) -.withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN) -.withLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN) -.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); +.withSizeThreshold(metadataWriteConfig.getLogFileMaxSize()) +.withFs(dataMetaClient.getFs()) +.withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN) Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1203530124 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -564,53 +532,147 @@ private boolean isCommitRevertedByInFlightAction( /** * Initialize the Metadata Table by listing files and partitions from the file system. * - * @param dataMetaClient - {@code HoodieTableMetaClient} for the dataset. + * @param initializationTime - Timestamp to use for the commit + * @param partitionsToInit - List of MDT partitions to initialize * @param inflightInstantTimestamp - Current action instant responsible for this initialization */ - private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, + private boolean initializeFromFilesystem(String initializationTime, List partitionsToInit, Option inflightInstantTimestamp) throws IOException { if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { return false; } -String createInstantTime = getInitialCommitInstantTime(dataMetaClient); - -initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS); -initTableMetadata(); -// if async metadata indexing is enabled, -// then only initialize files partition as other partitions will be built using HoodieIndexer -List enabledPartitionTypes = new ArrayList<>(); -if (dataWriteConfig.isMetadataAsyncIndex()) { - enabledPartitionTypes.add(MetadataPartitionType.FILES); -} else { - // all enabled ones should be initialized - enabledPartitionTypes = this.enabledPartitionTypes; +// FILES partition is always initialized first + ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES) +|| partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit); + +metadataMetaClient = initializeMetaClient(); + +// Get a complete list of files and partitions from the file system or from already initialized FILES 
partition of MDT +boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES); +List partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime); +Map> partitionToFilesMap = partitionInfoList.stream() Review Comment: If the MDT already exists then we are here initializing other partitions. For that we would need the list of all files and partitions which we can load from MDT files partition itself. MDT files partitions is pre-requisite for all other partitions so it should either exist or be created first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1203525353 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -375,105 +357,91 @@ public List getEnabledPartitionTypes() { return this.enabledPartitionTypes; } - /** - * Initialize the metadata table if it does not exist. - * - * If the metadata table does not exist, then file and partition listing is used to initialize the table. - * - * @param engineContext - * @param actionMetadata Action metadata types extending Avro generated SpecificRecordBase - * @param inflightInstantTimestamp Timestamp of an instant in progress on the dataset. This instant is ignored - * while deciding to initialize the metadata table. - */ - protected abstract void initialize(HoodieEngineContext engineContext, -Option actionMetadata, - Option inflightInstantTimestamp); - - public void initTableMetadata() { -try { - if (this.metadata != null) { -this.metadata.close(); - } - this.metadata = new HoodieBackedTableMetadata(engineContext, dataWriteConfig.getMetadataConfig(), - dataWriteConfig.getBasePath(), dataWriteConfig.getSpillableMapBasePath()); - this.metadataMetaClient = metadata.getMetadataMetaClient(); -} catch (Exception e) { - throw new HoodieException("Error initializing metadata table for reads", e); -} - } - /** * Initialize the metadata table if needed. 
* * @param dataMetaClient - meta client for the data table * @param actionMetadata - optional action metadata * @param inflightInstantTimestamp - timestamp of an instant in progress on the dataset * @param - action metadata types extending Avro generated SpecificRecordBase - * @throws IOException + * @throws IOException on errors */ - protected void initializeIfNeeded(HoodieTableMetaClient dataMetaClient, - Option actionMetadata, - Option inflightInstantTimestamp) throws IOException { + protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, + Option actionMetadata, + Option inflightInstantTimestamp) throws IOException { HoodieTimer timer = HoodieTimer.start(); +List partitionsToInit = new ArrayList<>(MetadataPartitionType.values().length); -boolean exists = metadataTableExists(dataMetaClient, actionMetadata); +try { + boolean exists = metadataTableExists(dataMetaClient, actionMetadata); + if (!exists) { +// FILES partition is always required +partitionsToInit.add(MetadataPartitionType.FILES); + } -if (!exists) { - // Initialize for the first time by listing partitions and files directly from the file system - if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) { -metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); + // check if any of the enabled partition types needs to be initialized + // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler. 
+ if (!dataWriteConfig.isMetadataAsyncIndex()) { +Set inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig()); +LOG.info("Async metadata indexing disabled and following partitions already initialized: " + inflightAndCompletedPartitions); +this.enabledPartitionTypes.stream() +.filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)) +.forEach(partitionsToInit::add); } - return; -} -// if metadata table exists, then check if any of the enabled partition types needs to be initialized -// NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler. -if (!dataWriteConfig.isMetadataAsyncIndex()) { - Set inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig()); - LOG.info("Async metadata indexing enabled and following partitions already initialized: " + inflightAndCompletedPartitions); - List partitionsToInit = this.enabledPartitionTypes.stream() - .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1203521583 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -489,7 +457,7 @@ private boolean metadataTableExists(HoodieTableMe * TODO: Revisit this logic and validate that filtering for all * commits timeline is the right thing to do * - * @return True if the initialize is not needed, False otherwise + * @return True if the initialization is not needed, False otherwise Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1203518705 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java: ## @@ -337,15 +337,33 @@ protected void initMetadataTable(Option instantTime) { * * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization */ - private void initializeMetadataTable(Option inFlightInstantTimestamp) { -if (config.isMetadataTableEnabled()) { - HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, - context, Option.empty(), inFlightInstantTimestamp); - try { -writer.close(); - } catch (Exception e) { -throw new HoodieException("Failed to instantiate Metadata table ", e); + private void initializeMetadataTable(WriteOperationType operationType, Option inFlightInstantTimestamp) { +if (!config.isMetadataTableEnabled()) { + return; +} + +try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, +context, Option.empty(), inFlightInstantTimestamp)) { + if (writer.isInitialized()) { +// Optimize the metadata table which involves compacton. cleaning, etc. This should only be called from writers. +switch (operationType) { + case INSERT: + case INSERT_PREPPED: + case UPSERT: + case UPSERT_PREPPED: + case BULK_INSERT: + case BULK_INSERT_PREPPED: + case DELETE: Review Comment: On second thoughts, the switch is not necessary. The above code is within a transaction lock so there should not be any conflicts of multiple writers optimizing MDT together. The checks within performTableServices should be light enough or we can optimize them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1203503606 ## hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java: ## @@ -694,17 +695,75 @@ private Long getTableChecksum() { return getLong(TABLE_CHECKSUM); } - public List getMetadataPartitionsInflight() { -return StringUtils.split( -getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING), -CONFIG_VALUES_DELIMITER -); + public Set getMetadataPartitionsInflight() { +return new HashSet<>(StringUtils.split( +getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING), +CONFIG_VALUES_DELIMITER)); } public Set getMetadataPartitions() { return new HashSet<>( -StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING), -CONFIG_VALUES_DELIMITER)); +StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING), +CONFIG_VALUES_DELIMITER)); + } + + /** + * @returns true if metadata table has been created and is being used for this dataset, else returns false. + */ + public boolean isMetadataTableEnabled() { +return isMetadataPartitionEnabled(MetadataPartitionType.FILES); + } + + /** + * Checks if metadata table is enabled and the specified partition has been initialized. + * + * @param partition The partition to check + * @returns true if the specific partition has been initialized, else returns false. + */ + public boolean isMetadataPartitionEnabled(MetadataPartitionType partition) { +return getMetadataPartitions().contains(partition.getPartitionPath()); + } + + /** + * Enables or disables the specified metadata table partition. 
+ * + * @param partition The partition + * @param enabled If true, the partition is enabled, else disabled + */ + public void setMetadataPartitionState(MetadataPartitionType partition, boolean enabled) { + ValidationUtils.checkArgument(!partition.getPartitionPath().contains(CONFIG_VALUES_DELIMITER), +"Metadata Table partition path cannot contain a comma: " + partition.getPartitionPath()); +Set partitions = getMetadataPartitions(); +Set partitionsInflight = getMetadataPartitionsInflight(); +if (enabled) { + partitions.add(partition.getPartitionPath()); + partitionsInflight.remove(partition.getPartitionPath()); +} else if (partition.equals(MetadataPartitionType.FILES)) { + // file listing partition is required for all other partitions to work + // Disabling file partition will also disable all partitions + partitions.clear(); + partitionsInflight.clear(); +} else { + partitions.remove(partition.getPartitionPath()); + partitionsInflight.remove(partition.getPartitionPath()); +} +setValue(TABLE_METADATA_PARTITIONS, partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); +setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); Review Comment: It is persisted from the caller side: HoodieTableMetadataUtil.setMetadataPartitionState. I have removed the HoodieTableMetadataUtil.setMetadataPartitionState as it was unnecessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1194268603 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -1097,87 +1165,76 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan // Trigger cleaning with suffixes based on the same instant time. This ensures that any future // delta commits synced over will not have an instant time lesser than the last completed instant on the // metadata table. -writeClient.clean(instantTime + "002"); + writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime)); writeClient.lazyRollbackFailedIndexing(); } /** - * This is invoked to initialize metadata table for a dataset. - * Initial commit has special handling mechanism due to its scale compared to other regular commits. - * During cold startup, the list of files to be committed can be huge. - * So creating a HoodieCommitMetadata out of these large number of files, - * and calling the existing update(HoodieCommitMetadata) function does not scale well. - * Hence, we have a special commit just for the initialization scenario. + * Validates the timeline for both main and metadata tables. */ - private void initialCommit(String createInstantTime, List partitionTypes) { -// List all partitions in the basePath of the containing dataset -LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); -engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + dataWriteConfig.getTableName()); - -Map> partitionToRecordsMap = new HashMap<>(); - -// skip file system listing to populate metadata records if it's a fresh table. -// this is applicable only if the table already has N commits and metadata is enabled at a later point in time. 
-if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table. - // If not, last completed commit in data table will be chosen as the initial commit time. - LOG.info("Triggering empty Commit to metadata to initialize"); -} else { - List partitionInfoList = listAllPartitions(dataMetaClient); - Map> partitionToFilesMap = partitionInfoList.stream() - .map(p -> { -String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); -return Pair.of(partitionName, p.getFileNameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum(); - List partitions = new ArrayList<>(partitionToFilesMap.keySet()); - - if (partitionTypes.contains(MetadataPartitionType.FILES)) { -// Record which saves the list of all partitions -HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); -HoodieData filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord); -ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1)); -partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); - } - - if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) { -final HoodieData recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( -engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); -partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD); - } - - if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) { -final HoodieData recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( -engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); 
-partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD); - } - LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata"); + private boolean validateTimelineBeforeSchedulingCompaction(Option inFlightInstantTimestamp, String latestDeltacommitTime) { +// There should not be any incomplete instants on MDT +HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); +List pendingInstantsOnMetadataTable = metadataTimeline.filterInflightsAndRequested().getInstants(); +if (!pendingInstantsOnMetadataTable.isEmpty()) { + LOG.info(String.format( + "Cannot compact MDT as there are %d inflight instants: %s", + pendingInst
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1194268190 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -1097,87 +1165,76 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan // Trigger cleaning with suffixes based on the same instant time. This ensures that any future // delta commits synced over will not have an instant time lesser than the last completed instant on the // metadata table. -writeClient.clean(instantTime + "002"); + writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime)); writeClient.lazyRollbackFailedIndexing(); } /** - * This is invoked to initialize metadata table for a dataset. - * Initial commit has special handling mechanism due to its scale compared to other regular commits. - * During cold startup, the list of files to be committed can be huge. - * So creating a HoodieCommitMetadata out of these large number of files, - * and calling the existing update(HoodieCommitMetadata) function does not scale well. - * Hence, we have a special commit just for the initialization scenario. + * Validates the timeline for both main and metadata tables. */ - private void initialCommit(String createInstantTime, List partitionTypes) { -// List all partitions in the basePath of the containing dataset -LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); -engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + dataWriteConfig.getTableName()); - -Map> partitionToRecordsMap = new HashMap<>(); - -// skip file system listing to populate metadata records if it's a fresh table. -// this is applicable only if the table already has N commits and metadata is enabled at a later point in time. 
-if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table. - // If not, last completed commit in data table will be chosen as the initial commit time. - LOG.info("Triggering empty Commit to metadata to initialize"); -} else { - List partitionInfoList = listAllPartitions(dataMetaClient); - Map> partitionToFilesMap = partitionInfoList.stream() - .map(p -> { -String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); -return Pair.of(partitionName, p.getFileNameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum(); - List partitions = new ArrayList<>(partitionToFilesMap.keySet()); - - if (partitionTypes.contains(MetadataPartitionType.FILES)) { -// Record which saves the list of all partitions -HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); -HoodieData filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord); -ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1)); -partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); - } - - if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) { -final HoodieData recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( -engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); -partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD); - } - - if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) { -final HoodieData recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( -engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); 
-partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD); - } - LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata"); + private boolean validateTimelineBeforeSchedulingCompaction(Option inFlightInstantTimestamp, String latestDeltacommitTime) { +// There should not be any incomplete instants on MDT +HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); +List pendingInstantsOnMetadataTable = metadataTimeline.filterInflightsAndRequested().getInstants(); +if (!pendingInstantsOnMetadataTable.isEmpty()) { + LOG.info(String.format( + "Cannot compact MDT as there are %d inflight instants: %s", + pendingInst
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1194267719 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -1097,87 +1165,76 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan // Trigger cleaning with suffixes based on the same instant time. This ensures that any future // delta commits synced over will not have an instant time lesser than the last completed instant on the // metadata table. -writeClient.clean(instantTime + "002"); + writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime)); writeClient.lazyRollbackFailedIndexing(); } /** - * This is invoked to initialize metadata table for a dataset. - * Initial commit has special handling mechanism due to its scale compared to other regular commits. - * During cold startup, the list of files to be committed can be huge. - * So creating a HoodieCommitMetadata out of these large number of files, - * and calling the existing update(HoodieCommitMetadata) function does not scale well. - * Hence, we have a special commit just for the initialization scenario. + * Validates the timeline for both main and metadata tables. */ - private void initialCommit(String createInstantTime, List partitionTypes) { -// List all partitions in the basePath of the containing dataset -LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); -engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + dataWriteConfig.getTableName()); - -Map> partitionToRecordsMap = new HashMap<>(); - -// skip file system listing to populate metadata records if it's a fresh table. -// this is applicable only if the table already has N commits and metadata is enabled at a later point in time. 
-if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table. - // If not, last completed commit in data table will be chosen as the initial commit time. - LOG.info("Triggering empty Commit to metadata to initialize"); -} else { - List partitionInfoList = listAllPartitions(dataMetaClient); - Map> partitionToFilesMap = partitionInfoList.stream() - .map(p -> { -String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); -return Pair.of(partitionName, p.getFileNameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum(); - List partitions = new ArrayList<>(partitionToFilesMap.keySet()); - - if (partitionTypes.contains(MetadataPartitionType.FILES)) { -// Record which saves the list of all partitions -HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); -HoodieData filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord); -ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1)); -partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); - } - - if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) { -final HoodieData recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( -engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); -partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD); - } - - if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) { -final HoodieData recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( -engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); 
-partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD); - } - LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata"); + private boolean validateTimelineBeforeSchedulingCompaction(Option inFlightInstantTimestamp, String latestDeltacommitTime) { +// There should not be any incomplete instants on MDT +HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); +List pendingInstantsOnMetadataTable = metadataTimeline.filterInflightsAndRequested().getInstants(); +if (!pendingInstantsOnMetadataTable.isEmpty()) { + LOG.info(String.format( + "Cannot compact MDT as there are %d inflight instants: %s", + pendingInst
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1194226408 ## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java: ## @@ -2482,6 +2483,14 @@ public void testMetadataMetrics() throws Exception { } } + @Test + public void testGetFileGroupIndexFromFileId() { Review Comment: Added tests for the first 3. getFileIdLengthWithoutFileIndex is private and is called from other functions, so it is indirectly tested. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1194213033 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaRDD; + +import scala.Tuple2; + +/** + * A {@code BulkInsertPartitioner} implementation for Metadata Table to improve performance of initialization of metadata + * table partition when a very large number of records are inserted. + * + * This partitioner requires the records to be already tagged with location. + */ +public class SparkHoodieMetadataBulkInsertPartitioner implements BulkInsertPartitioner> { Review Comment: Yes, added TestSparkHoodieMetadataBulkInsertPartitioner -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1194209832 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -824,25 +863,22 @@ private interface ConvertMetadataFunction { /** * Processes commit metadata from data table and commits to metadata table. * - * @param instantTime instant time of interest. + * @param instantTime instant time of interest. * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table. - * @param type of commit metadata. - * @param canTriggerTableService true if table services can be triggered. false otherwise. */ - private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) { -if (!dataWriteConfig.isMetadataTableEnabled()) { - return; -} + private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) { +ValidationUtils.checkArgument(dataWriteConfig.isMetadataTableEnabled()); + Set partitionsToUpdate = getMetadataPartitionsToUpdate(); Set inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig()); // if indexing is inflight then do not trigger table service boolean doNotTriggerTableService = partitionsToUpdate.stream().anyMatch(inflightIndexes::contains); Review Comment: Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1194208983 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -562,53 +532,144 @@ private boolean isCommitRevertedByInFlightAction( /** * Initialize the Metadata Table by listing files and partitions from the file system. * - * @param dataMetaClient - {@code HoodieTableMetaClient} for the dataset. + * @param initializationTime - Timestamp to use for the commit + * @param partitionsToInit - List of MDT partitions to initialize * @param inflightInstantTimestamp - Current action instant responsible for this initialization */ - private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, + private boolean initializeFromFilesystem(String initializationTime, List partitionsToInit, Option inflightInstantTimestamp) throws IOException { if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { return false; } -String createInstantTime = getInitialCommitInstantTime(dataMetaClient); - -initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS); -initTableMetadata(); -// if async metadata indexing is enabled, -// then only initialize files partition as other partitions will be built using HoodieIndexer -List enabledPartitionTypes = new ArrayList<>(); -if (dataWriteConfig.isMetadataAsyncIndex()) { - enabledPartitionTypes.add(MetadataPartitionType.FILES); -} else { - // all enabled ones should be initialized - enabledPartitionTypes = this.enabledPartitionTypes; +// FILES partition is always initialized first + ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES) +|| partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit); + +metadataMetaClient = initializeMetaClient(); + +// Get a complete list of files and partitions from the file system or from already initialized FILES 
partition of MDT +boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES); +List partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime); +Map> partitionToFilesMap = partitionInfoList.stream() +.map(p -> { + String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); + return Pair.of(partitionName, p.getFileNameToSizeMap()); +}) +.collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + +for (MetadataPartitionType partitionType : partitionsToInit) { + // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. + String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime); + + LOG.info("Initializing MDT partition " + partitionType + " at instant " + commitTimeForPartition); + + Pair> fileGroupCountAndRecordsPair; + switch (partitionType) { +case FILES: + fileGroupCountAndRecordsPair = initializeFilesPartition(initializationTime, partitionInfoList); + break; +case BLOOM_FILTERS: + fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(initializationTime, partitionToFilesMap); + break; +case COLUMN_STATS: + fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap); + break; +default: + throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType); + } + + // Generate the file groups + final int fileGroupCount = fileGroupCountAndRecordsPair.getKey(); + ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType + " should be > 0"); + initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount); + + // Perform the commit using bulkCommit + HoodieData records = fileGroupCountAndRecordsPair.getValue(); + bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount); + 
metadataMetaClient.reloadActiveTimeline(); + dataMetaClient = HoodieTableMetadataUtil.setMetadataPartitionState(dataMetaClient, partitionType, true); } -initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes); -initialCommit(createInstantTime, enabledPartitionTypes); -updateInitializedPartitionsInTableConfig(enabledPartitionTypes); + return true; } - private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) { -// If there is no
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1194204960 ## hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java: ## @@ -670,17 +670,75 @@ private Long getTableChecksum() { return getLong(TABLE_CHECKSUM); } - public List getMetadataPartitionsInflight() { -return StringUtils.split( -getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING), -CONFIG_VALUES_DELIMITER -); + public Set getMetadataPartitionsInflight() { +return new HashSet<>(StringUtils.split( +getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING), +CONFIG_VALUES_DELIMITER)); } public Set getMetadataPartitions() { return new HashSet<>( -StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING), -CONFIG_VALUES_DELIMITER)); +StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING), +CONFIG_VALUES_DELIMITER)); + } + + /** + * @returns true if metadata table has been created and is being used for this dataset, else returns false. + */ + public boolean isMetadataTableEnabled() { +return isMetadataPartitionEnabled(MetadataPartitionType.FILES); + } + + /** + * Checks if metadata table is enabled and the specified partition has been initialized. + * + * @param partition The partition to check + * @returns true if the specific partition has been initialized, else returns false. + */ + public boolean isMetadataPartitionEnabled(MetadataPartitionType partition) { +return getMetadataPartitions().contains(partition.getPartitionPath()); + } + + /** + * Enables or disables the specified metadata table partition. 
+ * + * @param partition The partition + * @param enabled If true, the partition is enabled, else disabled + */ + public void setMetadataPartitionState(MetadataPartitionType partition, boolean enabled) { + ValidationUtils.checkArgument(!partition.getPartitionPath().contains(CONFIG_VALUES_DELIMITER), +"Metadata Table partition path cannot contain a comma: " + partition.getPartitionPath()); +Set partitions = getMetadataPartitions(); +Set partitionsInflight = getMetadataPartitionsInflight(); +if (enabled) { + partitions.add(partition.getPartitionPath()); + partitionsInflight.remove(partition.getPartitionPath()); +} else if (partition.equals(MetadataPartitionType.FILES)) { + // file listing partition is required for all other partitions to work + // Disabling file partition will also disable all partitions + partitions.clear(); + partitionsInflight.clear(); +} else { + partitions.remove(partition.getPartitionPath()); + partitionsInflight.remove(partition.getPartitionPath()); +} +setValue(TABLE_METADATA_PARTITIONS, partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); +setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); + } + + /** + * Enables the specified metadata table partition as inflight. + * + * @param partition The list of partitions to enable as inflight. + */ + public void setMetadataPartitionsAsInflight(List partitionTypes) { +Set partitions = getMetadataPartitionsInflight(); Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192180200 ## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ## @@ -1378,6 +1339,206 @@ public static Set getInflightAndCompletedMetadataPartitions(HoodieTableC */ public static boolean isIndexingCommit(String instantTime) { return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + METADATA_INDEXER_TIME_SUFFIX.length() -&& instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX); +&& instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX); + } + + /** + * Delete the metadata table for the dataset and backup if required. + * + * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for which metadata table is to be deleted + * @param contextinstance of {@link HoodieEngineContext}. + * @param backup Whether metadata table should be backed up before deletion. If true, the table is backed up to the + * directory with name metadata_. + * @return The backup directory if backup was requested + */ + public static String deleteMetadataTable(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, boolean backup) { +final Path metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2()); +FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), context.getHadoopConf().get()); +setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, false); +try { + if (!fs.exists(metadataTablePath)) { +return null; + } +} catch (FileNotFoundException e) { + // Ignoring exception as metadata table already does not exist + return null; +} catch (IOException e) { + throw new HoodieMetadataException("Failed to check metadata table existence", e); +} + +if (backup) { + final Path metadataBackupPath = new Path(metadataTablePath.getParent(), ".metadata_" + HoodieActiveTimeline.createNewInstantTime()); + LOG.info("Backing up metadata directory to " + metadataBackupPath + " before deletion"); + try { 
+if (fs.rename(metadataTablePath, metadataBackupPath)) { + return metadataBackupPath.toString(); +} + } catch (Exception e) { +// If rename fails, we will ignore the backup and still delete the MDT +LOG.error("Failed to backup metadata table using rename", e); + } +} + +LOG.info("Deleting metadata table from " + metadataTablePath); +try { + fs.delete(metadataTablePath, true); +} catch (Exception e) { + throw new HoodieMetadataException("Failed to delete metadata table from path " + metadataTablePath, e); +} + +return null; + } + + public static HoodieTableMetaClient setMetadataPartitionState(HoodieTableMetaClient dataMetaClient, MetadataPartitionType partitionType, boolean enabled) { +dataMetaClient.getTableConfig().setMetadataPartitionState(partitionType, enabled); +HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps()); +dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient); + ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionEnabled(partitionType) == enabled, +"Metadata table state change should be persisted"); + +LOG.info(String.format("Metadata table %s partition %s has been %s", dataMetaClient.getBasePathV2(), partitionType, +enabled ? "enabled" : "disabled")); +return dataMetaClient; + } + + /** + * Delete a partition within the metadata table. + * + * This can be used to delete a partition so that it can be re-bootstrapped. + * + * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for which metadata table is to be deleted + * @param contextinstance of {@code HoodieEngineContext}. + * @param backup Whether metadata table should be backed up before deletion. If true, the table is backed up to the + * directory with name metadata_. 
+ * @param partitionType The partition to delete + * @return The backup directory if backup was requested, null otherwise + */ + public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, +MetadataPartitionType partitionType, boolean backup) { +if (partitionType.equals(MetadataPartitionType.FILES)) { + return deleteMetadataTable(dataMetaClient, context, backup); +} + +final Path metadataTablePartitionPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()), partitionType.getPartitionPath()); +FileSystem fs = FSUtils.getFs(metadataTablePartitionPath.toString(), context.getHadoopConf().get()); +setMetadataPartitionS
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192169835 ## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ## @@ -1378,6 +1339,206 @@ public static Set getInflightAndCompletedMetadataPartitions(HoodieTableC */ public static boolean isIndexingCommit(String instantTime) { return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + METADATA_INDEXER_TIME_SUFFIX.length() -&& instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX); +&& instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX); + } + + /** + * Delete the metadata table for the dataset and backup if required. + * + * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for which metadata table is to be deleted + * @param contextinstance of {@link HoodieEngineContext}. + * @param backup Whether metadata table should be backed up before deletion. If true, the table is backed up to the + * directory with name metadata_. + * @return The backup directory if backup was requested + */ + public static String deleteMetadataTable(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, boolean backup) { +final Path metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2()); +FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), context.getHadoopConf().get()); +setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, false); +try { + if (!fs.exists(metadataTablePath)) { +return null; + } +} catch (FileNotFoundException e) { + // Ignoring exception as metadata table already does not exist + return null; +} catch (IOException e) { + throw new HoodieMetadataException("Failed to check metadata table existence", e); +} + +if (backup) { + final Path metadataBackupPath = new Path(metadataTablePath.getParent(), ".metadata_" + HoodieActiveTimeline.createNewInstantTime()); + LOG.info("Backing up metadata directory to " + metadataBackupPath + " before deletion"); + try { 
+if (fs.rename(metadataTablePath, metadataBackupPath)) { + return metadataBackupPath.toString(); +} + } catch (Exception e) { +// If rename fails, we will ignore the backup and still delete the MDT +LOG.error("Failed to backup metadata table using rename", e); + } +} + +LOG.info("Deleting metadata table from " + metadataTablePath); +try { + fs.delete(metadataTablePath, true); +} catch (Exception e) { + throw new HoodieMetadataException("Failed to delete metadata table from path " + metadataTablePath, e); +} + +return null; + } + + public static HoodieTableMetaClient setMetadataPartitionState(HoodieTableMetaClient dataMetaClient, MetadataPartitionType partitionType, boolean enabled) { +dataMetaClient.getTableConfig().setMetadataPartitionState(partitionType, enabled); +HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps()); +dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient); + ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionEnabled(partitionType) == enabled, +"Metadata table state change should be persisted"); + +LOG.info(String.format("Metadata table %s partition %s has been %s", dataMetaClient.getBasePathV2(), partitionType, +enabled ? "enabled" : "disabled")); +return dataMetaClient; + } + + /** + * Delete a partition within the metadata table. + * + * This can be used to delete a partition so that it can be re-bootstrapped. + * + * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for which metadata table is to be deleted + * @param contextinstance of {@code HoodieEngineContext}. + * @param backup Whether metadata table should be backed up before deletion. If true, the table is backed up to the + * directory with name metadata_. 
+ * @param partitionType The partition to delete + * @return The backup directory if backup was requested, null otherwise + */ + public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, +MetadataPartitionType partitionType, boolean backup) { +if (partitionType.equals(MetadataPartitionType.FILES)) { + return deleteMetadataTable(dataMetaClient, context, backup); +} + +final Path metadataTablePartitionPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()), partitionType.getPartitionPath()); +FileSystem fs = FSUtils.getFs(metadataTablePartitionPath.toString(), context.getHadoopConf().get()); +setMetadataPartitionS
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192166826 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -873,17 +908,7 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List { String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath(); LOG.info(String.format("Creating a new metadata index for partition '%s' under path %s upto instant %s", - relativePartitionPath, metadataWriteConfig.getBasePath(), indexUptoInstantTime)); - try { -// file group should have already been initialized while scheduling index for this partition -if (!dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), relativePartitionPath))) { Review Comment: In HoodieBackedTableMetadataWriter::initializeFromFilesystem there is a call to initializeFileGroups(). Within initializeFileGroups(), existing fileSlices should be deleted (I filed a separate PR for this - the one where the fileSlices are created in parallel). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192164647 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java: ## @@ -118,46 +123,32 @@ protected void initRegistry() { } @Override - protected void initialize(HoodieEngineContext engineContext, - Option actionMetadata, - Option inflightInstantTimestamp) { -try { - metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> { -if (registry instanceof DistributedRegistry) { - HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext; - ((DistributedRegistry) registry).register(sparkEngineContext.getJavaSparkContext()); -} - }); + protected void commit(String instantTime, Map> partitionRecordsMap) { +commitInternal(instantTime, partitionRecordsMap, Option.empty()); + } - if (enabled) { -initializeIfNeeded(dataMetaClient, actionMetadata, inflightInstantTimestamp); - } -} catch (IOException e) { - LOG.error("Failed to initialize metadata table. Disabling the writer.", e); - enabled = false; -} + protected void bulkCommit( + String instantTime, MetadataPartitionType partitionType, HoodieData records, + int fileGroupCount) { +Map> partitionRecordsMap = new HashMap<>(); +partitionRecordsMap.put(partitionType, records); +SparkHoodieMetadataBulkInsertPartitioner partitioner = new SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount); Review Comment: It will write HFile since the base file format of the MDT is HFile. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192162585 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -824,25 +863,22 @@ private interface ConvertMetadataFunction { /** * Processes commit metadata from data table and commits to metadata table. * - * @param instantTime instant time of interest. + * @param instantTime instant time of interest. * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table. - * @param type of commit metadata. - * @param canTriggerTableService true if table services can be triggered. false otherwise. */ - private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) { -if (!dataWriteConfig.isMetadataTableEnabled()) { - return; -} + private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) { +ValidationUtils.checkArgument(dataWriteConfig.isMetadataTableEnabled()); + Set partitionsToUpdate = getMetadataPartitionsToUpdate(); Set inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig()); // if indexing is inflight then do not trigger table service boolean doNotTriggerTableService = partitionsToUpdate.stream().anyMatch(inflightIndexes::contains); Review Comment: If you mean the function processAndCommit, then yes. This is used for non-initial commits into MDT. Only the initial commit is a bulkInsert. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192161869 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -824,25 +863,22 @@ private interface ConvertMetadataFunction { /** * Processes commit metadata from data table and commits to metadata table. * - * @param instantTime instant time of interest. + * @param instantTime instant time of interest. * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table. - * @param type of commit metadata. - * @param canTriggerTableService true if table services can be triggered. false otherwise. */ - private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) { -if (!dataWriteConfig.isMetadataTableEnabled()) { - return; -} + private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) { +ValidationUtils.checkArgument(dataWriteConfig.isMetadataTableEnabled()); + Set partitionsToUpdate = getMetadataPartitionsToUpdate(); Set inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig()); // if indexing is inflight then do not trigger table service boolean doNotTriggerTableService = partitionsToUpdate.stream().anyMatch(inflightIndexes::contains); Review Comment: You mean the ValidationUtils.checkArgument? Seems redundant as this is already checked in constructor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192161017 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -619,41 +680,34 @@ private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Opti return false; } - private void updateInitializedPartitionsInTableConfig(List partitionTypes) { -Set completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions(); - completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet())); - dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions)); -HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps()); - } - - private HoodieTableMetaClient initializeMetaClient(boolean populateMetaFields) throws IOException { + private HoodieTableMetaClient initializeMetaClient() throws IOException { return HoodieTableMetaClient.withPropertyBuilder() .setTableType(HoodieTableType.MERGE_ON_READ) .setTableName(tableName) .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) .setPayloadClassName(HoodieMetadataPayload.class.getName()) .setBaseFileFormat(HoodieFileFormat.HFILE.toString()) .setRecordKeyFields(RECORD_KEY_FIELD_NAME) -.setPopulateMetaFields(populateMetaFields) - .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) -.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath()); +.setPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS) + .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) +.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath()); Review Comment: fixed -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192160408 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -373,105 +356,92 @@ public List getEnabledPartitionTypes() { return this.enabledPartitionTypes; } - /** - * Initialize the metadata table if it does not exist. - * - * If the metadata table does not exist, then file and partition listing is used to initialize the table. - * - * @param engineContext - * @param actionMetadata Action metadata types extending Avro generated SpecificRecordBase - * @param inflightInstantTimestamp Timestamp of an instant in progress on the dataset. This instant is ignored - * while deciding to initialize the metadata table. - */ - protected abstract void initialize(HoodieEngineContext engineContext, -Option actionMetadata, - Option inflightInstantTimestamp); - - public void initTableMetadata() { -try { - if (this.metadata != null) { -this.metadata.close(); - } - this.metadata = new HoodieBackedTableMetadata(engineContext, dataWriteConfig.getMetadataConfig(), - dataWriteConfig.getBasePath(), dataWriteConfig.getSpillableMapBasePath()); - this.metadataMetaClient = metadata.getMetadataMetaClient(); -} catch (Exception e) { - throw new HoodieException("Error initializing metadata table for reads", e); -} - } - /** * Initialize the metadata table if needed. 
* * @param dataMetaClient - meta client for the data table * @param actionMetadata - optional action metadata * @param inflightInstantTimestamp - timestamp of an instant in progress on the dataset * @param - action metadata types extending Avro generated SpecificRecordBase - * @throws IOException + * @throws IOException on errors */ - protected void initializeIfNeeded(HoodieTableMetaClient dataMetaClient, - Option actionMetadata, - Option inflightInstantTimestamp) throws IOException { + protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, + Option actionMetadata, + Option inflightInstantTimestamp) throws IOException { HoodieTimer timer = HoodieTimer.start(); +List partitionsToInit = new ArrayList<>(MetadataPartitionType.values().length); -boolean exists = metadataTableExists(dataMetaClient, actionMetadata); +try { + boolean exists = metadataTableExists(dataMetaClient, actionMetadata); + if (!exists) { +// FILES partition is always required +partitionsToInit.add(MetadataPartitionType.FILES); + } -if (!exists) { - // Initialize for the first time by listing partitions and files directly from the file system - if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) { -metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); + // check if any of the enabled partition types needs to be initialized + // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler. 
+ if (!dataWriteConfig.isMetadataAsyncIndex()) { +Set inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig()); +LOG.info("Async metadata indexing disabled and following partitions already initialized: " + inflightAndCompletedPartitions); +this.enabledPartitionTypes.stream() +.filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)) +.forEach(partitionsToInit::add); } - return; -} -// if metadata table exists, then check if any of the enabled partition types needs to be initialized -// NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler. -if (!dataWriteConfig.isMetadataAsyncIndex()) { - Set inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig()); - LOG.info("Async metadata indexing enabled and following partitions already initialized: " + inflightAndCompletedPartitions); - List partitionsToInit = this.enabledPartitionTypes.stream() - .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) &&
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192159380 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -562,53 +532,144 @@ private boolean isCommitRevertedByInFlightAction( /** * Initialize the Metadata Table by listing files and partitions from the file system. * - * @param dataMetaClient - {@code HoodieTableMetaClient} for the dataset. + * @param initializationTime - Timestamp to use for the commit + * @param partitionsToInit - List of MDT partitions to initialize * @param inflightInstantTimestamp - Current action instant responsible for this initialization */ - private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, + private boolean initializeFromFilesystem(String initializationTime, List partitionsToInit, Option inflightInstantTimestamp) throws IOException { if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { return false; } -String createInstantTime = getInitialCommitInstantTime(dataMetaClient); - -initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS); -initTableMetadata(); -// if async metadata indexing is enabled, -// then only initialize files partition as other partitions will be built using HoodieIndexer -List enabledPartitionTypes = new ArrayList<>(); -if (dataWriteConfig.isMetadataAsyncIndex()) { - enabledPartitionTypes.add(MetadataPartitionType.FILES); -} else { - // all enabled ones should be initialized - enabledPartitionTypes = this.enabledPartitionTypes; +// FILES partition is always initialized first + ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES) +|| partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit); + +metadataMetaClient = initializeMetaClient(); + +// Get a complete list of files and partitions from the file system or from already initialized FILES 
partition of MDT +boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES); +List partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime); +Map> partitionToFilesMap = partitionInfoList.stream() +.map(p -> { + String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); + return Pair.of(partitionName, p.getFileNameToSizeMap()); +}) +.collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + +for (MetadataPartitionType partitionType : partitionsToInit) { + // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. + String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime); + + LOG.info("Initializing MDT partition " + partitionType + " at instant " + commitTimeForPartition); + + Pair> fileGroupCountAndRecordsPair; + switch (partitionType) { +case FILES: + fileGroupCountAndRecordsPair = initializeFilesPartition(initializationTime, partitionInfoList); + break; +case BLOOM_FILTERS: + fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(initializationTime, partitionToFilesMap); + break; +case COLUMN_STATS: + fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap); + break; +default: + throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType); + } + + // Generate the file groups + final int fileGroupCount = fileGroupCountAndRecordsPair.getKey(); + ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType + " should be > 0"); + initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount); Review Comment: Added doc -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192156057 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -562,53 +532,144 @@ private boolean isCommitRevertedByInFlightAction( /** * Initialize the Metadata Table by listing files and partitions from the file system. * - * @param dataMetaClient - {@code HoodieTableMetaClient} for the dataset. + * @param initializationTime - Timestamp to use for the commit + * @param partitionsToInit - List of MDT partitions to initialize * @param inflightInstantTimestamp - Current action instant responsible for this initialization */ - private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, + private boolean initializeFromFilesystem(String initializationTime, List partitionsToInit, Option inflightInstantTimestamp) throws IOException { if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { return false; } -String createInstantTime = getInitialCommitInstantTime(dataMetaClient); - -initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS); -initTableMetadata(); -// if async metadata indexing is enabled, -// then only initialize files partition as other partitions will be built using HoodieIndexer -List enabledPartitionTypes = new ArrayList<>(); -if (dataWriteConfig.isMetadataAsyncIndex()) { - enabledPartitionTypes.add(MetadataPartitionType.FILES); -} else { - // all enabled ones should be initialized - enabledPartitionTypes = this.enabledPartitionTypes; +// FILES partition is always initialized first + ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES) +|| partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit); + +metadataMetaClient = initializeMetaClient(); + +// Get a complete list of files and partitions from the file system or from already initialized FILES 
partition of MDT +boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES); +List partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime); +Map> partitionToFilesMap = partitionInfoList.stream() +.map(p -> { + String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); + return Pair.of(partitionName, p.getFileNameToSizeMap()); +}) +.collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + +for (MetadataPartitionType partitionType : partitionsToInit) { + // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. + String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime); + + LOG.info("Initializing MDT partition " + partitionType + " at instant " + commitTimeForPartition); + + Pair> fileGroupCountAndRecordsPair; + switch (partitionType) { +case FILES: + fileGroupCountAndRecordsPair = initializeFilesPartition(initializationTime, partitionInfoList); + break; +case BLOOM_FILTERS: + fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(initializationTime, partitionToFilesMap); + break; +case COLUMN_STATS: + fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap); + break; +default: + throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType); + } + + // Generate the file groups + final int fileGroupCount = fileGroupCountAndRecordsPair.getKey(); + ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType + " should be > 0"); + initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount); + + // Perform the commit using bulkCommit + HoodieData records = fileGroupCountAndRecordsPair.getValue(); + bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount); + 
metadataMetaClient.reloadActiveTimeline(); + dataMetaClient = HoodieTableMetadataUtil.setMetadataPartitionState(dataMetaClient, partitionType, true); } -initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes); -initialCommit(createInstantTime, enabledPartitionTypes); -updateInitializedPartitionsInTableConfig(enabledPartitionTypes); + return true; } - private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) { -// If there is no
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192140304 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java: ## @@ -100,15 +99,6 @@ public Option execute() { // get last completed instant Option indexUptoInstant = table.getActiveTimeline().getContiguousCompletedWriteTimeline().lastInstant(); if (indexUptoInstant.isPresent()) { -// start initializing file groups -// in case FILES partition itself was not initialized before (i.e. metadata was never enabled), this will initialize synchronously -HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime) -.orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to initialize filegroups for indexing for instant: %s", instantTime))); -if (!finalPartitionsToIndex.get(0).getPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath())) { - // initialize metadata partition only if not for FILES partition. - metadataWriter.initializeMetadataPartitions(table.getMetaClient(), finalPartitionsToIndex, indexUptoInstant.get().getTimestamp()); Review Comment: Initialization of file groups happens just before the bulkInsert in HoodieBackedTableMetadataWriter::initializeFromFilesystem -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192139220 ## hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java: ## @@ -33,10 +33,6 @@ public enum MetadataPartitionType { private final String partitionPath; // FileId prefix used for all file groups in this partition. private final String fileIdPrefix; - // Total file groups - // TODO fix: enum should not have any mutable aspect as this compromises whole idea - // of the enum being static, immutable entity - private int fileGroupCount = 1; Review Comment: Didn't see any code serializing and saving this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192138716 ## hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java: ## @@ -670,17 +670,75 @@ private Long getTableChecksum() { return getLong(TABLE_CHECKSUM); } - public List getMetadataPartitionsInflight() { -return StringUtils.split( -getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING), -CONFIG_VALUES_DELIMITER -); + public Set getMetadataPartitionsInflight() { +return new HashSet<>(StringUtils.split( +getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING), +CONFIG_VALUES_DELIMITER)); } public Set getMetadataPartitions() { return new HashSet<>( -StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING), -CONFIG_VALUES_DELIMITER)); +StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING), +CONFIG_VALUES_DELIMITER)); + } + + /** + * @returns true if metadata table has been created and is being used for this dataset, else returns false. + */ + public boolean isMetadataTableEnabled() { +return isMetadataPartitionEnabled(MetadataPartitionType.FILES); + } + + /** + * Checks if metadata table is enabled and the specified partition has been initialized. + * + * @param partition The partition to check + * @returns true if the specific partition has been initialized, else returns false. + */ + public boolean isMetadataPartitionEnabled(MetadataPartitionType partition) { +return getMetadataPartitions().contains(partition.getPartitionPath()); + } + + /** + * Enables or disables the specified metadata table partition. 
+ * + * @param partition The partition + * @param enabled If true, the partition is enabled, else disabled + */ + public void setMetadataPartitionState(MetadataPartitionType partition, boolean enabled) { + ValidationUtils.checkArgument(!partition.getPartitionPath().contains(CONFIG_VALUES_DELIMITER), +"Metadata Table partition path cannot contain a comma: " + partition.getPartitionPath()); +Set partitions = getMetadataPartitions(); +Set partitionsInflight = getMetadataPartitionsInflight(); +if (enabled) { + partitions.add(partition.getPartitionPath()); + partitionsInflight.remove(partition.getPartitionPath()); +} else if (partition.equals(MetadataPartitionType.FILES)) { + // file listing partition is required for all other partitions to work + // Disabling file partition will also disable all partitions + partitions.clear(); + partitionsInflight.clear(); +} else { + partitions.remove(partition.getPartitionPath()); + partitionsInflight.remove(partition.getPartitionPath()); +} +setValue(TABLE_METADATA_PARTITIONS, partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); +setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); + } + + /** + * Enables the specified metadata table partition as inflight. + * + * @param partition The list of partitions to enable as inflight. + */ + public void setMetadataPartitionsAsInflight(List partitionTypes) { +Set partitions = getMetadataPartitionsInflight(); Review Comment: Renamed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192137497 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaRDD; + +import scala.Tuple2; + +/** + * A {@code BulkInsertPartitioner} implementation for Metadata Table to improve performance of initialization of metadata + * table partition when a very large number of records are inserted. + * + * This partitioner requires the records to be already tagged with location. 
+ */ +public class SparkHoodieMetadataBulkInsertPartitioner implements BulkInsertPartitioner> { + final int numPartitions; + public SparkHoodieMetadataBulkInsertPartitioner(int numPartitions) { +this.numPartitions = numPartitions; + } + + private class FileGroupPartitioner extends Partitioner { + +@Override +public int getPartition(Object key) { + return ((Tuple2)key)._1; +} + +@Override +public int numPartitions() { + return numPartitions; +} + } + + // FileIDs for the various partitions + private List fileIDPfxs; + + /** + * Partition the records by their location. The number of partitions is determined by the number of MDT fileGroups being udpated rather than the + * specific value of outputSparkPartitions. + */ + @Override + public JavaRDD repartitionRecords(JavaRDD records, int outputSparkPartitions) { +Comparator> keyComparator = +(Comparator> & Serializable)(t1, t2) -> t1._2.compareTo(t2._2); + +// Partition the records by their file group +JavaRDD partitionedRDD = records +// key by . The file group index is used to partition and the record key is used to sort within the partition. +.keyBy(r -> { + int fileGroupIndex = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId()); + return new Tuple2<>(fileGroupIndex, r.getRecordKey()); +}) +.repartitionAndSortWithinPartitions(new FileGroupPartitioner(), keyComparator) +.map(t -> t._2); + +fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> { + // Due to partitioning, all record in the partition should have same fileID. So we only can get the fileID prefix from the first record. + List fileIds = new ArrayList<>(1); + if (recordItr.hasNext()) { +HoodieRecord record = recordItr.next(); +final String fileID = HoodieTableMetadataUtil.getFileGroupPrefix(record.getCurrentLocation().getFileId()); +fileIds.add(fileID); + } else { +// Empty partition +fileIds.add(""); Review Comment: Added doc -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org