[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1227895765 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaRDD; + +import scala.Tuple2; + +/** + * A {@code BulkInsertPartitioner} implementation for Metadata Table to improve performance of initialization of metadata + * table partition when a very large number of records are inserted. + * + * This partitioner requires the records to be already tagged with location. 
+ */ +public class SparkHoodieMetadataBulkInsertPartitioner implements BulkInsertPartitioner> { + final int numPartitions; + public SparkHoodieMetadataBulkInsertPartitioner(int numPartitions) { +this.numPartitions = numPartitions; + } + + private class FileGroupPartitioner extends Partitioner { + +@Override +public int getPartition(Object key) { + return ((Tuple2)key)._1; +} + +@Override +public int numPartitions() { + return numPartitions; +} + } + + // FileIDs for the various partitions + private List fileIDPfxs; + + /** + * Partition the records by their location. The number of partitions is determined by the number of MDT fileGroups being udpated rather than the + * specific value of outputSparkPartitions. + */ + @Override + public JavaRDD repartitionRecords(JavaRDD records, int outputSparkPartitions) { +Comparator> keyComparator = +(Comparator> & Serializable)(t1, t2) -> t1._2.compareTo(t2._2); + +// Partition the records by their file group +JavaRDD partitionedRDD = records +// key by . The file group index is used to partition and the record key is used to sort within the partition. +.keyBy(r -> { + int fileGroupIndex = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId()); + return new Tuple2<>(fileGroupIndex, r.getRecordKey()); +}) +.repartitionAndSortWithinPartitions(new FileGroupPartitioner(), keyComparator) +.map(t -> t._2); + +fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> { + // Due to partitioning, all record in the partition should have same fileID. So we only can get the fileID prefix from the first record. + List fileIds = new ArrayList<>(1); + if (recordItr.hasNext()) { +HoodieRecord record = recordItr.next(); +final String fileID = HoodieTableMetadataUtil.getFileGroupPrefix(record.getCurrentLocation().getFileId()); +fileIds.add(fileID); + } else { +// FileGroupPartitioner returns a fixed number of partition as part of numPartitions(). 
In the special case that recordsRDD has fewer +// records than fileGroupCount, some of these partitions (corresponding to fileGroups) will not have any data. +// But we still need to return a fileID for use within {@code BulkInsertMapFunction} +fileIds.add(""); + } + return fileIds.iterator(); +}, true).collect(); +ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() == fileIDPfxs.size(), +String.format("Generated fileIDPfxs (%d) are lesser in size than the partitions %d", fileIDPfxs.size(), partitionedRDD.getNumPartitions())); + +return partitionedRDD; + } + + @Override + public boolean arePartitionRecordsSorted() { +return true; + } + + @Override + public String getFileIdPfx(int partitionId) { +return fileIDPfxs.get(partitionId); Review Comment: > automatic estimation of the shard counts for each partition, that can be enhanced This may be a solution if we can make an accurate estimation of the file group size. -- This is an autom
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1225702442 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaRDD; + +import scala.Tuple2; + +/** + * A {@code BulkInsertPartitioner} implementation for Metadata Table to improve performance of initialization of metadata + * table partition when a very large number of records are inserted. + * + * This partitioner requires the records to be already tagged with location. 
+ */ +public class SparkHoodieMetadataBulkInsertPartitioner implements BulkInsertPartitioner> { + final int numPartitions; + public SparkHoodieMetadataBulkInsertPartitioner(int numPartitions) { +this.numPartitions = numPartitions; + } + + private class FileGroupPartitioner extends Partitioner { + +@Override +public int getPartition(Object key) { + return ((Tuple2)key)._1; +} + +@Override +public int numPartitions() { + return numPartitions; +} + } + + // FileIDs for the various partitions + private List fileIDPfxs; + + /** + * Partition the records by their location. The number of partitions is determined by the number of MDT fileGroups being udpated rather than the + * specific value of outputSparkPartitions. + */ + @Override + public JavaRDD repartitionRecords(JavaRDD records, int outputSparkPartitions) { +Comparator> keyComparator = +(Comparator> & Serializable)(t1, t2) -> t1._2.compareTo(t2._2); + +// Partition the records by their file group +JavaRDD partitionedRDD = records +// key by . The file group index is used to partition and the record key is used to sort within the partition. +.keyBy(r -> { + int fileGroupIndex = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId()); + return new Tuple2<>(fileGroupIndex, r.getRecordKey()); +}) +.repartitionAndSortWithinPartitions(new FileGroupPartitioner(), keyComparator) +.map(t -> t._2); + +fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> { + // Due to partitioning, all record in the partition should have same fileID. So we only can get the fileID prefix from the first record. + List fileIds = new ArrayList<>(1); + if (recordItr.hasNext()) { +HoodieRecord record = recordItr.next(); +final String fileID = HoodieTableMetadataUtil.getFileGroupPrefix(record.getCurrentLocation().getFileId()); +fileIds.add(fileID); + } else { +// FileGroupPartitioner returns a fixed number of partition as part of numPartitions(). 
In the special case that recordsRDD has fewer +// records than fileGroupCount, some of these partitions (corresponding to fileGroups) will not have any data. +// But we still need to return a fileID for use within {@code BulkInsertMapFunction} +fileIds.add(""); + } + return fileIds.iterator(); +}, true).collect(); +ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() == fileIDPfxs.size(), +String.format("Generated fileIDPfxs (%d) are lesser in size than the partitions %d", fileIDPfxs.size(), partitionedRDD.getNumPartitions())); + +return partitionedRDD; + } + + @Override + public boolean arePartitionRecordsSorted() { +return true; + } + + @Override + public String getFileIdPfx(int partitionId) { +return fileIDPfxs.get(partitionId); Review Comment: No one would be aware that the number of file groups affects correctness. It is a bug, not a usability issue. -- This is an automated message from the Apache Git Service. To respond t
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1225695785 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaRDD; + +import scala.Tuple2; + +/** + * A {@code BulkInsertPartitioner} implementation for Metadata Table to improve performance of initialization of metadata + * table partition when a very large number of records are inserted. + * + * This partitioner requires the records to be already tagged with location. 
+ */ +public class SparkHoodieMetadataBulkInsertPartitioner implements BulkInsertPartitioner> { + final int numPartitions; + public SparkHoodieMetadataBulkInsertPartitioner(int numPartitions) { +this.numPartitions = numPartitions; + } + + private class FileGroupPartitioner extends Partitioner { + +@Override +public int getPartition(Object key) { + return ((Tuple2)key)._1; +} + +@Override +public int numPartitions() { + return numPartitions; +} + } + + // FileIDs for the various partitions + private List fileIDPfxs; + + /** + * Partition the records by their location. The number of partitions is determined by the number of MDT fileGroups being udpated rather than the + * specific value of outputSparkPartitions. + */ + @Override + public JavaRDD repartitionRecords(JavaRDD records, int outputSparkPartitions) { +Comparator> keyComparator = +(Comparator> & Serializable)(t1, t2) -> t1._2.compareTo(t2._2); + +// Partition the records by their file group +JavaRDD partitionedRDD = records +// key by . The file group index is used to partition and the record key is used to sort within the partition. +.keyBy(r -> { + int fileGroupIndex = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId()); + return new Tuple2<>(fileGroupIndex, r.getRecordKey()); +}) +.repartitionAndSortWithinPartitions(new FileGroupPartitioner(), keyComparator) +.map(t -> t._2); + +fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> { + // Due to partitioning, all record in the partition should have same fileID. So we only can get the fileID prefix from the first record. + List fileIds = new ArrayList<>(1); + if (recordItr.hasNext()) { +HoodieRecord record = recordItr.next(); +final String fileID = HoodieTableMetadataUtil.getFileGroupPrefix(record.getCurrentLocation().getFileId()); +fileIds.add(fileID); + } else { +// FileGroupPartitioner returns a fixed number of partition as part of numPartitions(). 
In the special case that recordsRDD has fewer +// records than fileGroupCount, some of these partitions (corresponding to fileGroups) will not have any data. +// But we still need to return a fileID for use within {@code BulkInsertMapFunction} +fileIds.add(""); + } + return fileIds.iterator(); +}, true).collect(); +ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() == fileIDPfxs.size(), +String.format("Generated fileIDPfxs (%d) are lesser in size than the partitions %d", fileIDPfxs.size(), partitionedRDD.getNumPartitions())); + +return partitionedRDD; + } + + @Override + public boolean arePartitionRecordsSorted() { +return true; + } + + @Override + public String getFileIdPfx(int partitionId) { +return fileIDPfxs.get(partitionId); Review Comment: This may not be right when one HFile in one file group is too large to write: there is an upper threshold for each base file handle, so `HoodieAvroHFileWrite#canWrite` may return false, then the
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1223719557 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -1052,51 +1091,81 @@ protected HoodieData prepRecords(Map + * Don't perform optimization if there are inflight operations on the dataset. This is for two reasons: + * - The compaction will contain the correct data as all failed operations have been rolled back. + * - Clean/compaction etc. will have the highest timestamp on the MDT and we won't be adding new operations + * with smaller timestamps to metadata table (makes for easier debugging) + * + * This adds the limitations that long-running async operations (clustering, etc.) may cause delay in such MDT + * optimizations. We will relax this after MDT code has been hardened. */ - protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) { -// finish off any pending compactions if any from previous attempt. -writeClient.runAnyPendingCompactions(); - -String latestDeltaCommitTimeInMetadataTable = metadataMetaClient.reloadActiveTimeline() -.getDeltaCommitTimeline() -.filterCompletedInstants() -.lastInstant().orElseThrow(() -> new HoodieMetadataException("No completed deltacommit in metadata table")) -.getTimestamp(); -// we need to find if there are any inflights in data table timeline before or equal to the latest delta commit in metadata table. -// Whenever you want to change this logic, please ensure all below scenarios are considered. -// a. There could be a chance that latest delta commit in MDT is committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed -// b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, latest compaction instant time in MDT represents -// any instants before that is already synced with metadata table. -// c. Do consider out of order commits. 
For eg, c4 from DT could complete before c3. and we can't trigger compaction in MDT with c4 as base instant time, until every -// instant before c4 is synced with metadata table. -List pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested() - .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants(); + @Override + public void performTableServices(Option inFlightInstantTimestamp) { +HoodieTimer metadataTableServicesTimer = HoodieTimer.start(); +boolean allTableServicesExecutedSuccessfullyOrSkipped = true; +try { + BaseHoodieWriteClient writeClient = getWriteClient(); + // Run any pending table services operations. + runPendingTableServicesOperations(writeClient); + + // Check and run clean operations. + String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline() + .filterCompletedInstants() + .lastInstant().get() + .getTimestamp(); + LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + ", running clean operations."); + cleanIfNecessary(writeClient, latestDeltacommitTime); + + // Do timeline validation before scheduling compaction/logcompaction operations. + if (!validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime)) { +return; Review Comment: > unless compaction in MDT kicks in, archival might not have anything to do after last time it was able to archive something. Then archiving will always be blocked by the compaction. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1222772904 ## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java: ## @@ -104,40 +106,19 @@ protected void initRegistry() { } @Override - protected void initialize(HoodieEngineContext engineContext, - Option actionMetadata, - Option inflightInstantTimestamp) { -try { - if (enabled) { -initializeIfNeeded(dataMetaClient, actionMetadata, inflightInstantTimestamp); - } -} catch (IOException e) { - LOG.error("Failed to initialize metadata table. Disabling the writer.", e); - enabled = false; -} - } - - @Override - protected void commit(String instantTime, Map> partitionRecordsMap, -boolean canTriggerTableService) { -ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled"); + protected void commit(String instantTime, Map> partitionRecordsMap) { ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet."); HoodieData preppedRecords = prepRecords(partitionRecordsMap); List preppedRecordList = preppedRecords.collectAsList(); -try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) { +// Flink engine does not optimize initialCommit to MDT as bulk insert is not yet supported + +try (HoodieFlinkWriteClient writeClient = (HoodieFlinkWriteClient) getWriteClient()) { // rollback partially failed writes if any. if (writeClient.rollbackFailedWrites()) { metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient); } - if (canTriggerTableService) { -// trigger compaction before doing the delta commit. 
this is to ensure, if this delta commit succeeds in metadata table, but failed in data table, -// we would have compacted metadata table and so could have included uncommitted data which will never be ignored while reading from metadata -// table (since reader will filter out only from delta commits) -compactIfNecessary(writeClient, instantTime); - } Review Comment: You are right, we should trigger the metadata table compaction on each commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1222548105 ## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ## @@ -1453,7 +1453,11 @@ public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMeta * @return The fileID */ public static String getFileIDForFileGroup(MetadataPartitionType partitionType, int index) { -return String.format("%s%04d", partitionType.getFileIdPrefix(), index); +if (partitionType == MetadataPartitionType.FILES) { + return String.format("%s%04d-%d", partitionType.getFileIdPrefix(), index, 0); +} else { Review Comment: Should we fix all the partitions or just the `FILES` partition? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1222533305 ## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ## @@ -1453,7 +1453,11 @@ public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMeta * @return The fileID */ public static String getFileIDForFileGroup(MetadataPartitionType partitionType, int index) { -return String.format("%s%04d", partitionType.getFileIdPrefix(), index); +if (partitionType == MetadataPartitionType.FILES) { + return String.format("%s%04d-%d", partitionType.getFileIdPrefix(), index, 0); +} else { Review Comment: Discussed offline, this is a fix for consistency of base file fileGroup id and the log files, because the bulk_insert create handle would append a suffix (starts from `-0`) for each file it creates. ## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ## @@ -1453,7 +1453,11 @@ public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMeta * @return The fileID */ public static String getFileIDForFileGroup(MetadataPartitionType partitionType, int index) { -return String.format("%s%04d", partitionType.getFileIdPrefix(), index); +if (partitionType == MetadataPartitionType.FILES) { + return String.format("%s%04d-%d", partitionType.getFileIdPrefix(), index, 0); +} else { Review Comment: Discussed offline, this is a fix for consistency of base file fileGroup id and the log files, because the bulk_insert create handle would append a suffix (starts from `-0`) for each file it creates. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1220774838 ## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ## @@ -1453,7 +1453,11 @@ public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMeta * @return The fileID */ public static String getFileIDForFileGroup(MetadataPartitionType partitionType, int index) { -return String.format("%s%04d", partitionType.getFileIdPrefix(), index); +if (partitionType == MetadataPartitionType.FILES) { + return String.format("%s%04d-%d", partitionType.getFileIdPrefix(), index, 0); +} else { Review Comment: What is the purpose of the extra suffix `-0` for the `FILES` file group? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1203914496 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java: ## @@ -338,14 +338,20 @@ protected void initMetadataTable(Option instantTime) { * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization */ private void initializeMetadataTable(Option inFlightInstantTimestamp) { -if (config.isMetadataTableEnabled()) { - HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, - context, Option.empty(), inFlightInstantTimestamp); - try { -writer.close(); - } catch (Exception e) { -throw new HoodieException("Failed to instantiate Metadata table ", e); +if (!config.isMetadataTableEnabled()) { + LOG.error("11"); + return; +} + +try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, +context, Option.empty(), inFlightInstantTimestamp)) { + if (writer.isInitialized()) { +writer.performTableServices(inFlightInstantTimestamp); + } else { +throw new HoodieException((".22")); Review Comment: Ditto: a meaningless exception msg. ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -626,8 +657,7 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder() .onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), metadataPartition.getPartitionPath())) -.withFileId(fileGroupFileId) -.overBaseCommit(instantTime) +.withFileId(fileGroupFileId).overBaseCommit(instantTime) .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION) Review Comment: Unnecessary change. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1203913275 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java: ## @@ -338,14 +338,20 @@ protected void initMetadataTable(Option instantTime) { * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization */ private void initializeMetadataTable(Option inFlightInstantTimestamp) { -if (config.isMetadataTableEnabled()) { - HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, - context, Option.empty(), inFlightInstantTimestamp); - try { -writer.close(); - } catch (Exception e) { -throw new HoodieException("Failed to instantiate Metadata table ", e); +if (!config.isMetadataTableEnabled()) { + LOG.error("11"); Review Comment: Why a meaningless log? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1203785303 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -564,53 +532,147 @@ private boolean isCommitRevertedByInFlightAction( /** * Initialize the Metadata Table by listing files and partitions from the file system. * - * @param dataMetaClient - {@code HoodieTableMetaClient} for the dataset. + * @param initializationTime - Timestamp to use for the commit + * @param partitionsToInit - List of MDT partitions to initialize * @param inflightInstantTimestamp - Current action instant responsible for this initialization */ - private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, + private boolean initializeFromFilesystem(String initializationTime, List partitionsToInit, Option inflightInstantTimestamp) throws IOException { if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { return false; } -String createInstantTime = getInitialCommitInstantTime(dataMetaClient); - -initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS); -initTableMetadata(); -// if async metadata indexing is enabled, -// then only initialize files partition as other partitions will be built using HoodieIndexer -List enabledPartitionTypes = new ArrayList<>(); -if (dataWriteConfig.isMetadataAsyncIndex()) { - enabledPartitionTypes.add(MetadataPartitionType.FILES); -} else { - // all enabled ones should be initialized - enabledPartitionTypes = this.enabledPartitionTypes; +// FILES partition is always initialized first + ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES) +|| partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit); + +metadataMetaClient = initializeMetaClient(); + +// Get a complete list of files and partitions from the file system or from already initialized FILES partition 
of MDT +boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES); +List partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime); +Map> partitionToFilesMap = partitionInfoList.stream() Review Comment: Thanks for the clarification, makes sense to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1194736640 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java: ## @@ -337,15 +337,33 @@ protected void initMetadataTable(Option instantTime) { * * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization */ - private void initializeMetadataTable(Option inFlightInstantTimestamp) { -if (config.isMetadataTableEnabled()) { - HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, - context, Option.empty(), inFlightInstantTimestamp); - try { -writer.close(); - } catch (Exception e) { -throw new HoodieException("Failed to instantiate Metadata table ", e); + private void initializeMetadataTable(WriteOperationType operationType, Option inFlightInstantTimestamp) { +if (!config.isMetadataTableEnabled()) { + return; +} + +try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, +context, Option.empty(), inFlightInstantTimestamp)) { + if (writer.isInitialized()) { +// Optimize the metadata table which involves compacton. cleaning, etc. This should only be called from writers. +switch (operationType) { + case INSERT: + case INSERT_PREPPED: + case UPSERT: + case UPSERT_PREPPED: + case BULK_INSERT: + case BULK_INSERT_PREPPED: + case DELETE: Review Comment: Enumerating the write operations is really hard to maintain; can we trigger the table service regardless of the operation? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1194731645 ## hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java: ## @@ -694,17 +695,75 @@ private Long getTableChecksum() { return getLong(TABLE_CHECKSUM); } - public List getMetadataPartitionsInflight() { -return StringUtils.split( -getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING), -CONFIG_VALUES_DELIMITER -); + public Set getMetadataPartitionsInflight() { +return new HashSet<>(StringUtils.split( +getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING), +CONFIG_VALUES_DELIMITER)); } public Set getMetadataPartitions() { return new HashSet<>( -StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING), -CONFIG_VALUES_DELIMITER)); +StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING), +CONFIG_VALUES_DELIMITER)); + } + + /** + * @returns true if metadata table has been created and is being used for this dataset, else returns false. + */ + public boolean isMetadataTableEnabled() { +return isMetadataPartitionEnabled(MetadataPartitionType.FILES); + } + + /** + * Checks if metadata table is enabled and the specified partition has been initialized. + * + * @param partition The partition to check + * @returns true if the specific partition has been initialized, else returns false. + */ + public boolean isMetadataPartitionEnabled(MetadataPartitionType partition) { +return getMetadataPartitions().contains(partition.getPartitionPath()); + } + + /** + * Enables or disables the specified metadata table partition. 
+ * + * @param partition The partition + * @param enabled If true, the partition is enabled, else disabled + */ + public void setMetadataPartitionState(MetadataPartitionType partition, boolean enabled) { + ValidationUtils.checkArgument(!partition.getPartitionPath().contains(CONFIG_VALUES_DELIMITER), +"Metadata Table partition path cannot contain a comma: " + partition.getPartitionPath()); +Set partitions = getMetadataPartitions(); +Set partitionsInflight = getMetadataPartitionsInflight(); +if (enabled) { + partitions.add(partition.getPartitionPath()); + partitionsInflight.remove(partition.getPartitionPath()); +} else if (partition.equals(MetadataPartitionType.FILES)) { + // file listing partition is required for all other partitions to work + // Disabling file partition will also disable all partitions + partitions.clear(); + partitionsInflight.clear(); +} else { + partitions.remove(partition.getPartitionPath()); + partitionsInflight.remove(partition.getPartitionPath()); +} +setValue(TABLE_METADATA_PARTITIONS, partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); +setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); Review Comment: Do we need to persist these options? 
## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java: ## @@ -337,15 +337,33 @@ protected void initMetadataTable(Option instantTime) { * * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization */ - private void initializeMetadataTable(Option inFlightInstantTimestamp) { -if (config.isMetadataTableEnabled()) { - HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, - context, Option.empty(), inFlightInstantTimestamp); - try { -writer.close(); - } catch (Exception e) { -throw new HoodieException("Failed to instantiate Metadata table ", e); + private void initializeMetadataTable(WriteOperationType operationType, Option inFlightInstantTimestamp) { +if (!config.isMetadataTableEnabled()) { + return; +} + +try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, +context, Option.empty(), inFlightInstantTimestamp)) { + if (writer.isInitialized()) { +// Optimize the metadata table which involves compacton. cleaning, etc. This should only be called from writers. +switch (operationType) { + case INSERT: + case INSERT_PREPPED: + case UPSERT: + case UPSERT_PREPPED: + case BULK_INSERT: + case BULK_INSERT_PREPPED: + case DELETE: Review Comment: Enumerating the write operations is really hard to maintain; can we trigger the table service regardless of the operation? ## hudi-client/hudi-client-co
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192914114 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -824,25 +863,22 @@ private interface ConvertMetadataFunction { /** * Processes commit metadata from data table and commits to metadata table. * - * @param instantTime instant time of interest. + * @param instantTime instant time of interest. * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table. - * @param type of commit metadata. - * @param canTriggerTableService true if table services can be triggered. false otherwise. */ - private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) { -if (!dataWriteConfig.isMetadataTableEnabled()) { - return; -} + private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) { +ValidationUtils.checkArgument(dataWriteConfig.isMetadataTableEnabled()); + Set partitionsToUpdate = getMetadataPartitionsToUpdate(); Set inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig()); // if indexing is inflight then do not trigger table service boolean doNotTriggerTableService = partitionsToUpdate.stream().anyMatch(inflightIndexes::contains); Review Comment: I mean `doNotTriggerTableService`; it does not seem to be used anywhere. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192914114 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -824,25 +863,22 @@ private interface ConvertMetadataFunction { /** * Processes commit metadata from data table and commits to metadata table. * - * @param instantTime instant time of interest. + * @param instantTime instant time of interest. * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table. - * @param type of commit metadata. - * @param canTriggerTableService true if table services can be triggered. false otherwise. */ - private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) { -if (!dataWriteConfig.isMetadataTableEnabled()) { - return; -} + private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) { +ValidationUtils.checkArgument(dataWriteConfig.isMetadataTableEnabled()); + Set partitionsToUpdate = getMetadataPartitionsToUpdate(); Set inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig()); // if indexing is inflight then do not trigger table service boolean doNotTriggerTableService = partitionsToUpdate.stream().anyMatch(inflightIndexes::contains); Review Comment: I mean `doNotTriggerTableService`; it does not seem to be used anywhere. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192913868 ## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java: ## @@ -118,46 +123,32 @@ protected void initRegistry() { } @Override - protected void initialize(HoodieEngineContext engineContext, - Option actionMetadata, - Option inflightInstantTimestamp) { -try { - metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> { -if (registry instanceof DistributedRegistry) { - HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext; - ((DistributedRegistry) registry).register(sparkEngineContext.getJavaSparkContext()); -} - }); + protected void commit(String instantTime, Map> partitionRecordsMap) { +commitInternal(instantTime, partitionRecordsMap, Option.empty()); + } - if (enabled) { -initializeIfNeeded(dataMetaClient, actionMetadata, inflightInstantTimestamp); - } -} catch (IOException e) { - LOG.error("Failed to initialize metadata table. Disabling the writer.", e); - enabled = false; -} + protected void bulkCommit( + String instantTime, MetadataPartitionType partitionType, HoodieData records, + int fileGroupCount) { +Map> partitionRecordsMap = new HashMap<>(); +partitionRecordsMap.put(partitionType, records); +SparkHoodieMetadataBulkInsertPartitioner partitioner = new SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount); Review Comment: You are right, thanks for the clarification. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.
danny0405 commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1190914223 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -619,41 +680,34 @@ private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Opti return false; } - private void updateInitializedPartitionsInTableConfig(List partitionTypes) { -Set completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions(); - completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet())); - dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions)); -HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps()); - } - - private HoodieTableMetaClient initializeMetaClient(boolean populateMetaFields) throws IOException { + private HoodieTableMetaClient initializeMetaClient() throws IOException { return HoodieTableMetaClient.withPropertyBuilder() .setTableType(HoodieTableType.MERGE_ON_READ) .setTableName(tableName) .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) .setPayloadClassName(HoodieMetadataPayload.class.getName()) .setBaseFileFormat(HoodieFileFormat.HFILE.toString()) .setRecordKeyFields(RECORD_KEY_FIELD_NAME) -.setPopulateMetaFields(populateMetaFields) - .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) -.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath()); +.setPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS) + .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) +.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath()); Review Comment: Fix the indentation. 
## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -824,25 +863,22 @@ private interface ConvertMetadataFunction { /** * Processes commit metadata from data table and commits to metadata table. * - * @param instantTime instant time of interest. + * @param instantTime instant time of interest. * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table. - * @param type of commit metadata. - * @param canTriggerTableService true if table services can be triggered. false otherwise. */ - private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) { -if (!dataWriteConfig.isMetadataTableEnabled()) { - return; -} + private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) { +ValidationUtils.checkArgument(dataWriteConfig.isMetadataTableEnabled()); + Set partitionsToUpdate = getMetadataPartitionsToUpdate(); Set inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig()); // if indexing is inflight then do not trigger table service boolean doNotTriggerTableService = partitionsToUpdate.stream().anyMatch(inflightIndexes::contains); Review Comment: Do we still need this? 
## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java: ## @@ -118,46 +123,32 @@ protected void initRegistry() { } @Override - protected void initialize(HoodieEngineContext engineContext, - Option actionMetadata, - Option inflightInstantTimestamp) { -try { - metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> { -if (registry instanceof DistributedRegistry) { - HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext; - ((DistributedRegistry) registry).register(sparkEngineContext.getJavaSparkContext()); -} - }); + protected void commit(String instantTime, Map> partitionRecordsMap) { +commitInternal(instantTime, partitionRecordsMap, Option.empty()); + } - if (enabled) { -initializeIfNeeded(dataMetaClient, actionMetadata, inflightInstantTimestamp); - } -} catch (IOException e) { - LOG.error("Failed to initialize metadata table. Disabling the writer.", e); - enabled = false; -} + protected void bulkCommit( + String instantTime, MetadataPartitionType partitionType, HoodieData records, + int fileGroupCount) { +Map> partitionRecordsMap = new HashMap<>(); +partitionRecordsMap.put(partitionType, records)