[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1227127259


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import scala.Tuple2;
+
+/**
+ * A {@code BulkInsertPartitioner} implementation for Metadata Table to 
improve performance of initialization of metadata
+ * table partition when a very large number of records are inserted.
+ *
+ * This partitioner requires the records to be already tagged with location.
+ */
+public class SparkHoodieMetadataBulkInsertPartitioner implements 
BulkInsertPartitioner> {
+  final int numPartitions;
+  public SparkHoodieMetadataBulkInsertPartitioner(int numPartitions) {
+this.numPartitions = numPartitions;
+  }
+
+  private class FileGroupPartitioner extends Partitioner {
+
+@Override
+public int getPartition(Object key) {
+  return ((Tuple2)key)._1;
+}
+
+@Override
+public int numPartitions() {
+  return numPartitions;
+}
+  }
+
+  // FileIDs for the various partitions
+  private List fileIDPfxs;
+
+  /**
+   * Partition the records by their location. The number of partitions is 
determined by the number of MDT fileGroups being udpated rather than the
+   * specific value of outputSparkPartitions.
+   */
+  @Override
+  public JavaRDD repartitionRecords(JavaRDD 
records, int outputSparkPartitions) {
+Comparator> keyComparator =
+(Comparator> & Serializable)(t1, t2) -> 
t1._2.compareTo(t2._2);
+
+// Partition the records by their file group
+JavaRDD partitionedRDD = records
+// key by . The file group index is 
used to partition and the record key is used to sort within the partition.
+.keyBy(r -> {
+  int fileGroupIndex = 
HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId());
+  return new Tuple2<>(fileGroupIndex, r.getRecordKey());
+})
+.repartitionAndSortWithinPartitions(new FileGroupPartitioner(), 
keyComparator)
+.map(t -> t._2);
+
+fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> {
+  // Due to partitioning, all record in the partition should have same 
fileID. So we only can get the fileID prefix from the first record.
+  List fileIds = new ArrayList<>(1);
+  if (recordItr.hasNext()) {
+HoodieRecord record = recordItr.next();
+final String fileID = 
HoodieTableMetadataUtil.getFileGroupPrefix(record.getCurrentLocation().getFileId());
+fileIds.add(fileID);
+  } else {
+// FileGroupPartitioner returns a fixed number of partition as part of 
numPartitions(). In the special case that recordsRDD has fewer
+// records than fileGroupCount, some of these partitions 
(corresponding to fileGroups) will not have any data.
+// But we still need to return a fileID for use within {@code 
BulkInsertMapFunction}
+fileIds.add("");
+  }
+  return fileIds.iterator();
+}, true).collect();
+ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() == 
fileIDPfxs.size(),
+String.format("Generated fileIDPfxs (%d) are lesser in size than 
the partitions %d", fileIDPfxs.size(), partitionedRDD.getNumPartitions()));
+
+return partitionedRDD;
+  }
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+return true;
+  }
+
+  @Override
+  public String getFileIdPfx(int partitionId) {
+return fileIDPfxs.get(partitionId);

Review Comment:
   This PR also adds support for automatically estimating the shard count for 
each partition; that estimation logic can be enhanced further.



-- 
This is an automated message from the Apache Git Service.
To respond to t

[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1227126602


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import scala.Tuple2;
+
+/**
+ * A {@code BulkInsertPartitioner} implementation for Metadata Table to 
improve performance of initialization of metadata
+ * table partition when a very large number of records are inserted.
+ *
+ * This partitioner requires the records to be already tagged with location.
+ */
+public class SparkHoodieMetadataBulkInsertPartitioner implements 
BulkInsertPartitioner> {
+  final int numPartitions;
+  public SparkHoodieMetadataBulkInsertPartitioner(int numPartitions) {
+this.numPartitions = numPartitions;
+  }
+
+  private class FileGroupPartitioner extends Partitioner {
+
+@Override
+public int getPartition(Object key) {
+  return ((Tuple2)key)._1;
+}
+
+@Override
+public int numPartitions() {
+  return numPartitions;
+}
+  }
+
+  // FileIDs for the various partitions
+  private List fileIDPfxs;
+
+  /**
+   * Partition the records by their location. The number of partitions is 
determined by the number of MDT fileGroups being udpated rather than the
+   * specific value of outputSparkPartitions.
+   */
+  @Override
+  public JavaRDD repartitionRecords(JavaRDD 
records, int outputSparkPartitions) {
+Comparator> keyComparator =
+(Comparator> & Serializable)(t1, t2) -> 
t1._2.compareTo(t2._2);
+
+// Partition the records by their file group
+JavaRDD partitionedRDD = records
+// key by . The file group index is 
used to partition and the record key is used to sort within the partition.
+.keyBy(r -> {
+  int fileGroupIndex = 
HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId());
+  return new Tuple2<>(fileGroupIndex, r.getRecordKey());
+})
+.repartitionAndSortWithinPartitions(new FileGroupPartitioner(), 
keyComparator)
+.map(t -> t._2);
+
+fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> {
+  // Due to partitioning, all record in the partition should have same 
fileID. So we only can get the fileID prefix from the first record.
+  List fileIds = new ArrayList<>(1);
+  if (recordItr.hasNext()) {
+HoodieRecord record = recordItr.next();
+final String fileID = 
HoodieTableMetadataUtil.getFileGroupPrefix(record.getCurrentLocation().getFileId());
+fileIds.add(fileID);
+  } else {
+// FileGroupPartitioner returns a fixed number of partition as part of 
numPartitions(). In the special case that recordsRDD has fewer
+// records than fileGroupCount, some of these partitions 
(corresponding to fileGroups) will not have any data.
+// But we still need to return a fileID for use within {@code 
BulkInsertMapFunction}
+fileIds.add("");
+  }
+  return fileIds.iterator();
+}, true).collect();
+ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() == 
fileIDPfxs.size(),
+String.format("Generated fileIDPfxs (%d) are lesser in size than 
the partitions %d", fileIDPfxs.size(), partitionedRDD.getNumPartitions()));
+
+return partitionedRDD;
+  }
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+return true;
+  }
+
+  @Override
+  public String getFileIdPfx(int partitionId) {
+return fileIDPfxs.get(partitionId);

Review Comment:
   The idea behind sharding in MDT is that you can create more shards rather 
than splitting a shard into two files. Splitting would produce one large and one 
small file, which is not optimal.
  

[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-06 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1219045032


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -626,8 +657,7 @@ private void initializeFileGroups(HoodieTableMetaClient 
dataMetaClient, Metadata
 
 HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
 
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), 
metadataPartition.getPartitionPath()))
-.withFileId(fileGroupFileId)
-.overBaseCommit(instantTime)
+.withFileId(fileGroupFileId).overBaseCommit(instantTime)
 .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)

Review Comment:
   Reverted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-06 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1219043175


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##
@@ -338,14 +338,20 @@ protected void initMetadataTable(Option 
instantTime) {
* @param inFlightInstantTimestamp - The in-flight action responsible for 
the metadata table initialization
*/
   private void initializeMetadataTable(Option 
inFlightInstantTimestamp) {
-if (config.isMetadataTableEnabled()) {
-  HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config,
-  context, Option.empty(), inFlightInstantTimestamp);
-  try {
-writer.close();
-  } catch (Exception e) {
-throw new HoodieException("Failed to instantiate Metadata table ", e);
+if (!config.isMetadataTableEnabled()) {
+  LOG.error("11");
+  return;
+}
+
+try (HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config,
+context, Option.empty(), inFlightInstantTimestamp)) {
+  if (writer.isInitialized()) {
+writer.performTableServices(inFlightInstantTimestamp);
+  } else {
+throw new HoodieException((".22"));

Review Comment:
   Removed the debug log



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-06 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1219042633


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##
@@ -338,14 +338,20 @@ protected void initMetadataTable(Option 
instantTime) {
* @param inFlightInstantTimestamp - The in-flight action responsible for 
the metadata table initialization
*/
   private void initializeMetadataTable(Option 
inFlightInstantTimestamp) {
-if (config.isMetadataTableEnabled()) {
-  HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config,
-  context, Option.empty(), inFlightInstantTimestamp);
-  try {
-writer.close();
-  } catch (Exception e) {
-throw new HoodieException("Failed to instantiate Metadata table ", e);
+if (!config.isMetadataTableEnabled()) {
+  LOG.error("11");

Review Comment:
   Removed the debug log



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-23 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1203532474


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -1052,51 +1091,81 @@ protected HoodieData 
prepRecords(Map
+   * Don't perform optimization if there are inflight operations on the 
dataset. This is for two reasons:
+   * - The compaction will contain the correct data as all failed operations 
have been rolled back.
+   * - Clean/compaction etc. will have the highest timestamp on the MDT and we 
won't be adding new operations
+   * with smaller timestamps to metadata table (makes for easier debugging)
+   * 
+   * This adds the limitations that long-running async operations (clustering, 
etc.) may cause delay in such MDT
+   * optimizations. We will relax this after MDT code has been hardened.
*/
-  protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String 
instantTime) {
-// finish off any pending compactions if any from previous attempt.
-writeClient.runAnyPendingCompactions();
-
-String latestDeltaCommitTimeInMetadataTable = 
metadataMetaClient.reloadActiveTimeline()
-.getDeltaCommitTimeline()
-.filterCompletedInstants()
-.lastInstant().orElseThrow(() -> new HoodieMetadataException("No 
completed deltacommit in metadata table"))
-.getTimestamp();
-// we need to find if there are any inflights in data table timeline 
before or equal to the latest delta commit in metadata table.
-// Whenever you want to change this logic, please ensure all below 
scenarios are considered.
-// a. There could be a chance that latest delta commit in MDT is committed 
in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
-// b. There could be DT inflights after latest delta commit in MDT and we 
are ok with it. bcoz, the contract is, latest compaction instant time in MDT 
represents
-// any instants before that is already synced with metadata table.
-// c. Do consider out of order commits. For eg, c4 from DT could complete 
before c3. and we can't trigger compaction in MDT with c4 as base instant time, 
until every
-// instant before c4 is synced with metadata table.
-List pendingInstants = 
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
-
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
+  @Override
+  public void performTableServices(Option inFlightInstantTimestamp) {
+HoodieTimer metadataTableServicesTimer = HoodieTimer.start();
+boolean allTableServicesExecutedSuccessfullyOrSkipped = true;
+try {
+  BaseHoodieWriteClient writeClient = getWriteClient();
+  // Run any pending table services operations.
+  runPendingTableServicesOperations(writeClient);
+
+  // Check and run clean operations.
+  String latestDeltacommitTime = 
metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline()
+  .filterCompletedInstants()
+  .lastInstant().get()
+  .getTimestamp();
+  LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + 
", running clean operations.");
+  cleanIfNecessary(writeClient, latestDeltacommitTime);
+
+  // Do timeline validation before scheduling compaction/logcompaction 
operations.
+  if 
(!validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, 
latestDeltacommitTime)) {
+return;

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-23 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1203530662


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -760,15 +815,14 @@ private void initializeFileGroups(HoodieTableMetaClient 
dataMetaClient, Metadata
 
 HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
 
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), 
metadataPartition.getPartitionPath()))
-.withFileId(fileGroupFileId)
-.overBaseCommit(instantTime)
+.withFileId(fileGroupFileId).overBaseCommit(instantTime)
 .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
 .withFileSize(0L)
-.withSizeThreshold(metadataWriteConfig.getLogFileMaxSize())
-.withFs(dataMetaClient.getFs())
-.withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
-.withLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
-.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+.withSizeThreshold(metadataWriteConfig.getLogFileMaxSize())
+.withFs(dataMetaClient.getFs())
+.withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-23 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1203530124


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -564,53 +532,147 @@ private  boolean 
isCommitRevertedByInFlightAction(
   /**
* Initialize the Metadata Table by listing files and partitions from the 
file system.
*
-   * @param dataMetaClient   - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime   - Timestamp to use for the commit
+   * @param partitionsToInit - List of MDT partitions to initialize
* @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
*/
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List partitionsToInit,
Option 
inflightInstantTimestamp) throws IOException {
 if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
   return false;
 }
 
-String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-initTableMetadata();
-// if async metadata indexing is enabled,
-// then only initialize files partition as other partitions will be built 
using HoodieIndexer
-List enabledPartitionTypes =  new ArrayList<>();
-if (dataWriteConfig.isMetadataAsyncIndex()) {
-  enabledPartitionTypes.add(MetadataPartitionType.FILES);
-} else {
-  // all enabled ones should be initialized
-  enabledPartitionTypes = this.enabledPartitionTypes;
+// FILES partition is always initialized first
+
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+|| partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+metadataMetaClient = initializeMetaClient();
+
+// Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+List partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+Map> partitionToFilesMap = 
partitionInfoList.stream()

Review Comment:
   If the MDT already exists, then we are here to initialize the other partitions. 
For that we need the list of all files and partitions, which we can load 
from the MDT FILES partition itself.
   The MDT FILES partition is a prerequisite for all other partitions, so it must 
either already exist or be created first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-23 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1203525353


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -375,105 +357,91 @@ public List 
getEnabledPartitionTypes() {
 return this.enabledPartitionTypes;
   }
 
-  /**
-   * Initialize the metadata table if it does not exist.
-   * 
-   * If the metadata table does not exist, then file and partition listing is 
used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata   Action metadata types extending Avro 
generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on 
the dataset. This instant is ignored
-   * while deciding to initialize the metadata 
table.
-   */
-  protected abstract  void 
initialize(HoodieEngineContext engineContext,
-Option 
actionMetadata,
-
Option inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-try {
-  if (this.metadata != null) {
-this.metadata.close();
-  }
-  this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(),
-  dataWriteConfig.getBasePath(), 
dataWriteConfig.getSpillableMapBasePath());
-  this.metadataMetaClient = metadata.getMetadataMetaClient();
-} catch (Exception e) {
-  throw new HoodieException("Error initializing metadata table for reads", 
e);
-}
-  }
-
   /**
* Initialize the metadata table if needed.
*
* @param dataMetaClient   - meta client for the data table
* @param actionMetadata   - optional action metadata
* @param inflightInstantTimestamp - timestamp of an instant in progress on 
the dataset
* @param   - action metadata types extending Avro 
generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
*/
-  protected  void 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-   Option 
actionMetadata,
-   
Option inflightInstantTimestamp) throws IOException {
+  protected  boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+  
Option actionMetadata,
+  
Option inflightInstantTimestamp) throws IOException {
 HoodieTimer timer = HoodieTimer.start();
+List partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
 
-boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+try {
+  boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+  if (!exists) {
+// FILES partition is always required
+partitionsToInit.add(MetadataPartitionType.FILES);
+  }
 
-if (!exists) {
-  // Initialize for the first time by listing partitions and files 
directly from the file system
-  if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+  // check if any of the enabled partition types needs to be initialized
+  // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
+  if (!dataWriteConfig.isMetadataAsyncIndex()) {
+Set inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+LOG.info("Async metadata indexing disabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
+this.enabledPartitionTypes.stream()
+.filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
+.forEach(partitionsToInit::add);
   }
-  return;
-}
 
-// if metadata table exists, then check if any of the enabled partition 
types needs to be initialized
-// NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
-if (!dataWriteConfig.isMetadataAsyncIndex()) {
-  Set inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-  LOG.info("Async metadata indexing enabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
-  List partitionsToInit = 
this.enabledPartitionTypes.stream()
-  .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 

[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-23 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1203521583


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -489,7 +457,7 @@ private  boolean 
metadataTableExists(HoodieTableMe
* TODO: Revisit this logic and validate that filtering for all
*   commits timeline is the right thing to do
*
-   * @return True if the initialize is not needed, False otherwise
+   * @return True if the initialization is not needed, False otherwise

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-23 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1203518705


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##
@@ -337,15 +337,33 @@ protected void initMetadataTable(Option 
instantTime) {
*
* @param inFlightInstantTimestamp - The in-flight action responsible for 
the metadata table initialization
*/
-  private void initializeMetadataTable(Option 
inFlightInstantTimestamp) {
-if (config.isMetadataTableEnabled()) {
-  HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config,
-  context, Option.empty(), inFlightInstantTimestamp);
-  try {
-writer.close();
-  } catch (Exception e) {
-throw new HoodieException("Failed to instantiate Metadata table ", e);
+  private void initializeMetadataTable(WriteOperationType operationType, 
Option inFlightInstantTimestamp) {
+if (!config.isMetadataTableEnabled()) {
+  return;
+}
+
+try (HoodieTableMetadataWriter writer = 
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config,
+context, Option.empty(), inFlightInstantTimestamp)) {
+  if (writer.isInitialized()) {
+// Optimize the metadata table which involves compacton. cleaning, 
etc. This should only be called from writers.
+switch (operationType) {
+  case INSERT:
+  case INSERT_PREPPED:
+  case UPSERT:
+  case UPSERT_PREPPED:
+  case BULK_INSERT:
+  case BULK_INSERT_PREPPED:
+  case DELETE:

Review Comment:
   On second thought, the switch is not necessary. The above code runs within a 
transaction lock, so there should not be any conflicts from multiple writers 
optimizing the MDT together. The checks within performTableServices should be 
lightweight enough, or we can optimize them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-23 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1203503606


##
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##
@@ -694,17 +695,75 @@ private Long getTableChecksum() {
 return getLong(TABLE_CHECKSUM);
   }
 
-  public List getMetadataPartitionsInflight() {
-return StringUtils.split(
-getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, 
StringUtils.EMPTY_STRING),
-CONFIG_VALUES_DELIMITER
-);
+  public Set getMetadataPartitionsInflight() {
+return new HashSet<>(StringUtils.split(
+getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, 
StringUtils.EMPTY_STRING),
+CONFIG_VALUES_DELIMITER));
   }
 
   public Set getMetadataPartitions() {
 return new HashSet<>(
-StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, 
StringUtils.EMPTY_STRING),
-CONFIG_VALUES_DELIMITER));
+StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, 
StringUtils.EMPTY_STRING),
+CONFIG_VALUES_DELIMITER));
+  }
+
+  /**
+   * @return true if metadata table has been created and is being used for 
this dataset, else returns false.
+   */
+  public boolean isMetadataTableEnabled() {
+return isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+  }
+
+  /**
+   * Checks if metadata table is enabled and the specified partition has been 
initialized.
+   *
+   * @param partition The partition to check
+   * @return true if the specific partition has been initialized, else 
returns false.
+   */
+  public boolean isMetadataPartitionEnabled(MetadataPartitionType partition) {
+return getMetadataPartitions().contains(partition.getPartitionPath());
+  }
+
+  /**
+   * Enables or disables the specified metadata table partition.
+   *
+   * @param partition The partition
+   * @param enabled   If true, the partition is enabled, else disabled
+   */
+  public void setMetadataPartitionState(MetadataPartitionType partition, 
boolean enabled) {
+
ValidationUtils.checkArgument(!partition.getPartitionPath().contains(CONFIG_VALUES_DELIMITER),
+"Metadata Table partition path cannot contain a comma: " + 
partition.getPartitionPath());
+Set partitions = getMetadataPartitions();
+Set partitionsInflight = getMetadataPartitionsInflight();
+if (enabled) {
+  partitions.add(partition.getPartitionPath());
+  partitionsInflight.remove(partition.getPartitionPath());
+} else if (partition.equals(MetadataPartitionType.FILES)) {
+  // file listing partition is required for all other partitions to work
+  // Disabling file partition will also disable all partitions
+  partitions.clear();
+  partitionsInflight.clear();
+} else {
+  partitions.remove(partition.getPartitionPath());
+  partitionsInflight.remove(partition.getPartitionPath());
+}
+setValue(TABLE_METADATA_PARTITIONS, 
partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
+setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, 
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));

Review Comment:
   It is persisted on the caller side, in 
HoodieTableMetadataUtil.setMetadataPartitionState.
   
   I have removed HoodieTableMetadataUtil.setMetadataPartitionState as it 
was unnecessary.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-15 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194268603


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -1097,87 +1165,76 @@ protected void cleanIfNecessary(BaseHoodieWriteClient 
writeClient, String instan
 // Trigger cleaning with suffixes based on the same instant time. This 
ensures that any future
 // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
 // metadata table.
-writeClient.clean(instantTime + "002");
+
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
 writeClient.lazyRollbackFailedIndexing();
   }
 
   /**
-   * This is invoked to initialize metadata table for a dataset.
-   * Initial commit has special handling mechanism due to its scale compared 
to other regular commits.
-   * During cold startup, the list of files to be committed can be huge.
-   * So creating a HoodieCommitMetadata out of these large number of files,
-   * and calling the existing update(HoodieCommitMetadata) function does not 
scale well.
-   * Hence, we have a special commit just for the initialization scenario.
+   * Validates the timeline for both main and metadata tables.
*/
-  private void initialCommit(String createInstantTime, 
List partitionTypes) {
-// List all partitions in the basePath of the containing dataset
-LOG.info("Initializing metadata table by using file listings in " + 
dataWriteConfig.getBasePath());
-engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing 
metadata table by listing files and partitions: " + 
dataWriteConfig.getTableName());
-
-Map> partitionToRecordsMap 
= new HashMap<>();
-
-// skip file system listing to populate metadata records if it's a fresh 
table.
-// this is applicable only if the table already has N commits and metadata 
is enabled at a later point in time.
-if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // 
SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
-  // If not, last completed commit in data table will be chosen as the 
initial commit time.
-  LOG.info("Triggering empty Commit to metadata to initialize");
-} else {
-  List partitionInfoList = 
listAllPartitions(dataMetaClient);
-  Map> partitionToFilesMap = 
partitionInfoList.stream()
-  .map(p -> {
-String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-return Pair.of(partitionName, p.getFileNameToSizeMap());
-  })
-  .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-  int totalDataFilesCount = 
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-  List partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-  if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-// Record which saves the list of all partitions
-HoodieRecord allPartitionRecord = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
-HoodieData filesPartitionRecords = 
getFilesPartitionRecords(createInstantTime, partitionInfoList, 
allPartitionRecord);
-ValidationUtils.checkState(filesPartitionRecords.count() == 
(partitions.size() + 1));
-partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecords);
-  }
-
-  if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && 
totalDataFilesCount > 0) {
-final HoodieData recordsRDD = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
-partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
recordsRDD);
-  }
-
-  if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && 
totalDataFilesCount > 0) {
-final HoodieData recordsRDD = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
-partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
recordsRDD);
-  }
-  LOG.info("Committing " + partitions.size() + " partitions and " + 
totalDataFilesCount + " files to metadata");
+  private boolean validateTimelineBeforeSchedulingCompaction(Option 
inFlightInstantTimestamp, String latestDeltacommitTime) {
+// There should not be any incomplete instants on MDT
+HoodieActiveTimeline metadataTimeline = 
metadataMetaClient.reloadActiveTimeline();
+List pendingInstantsOnMetadataTable = 
metadataTimeline.filterInflightsAndRequested().getInstants();
+if (!pendingInstantsOnMetadataTable.isEmpty()) {
+  LOG.info(String.format(
+  "Cannot compact MDT as there are %d inflight instants: %s",
+  pendingInst

[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-15 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194268190


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -1097,87 +1165,76 @@ protected void cleanIfNecessary(BaseHoodieWriteClient 
writeClient, String instan
 // Trigger cleaning with suffixes based on the same instant time. This 
ensures that any future
 // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
 // metadata table.
-writeClient.clean(instantTime + "002");
+
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
 writeClient.lazyRollbackFailedIndexing();
   }
 
   /**
-   * This is invoked to initialize metadata table for a dataset.
-   * Initial commit has special handling mechanism due to its scale compared 
to other regular commits.
-   * During cold startup, the list of files to be committed can be huge.
-   * So creating a HoodieCommitMetadata out of these large number of files,
-   * and calling the existing update(HoodieCommitMetadata) function does not 
scale well.
-   * Hence, we have a special commit just for the initialization scenario.
+   * Validates the timeline for both main and metadata tables.
*/
-  private void initialCommit(String createInstantTime, 
List partitionTypes) {
-// List all partitions in the basePath of the containing dataset
-LOG.info("Initializing metadata table by using file listings in " + 
dataWriteConfig.getBasePath());
-engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing 
metadata table by listing files and partitions: " + 
dataWriteConfig.getTableName());
-
-Map> partitionToRecordsMap 
= new HashMap<>();
-
-// skip file system listing to populate metadata records if it's a fresh 
table.
-// this is applicable only if the table already has N commits and metadata 
is enabled at a later point in time.
-if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // 
SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
-  // If not, last completed commit in data table will be chosen as the 
initial commit time.
-  LOG.info("Triggering empty Commit to metadata to initialize");
-} else {
-  List partitionInfoList = 
listAllPartitions(dataMetaClient);
-  Map> partitionToFilesMap = 
partitionInfoList.stream()
-  .map(p -> {
-String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-return Pair.of(partitionName, p.getFileNameToSizeMap());
-  })
-  .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-  int totalDataFilesCount = 
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-  List partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-  if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-// Record which saves the list of all partitions
-HoodieRecord allPartitionRecord = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
-HoodieData filesPartitionRecords = 
getFilesPartitionRecords(createInstantTime, partitionInfoList, 
allPartitionRecord);
-ValidationUtils.checkState(filesPartitionRecords.count() == 
(partitions.size() + 1));
-partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecords);
-  }
-
-  if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && 
totalDataFilesCount > 0) {
-final HoodieData recordsRDD = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
-partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
recordsRDD);
-  }
-
-  if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && 
totalDataFilesCount > 0) {
-final HoodieData recordsRDD = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
-partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
recordsRDD);
-  }
-  LOG.info("Committing " + partitions.size() + " partitions and " + 
totalDataFilesCount + " files to metadata");
+  private boolean validateTimelineBeforeSchedulingCompaction(Option 
inFlightInstantTimestamp, String latestDeltacommitTime) {
+// There should not be any incomplete instants on MDT
+HoodieActiveTimeline metadataTimeline = 
metadataMetaClient.reloadActiveTimeline();
+List pendingInstantsOnMetadataTable = 
metadataTimeline.filterInflightsAndRequested().getInstants();
+if (!pendingInstantsOnMetadataTable.isEmpty()) {
+  LOG.info(String.format(
+  "Cannot compact MDT as there are %d inflight instants: %s",
+  pendingInst

[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-15 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194267719


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -1097,87 +1165,76 @@ protected void cleanIfNecessary(BaseHoodieWriteClient 
writeClient, String instan
 // Trigger cleaning with suffixes based on the same instant time. This 
ensures that any future
 // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
 // metadata table.
-writeClient.clean(instantTime + "002");
+
writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime));
 writeClient.lazyRollbackFailedIndexing();
   }
 
   /**
-   * This is invoked to initialize metadata table for a dataset.
-   * Initial commit has special handling mechanism due to its scale compared 
to other regular commits.
-   * During cold startup, the list of files to be committed can be huge.
-   * So creating a HoodieCommitMetadata out of these large number of files,
-   * and calling the existing update(HoodieCommitMetadata) function does not 
scale well.
-   * Hence, we have a special commit just for the initialization scenario.
+   * Validates the timeline for both main and metadata tables.
*/
-  private void initialCommit(String createInstantTime, 
List partitionTypes) {
-// List all partitions in the basePath of the containing dataset
-LOG.info("Initializing metadata table by using file listings in " + 
dataWriteConfig.getBasePath());
-engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing 
metadata table by listing files and partitions: " + 
dataWriteConfig.getTableName());
-
-Map> partitionToRecordsMap 
= new HashMap<>();
-
-// skip file system listing to populate metadata records if it's a fresh 
table.
-// this is applicable only if the table already has N commits and metadata 
is enabled at a later point in time.
-if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // 
SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
-  // If not, last completed commit in data table will be chosen as the 
initial commit time.
-  LOG.info("Triggering empty Commit to metadata to initialize");
-} else {
-  List partitionInfoList = 
listAllPartitions(dataMetaClient);
-  Map> partitionToFilesMap = 
partitionInfoList.stream()
-  .map(p -> {
-String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-return Pair.of(partitionName, p.getFileNameToSizeMap());
-  })
-  .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-  int totalDataFilesCount = 
partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-  List partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-  if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-// Record which saves the list of all partitions
-HoodieRecord allPartitionRecord = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
-HoodieData filesPartitionRecords = 
getFilesPartitionRecords(createInstantTime, partitionInfoList, 
allPartitionRecord);
-ValidationUtils.checkState(filesPartitionRecords.count() == 
(partitions.size() + 1));
-partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecords);
-  }
-
-  if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && 
totalDataFilesCount > 0) {
-final HoodieData recordsRDD = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
-partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
recordsRDD);
-  }
-
-  if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && 
totalDataFilesCount > 0) {
-final HoodieData recordsRDD = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
-partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
recordsRDD);
-  }
-  LOG.info("Committing " + partitions.size() + " partitions and " + 
totalDataFilesCount + " files to metadata");
+  private boolean validateTimelineBeforeSchedulingCompaction(Option 
inFlightInstantTimestamp, String latestDeltacommitTime) {
+// There should not be any incomplete instants on MDT
+HoodieActiveTimeline metadataTimeline = 
metadataMetaClient.reloadActiveTimeline();
+List pendingInstantsOnMetadataTable = 
metadataTimeline.filterInflightsAndRequested().getInstants();
+if (!pendingInstantsOnMetadataTable.isEmpty()) {
+  LOG.info(String.format(
+  "Cannot compact MDT as there are %d inflight instants: %s",
+  pendingInst

[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-15 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194226408


##
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java:
##
@@ -2482,6 +2483,14 @@ public void testMetadataMetrics() throws Exception {
 }
   }
 
+  @Test
+  public void testGetFileGroupIndexFromFileId() {

Review Comment:
   Added tests for the first three.
   
   getFileIdLengthWithoutFileIndex is private and is called from the other 
functions, so it is tested indirectly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-15 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194213033


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import scala.Tuple2;
+
+/**
+ * A {@code BulkInsertPartitioner} implementation for Metadata Table to 
improve performance of initialization of metadata
+ * table partition when a very large number of records are inserted.
+ *
+ * This partitioner requires the records to be already tagged with location.
+ */
+public class SparkHoodieMetadataBulkInsertPartitioner implements 
BulkInsertPartitioner> {

Review Comment:
   Yes, added TestSparkHoodieMetadataBulkInsertPartitioner



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-15 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194209832


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -824,25 +863,22 @@ private interface ConvertMetadataFunction {
   /**
* Processes commit metadata from data table and commits to metadata table.
*
-   * @param instantTime instant time of interest.
+   * @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the 
respective metadata to List of HoodieRecords to be written to metadata table.
-   * @param  type of commit metadata.
-   * @param canTriggerTableService true if table services can be triggered. 
false otherwise.
*/
-  private  void processAndCommit(String instantTime, 
ConvertMetadataFunction convertMetadataFunction, boolean 
canTriggerTableService) {
-if (!dataWriteConfig.isMetadataTableEnabled()) {
-  return;
-}
+  private void processAndCommit(String instantTime, ConvertMetadataFunction 
convertMetadataFunction) {
+ValidationUtils.checkArgument(dataWriteConfig.isMetadataTableEnabled());
+
 Set partitionsToUpdate = getMetadataPartitionsToUpdate();
 Set inflightIndexes = 
getInflightMetadataPartitions(dataMetaClient.getTableConfig());
 // if indexing is inflight then do not trigger table service
 boolean doNotTriggerTableService = 
partitionsToUpdate.stream().anyMatch(inflightIndexes::contains);

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-15 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194208983


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -562,53 +532,144 @@ private  boolean 
isCommitRevertedByInFlightAction(
   /**
* Initialize the Metadata Table by listing files and partitions from the 
file system.
*
-   * @param dataMetaClient   - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime   - Timestamp to use for the commit
+   * @param partitionsToInit - List of MDT partitions to initialize
* @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
*/
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List partitionsToInit,
Option 
inflightInstantTimestamp) throws IOException {
 if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
   return false;
 }
 
-String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-initTableMetadata();
-// if async metadata indexing is enabled,
-// then only initialize files partition as other partitions will be built 
using HoodieIndexer
-List enabledPartitionTypes =  new ArrayList<>();
-if (dataWriteConfig.isMetadataAsyncIndex()) {
-  enabledPartitionTypes.add(MetadataPartitionType.FILES);
-} else {
-  // all enabled ones should be initialized
-  enabledPartitionTypes = this.enabledPartitionTypes;
+// FILES partition is always initialized first
+
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+|| partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+metadataMetaClient = initializeMetaClient();
+
+// Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+List partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+Map> partitionToFilesMap = 
partitionInfoList.stream()
+.map(p -> {
+  String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+  return Pair.of(partitionName, p.getFileNameToSizeMap());
+})
+.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+for (MetadataPartitionType partitionType : partitionsToInit) {
+  // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+  String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+  LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+  Pair> fileGroupCountAndRecordsPair;
+  switch (partitionType) {
+case FILES:
+  fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+  break;
+case BLOOM_FILTERS:
+  fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+  break;
+case COLUMN_STATS:
+  fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+  break;
+default:
+  throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+  }
+
+  // Generate the file groups
+  final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+  ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+  initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
+
+  // Perform the commit using bulkCommit
+  HoodieData records = 
fileGroupCountAndRecordsPair.getValue();
+  bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
+  metadataMetaClient.reloadActiveTimeline();
+  dataMetaClient = 
HoodieTableMetadataUtil.setMetadataPartitionState(dataMetaClient, 
partitionType, true);
 }
-initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
enabledPartitionTypes);
-initialCommit(createInstantTime, enabledPartitionTypes);
-updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
 return true;
   }
 
-  private String getInitialCommitInstantTime(HoodieTableMetaClient 
dataMetaClient) {
-// If there is no

[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-15 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194204960


##
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##
@@ -670,17 +670,75 @@ private Long getTableChecksum() {
 return getLong(TABLE_CHECKSUM);
   }
 
-  public List getMetadataPartitionsInflight() {
-return StringUtils.split(
-getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, 
StringUtils.EMPTY_STRING),
-CONFIG_VALUES_DELIMITER
-);
+  public Set getMetadataPartitionsInflight() {
+return new HashSet<>(StringUtils.split(
+getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, 
StringUtils.EMPTY_STRING),
+CONFIG_VALUES_DELIMITER));
   }
 
   public Set getMetadataPartitions() {
 return new HashSet<>(
-StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, 
StringUtils.EMPTY_STRING),
-CONFIG_VALUES_DELIMITER));
+StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, 
StringUtils.EMPTY_STRING),
+CONFIG_VALUES_DELIMITER));
+  }
+
+  /**
+   * @return true if metadata table has been created and is being used for 
this dataset, else returns false.
+   */
+  public boolean isMetadataTableEnabled() {
+return isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+  }
+
+  /**
+   * Checks if metadata table is enabled and the specified partition has been 
initialized.
+   *
+   * @param partition The partition to check
+   * @return true if the specific partition has been initialized, else 
returns false.
+   */
+  public boolean isMetadataPartitionEnabled(MetadataPartitionType partition) {
+return getMetadataPartitions().contains(partition.getPartitionPath());
+  }
+
+  /**
+   * Enables or disables the specified metadata table partition.
+   *
+   * @param partition The partition
+   * @param enabled   If true, the partition is enabled, else disabled
+   */
+  public void setMetadataPartitionState(MetadataPartitionType partition, 
boolean enabled) {
+
ValidationUtils.checkArgument(!partition.getPartitionPath().contains(CONFIG_VALUES_DELIMITER),
+"Metadata Table partition path cannot contain a comma: " + 
partition.getPartitionPath());
+Set partitions = getMetadataPartitions();
+Set partitionsInflight = getMetadataPartitionsInflight();
+if (enabled) {
+  partitions.add(partition.getPartitionPath());
+  partitionsInflight.remove(partition.getPartitionPath());
+} else if (partition.equals(MetadataPartitionType.FILES)) {
+  // file listing partition is required for all other partitions to work
+  // Disabling file partition will also disable all partitions
+  partitions.clear();
+  partitionsInflight.clear();
+} else {
+  partitions.remove(partition.getPartitionPath());
+  partitionsInflight.remove(partition.getPartitionPath());
+}
+setValue(TABLE_METADATA_PARTITIONS, 
partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
+setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, 
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
+  }
+
+  /**
+   * Enables the specified metadata table partitions as inflight.
+   *
+   * @param partitionTypes The list of partitions to enable as inflight.
+   */
+  public void setMetadataPartitionsAsInflight(List 
partitionTypes) {
+Set partitions = getMetadataPartitionsInflight();

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192180200


##
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##
@@ -1378,6 +1339,206 @@ public static Set 
getInflightAndCompletedMetadataPartitions(HoodieTableC
*/
   public static boolean isIndexingCommit(String instantTime) {
 return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
METADATA_INDEXER_TIME_SUFFIX.length()
-&& instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+&& instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+  }
+
+  /**
+   * Delete the metadata table for the dataset and backup if required.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param contextinstance of {@link HoodieEngineContext}.
+   * @param backup Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *   directory with name metadata_.
+   * @return The backup directory if backup was requested
+   */
+  public static String deleteMetadataTable(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context, boolean backup) {
+final Path metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2());
+FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), 
context.getHadoopConf().get());
+setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, 
false);
+try {
+  if (!fs.exists(metadataTablePath)) {
+return null;
+  }
+} catch (FileNotFoundException e) {
+  // Ignoring exception as metadata table already does not exist
+  return null;
+} catch (IOException e) {
+  throw new HoodieMetadataException("Failed to check metadata table 
existence", e);
+}
+
+if (backup) {
+  final Path metadataBackupPath = new Path(metadataTablePath.getParent(), 
".metadata_" + HoodieActiveTimeline.createNewInstantTime());
+  LOG.info("Backing up metadata directory to " + metadataBackupPath + " 
before deletion");
+  try {
+if (fs.rename(metadataTablePath, metadataBackupPath)) {
+  return metadataBackupPath.toString();
+}
+  } catch (Exception e) {
+// If rename fails, we will ignore the backup and still delete the MDT
+LOG.error("Failed to backup metadata table using rename", e);
+  }
+}
+
+LOG.info("Deleting metadata table from " + metadataTablePath);
+try {
+  fs.delete(metadataTablePath, true);
+} catch (Exception e) {
+  throw new HoodieMetadataException("Failed to delete metadata table from 
path " + metadataTablePath, e);
+}
+
+return null;
+  }
+
+  public static HoodieTableMetaClient 
setMetadataPartitionState(HoodieTableMetaClient dataMetaClient, 
MetadataPartitionType partitionType, boolean enabled) {
+dataMetaClient.getTableConfig().setMetadataPartitionState(partitionType, 
enabled);
+HoodieTableConfig.update(dataMetaClient.getFs(), new 
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
+dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionEnabled(partitionType)
 == enabled,
+"Metadata table state change should be persisted");
+
+LOG.info(String.format("Metadata table %s partition %s has been %s", 
dataMetaClient.getBasePathV2(), partitionType,
+enabled ? "enabled" : "disabled"));
+return dataMetaClient;
+  }
+
+  /**
+   * Delete a partition within the metadata table.
+   * 
+   * This can be used to delete a partition so that it can be re-bootstrapped.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param contextinstance of {@code HoodieEngineContext}.
+   * @param backup Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *   directory with name metadata_.
+   * @param partitionType  The partition to delete
+   * @return The backup directory if backup was requested, null otherwise
+   */
+  public static String deleteMetadataTablePartition(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context,
+MetadataPartitionType 
partitionType, boolean backup) {
+if (partitionType.equals(MetadataPartitionType.FILES)) {
+  return deleteMetadataTable(dataMetaClient, context, backup);
+}
+
+final Path metadataTablePartitionPath = new 
Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()),
 partitionType.getPartitionPath());
+FileSystem fs = FSUtils.getFs(metadataTablePartitionPath.toString(), 
context.getHadoopConf().get());
+setMetadataPartitionS

[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192169835


##
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##
@@ -1378,6 +1339,206 @@ public static Set 
getInflightAndCompletedMetadataPartitions(HoodieTableC
*/
   public static boolean isIndexingCommit(String instantTime) {
 return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
METADATA_INDEXER_TIME_SUFFIX.length()
-&& instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+&& instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX);
+  }
+
+  /**
+   * Delete the metadata table for the dataset and backup if required.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param contextinstance of {@link HoodieEngineContext}.
+   * @param backup Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *   directory with name metadata_.
+   * @return The backup directory if backup was requested
+   */
+  public static String deleteMetadataTable(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context, boolean backup) {
+final Path metadataTablePath = 
HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2());
+FileSystem fs = FSUtils.getFs(metadataTablePath.toString(), 
context.getHadoopConf().get());
+setMetadataPartitionState(dataMetaClient, MetadataPartitionType.FILES, 
false);
+try {
+  if (!fs.exists(metadataTablePath)) {
+return null;
+  }
+} catch (FileNotFoundException e) {
+  // Ignoring exception as metadata table already does not exist
+  return null;
+} catch (IOException e) {
+  throw new HoodieMetadataException("Failed to check metadata table 
existence", e);
+}
+
+if (backup) {
+  final Path metadataBackupPath = new Path(metadataTablePath.getParent(), 
".metadata_" + HoodieActiveTimeline.createNewInstantTime());
+  LOG.info("Backing up metadata directory to " + metadataBackupPath + " 
before deletion");
+  try {
+if (fs.rename(metadataTablePath, metadataBackupPath)) {
+  return metadataBackupPath.toString();
+}
+  } catch (Exception e) {
+// If rename fails, we will ignore the backup and still delete the MDT
+LOG.error("Failed to backup metadata table using rename", e);
+  }
+}
+
+LOG.info("Deleting metadata table from " + metadataTablePath);
+try {
+  fs.delete(metadataTablePath, true);
+} catch (Exception e) {
+  throw new HoodieMetadataException("Failed to delete metadata table from 
path " + metadataTablePath, e);
+}
+
+return null;
+  }
+
+  public static HoodieTableMetaClient 
setMetadataPartitionState(HoodieTableMetaClient dataMetaClient, 
MetadataPartitionType partitionType, boolean enabled) {
+dataMetaClient.getTableConfig().setMetadataPartitionState(partitionType, 
enabled);
+HoodieTableConfig.update(dataMetaClient.getFs(), new 
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
+dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient);
+
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionEnabled(partitionType)
 == enabled,
+"Metadata table state change should be persisted");
+
+LOG.info(String.format("Metadata table %s partition %s has been %s", 
dataMetaClient.getBasePathV2(), partitionType,
+enabled ? "enabled" : "disabled"));
+return dataMetaClient;
+  }
+
+  /**
+   * Delete a partition within the metadata table.
+   * 
+   * This can be used to delete a partition so that it can be re-bootstrapped.
+   *
+   * @param dataMetaClient {@code HoodieTableMetaClient} of the dataset for 
which metadata table is to be deleted
+   * @param contextinstance of {@code HoodieEngineContext}.
+   * @param backup Whether metadata table should be backed up before 
deletion. If true, the table is backed up to the
+   *   directory with name metadata_.
+   * @param partitionType  The partition to delete
+   * @return The backup directory if backup was requested, null otherwise
+   */
+  public static String deleteMetadataTablePartition(HoodieTableMetaClient 
dataMetaClient, HoodieEngineContext context,
+MetadataPartitionType 
partitionType, boolean backup) {
+if (partitionType.equals(MetadataPartitionType.FILES)) {
+  return deleteMetadataTable(dataMetaClient, context, backup);
+}
+
+final Path metadataTablePartitionPath = new 
Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()),
 partitionType.getPartitionPath());
+FileSystem fs = FSUtils.getFs(metadataTablePartitionPath.toString(), 
context.getHadoopConf().get());
+setMetadataPartitionS

[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192166826


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -873,17 +908,7 @@ public void buildMetadataPartitions(HoodieEngineContext 
engineContext, List {
   String relativePartitionPath = 
indexPartitionInfo.getMetadataPartitionPath();
   LOG.info(String.format("Creating a new metadata index for partition '%s' 
under path %s upto instant %s",
-  relativePartitionPath, metadataWriteConfig.getBasePath(), 
indexUptoInstantTime));
-  try {
-// file group should have already been initialized while scheduling 
index for this partition
-if (!dataMetaClient.getFs().exists(new 
Path(metadataWriteConfig.getBasePath(), relativePartitionPath))) {

Review Comment:
   In HoodieBackedTableMetadataWriter::initializeFromFilesystem there is a call to initializeFileGroups().
   
   Within initializeFileGroups(), existing file slices should be deleted (I filed a separate PR for this - the one where the file slices are created in parallel).
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192164647


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##
@@ -118,46 +123,32 @@ protected void initRegistry() {
   }
 
   @Override
-  protected  void initialize(HoodieEngineContext 
engineContext,
-   Option 
actionMetadata,
-   Option 
inflightInstantTimestamp) {
-try {
-  metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
-if (registry instanceof DistributedRegistry) {
-  HoodieSparkEngineContext sparkEngineContext = 
(HoodieSparkEngineContext) engineContext;
-  ((DistributedRegistry) 
registry).register(sparkEngineContext.getJavaSparkContext());
-}
-  });
+  protected void commit(String instantTime, Map> partitionRecordsMap) {
+commitInternal(instantTime, partitionRecordsMap, Option.empty());
+  }
 
-  if (enabled) {
-initializeIfNeeded(dataMetaClient, actionMetadata, 
inflightInstantTimestamp);
-  }
-} catch (IOException e) {
-  LOG.error("Failed to initialize metadata table. Disabling the writer.", 
e);
-  enabled = false;
-}
+  protected void bulkCommit(
+  String instantTime, MetadataPartitionType partitionType, 
HoodieData records,
+  int fileGroupCount) {
+Map> partitionRecordsMap = 
new HashMap<>();
+partitionRecordsMap.put(partitionType, records);
+SparkHoodieMetadataBulkInsertPartitioner partitioner = new 
SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount);

Review Comment:
   It will write HFiles, since the base file format of the MDT is HFile.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192162585


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -824,25 +863,22 @@ private interface ConvertMetadataFunction {
   /**
* Processes commit metadata from data table and commits to metadata table.
*
-   * @param instantTime instant time of interest.
+   * @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the 
respective metadata to List of HoodieRecords to be written to metadata table.
-   * @param  type of commit metadata.
-   * @param canTriggerTableService true if table services can be triggered. 
false otherwise.
*/
-  private  void processAndCommit(String instantTime, 
ConvertMetadataFunction convertMetadataFunction, boolean 
canTriggerTableService) {
-if (!dataWriteConfig.isMetadataTableEnabled()) {
-  return;
-}
+  private void processAndCommit(String instantTime, ConvertMetadataFunction 
convertMetadataFunction) {
+ValidationUtils.checkArgument(dataWriteConfig.isMetadataTableEnabled());
+
 Set partitionsToUpdate = getMetadataPartitionsToUpdate();
 Set inflightIndexes = 
getInflightMetadataPartitions(dataMetaClient.getTableConfig());
 // if indexing is inflight then do not trigger table service
 boolean doNotTriggerTableService = 
partitionsToUpdate.stream().anyMatch(inflightIndexes::contains);

Review Comment:
   If you mean the function processAndCommit, then yes. It is used for non-initial commits into the MDT. Only the initial commit is a bulk insert.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192161869


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -824,25 +863,22 @@ private interface ConvertMetadataFunction {
   /**
* Processes commit metadata from data table and commits to metadata table.
*
-   * @param instantTime instant time of interest.
+   * @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the 
respective metadata to List of HoodieRecords to be written to metadata table.
-   * @param  type of commit metadata.
-   * @param canTriggerTableService true if table services can be triggered. 
false otherwise.
*/
-  private  void processAndCommit(String instantTime, 
ConvertMetadataFunction convertMetadataFunction, boolean 
canTriggerTableService) {
-if (!dataWriteConfig.isMetadataTableEnabled()) {
-  return;
-}
+  private void processAndCommit(String instantTime, ConvertMetadataFunction 
convertMetadataFunction) {
+ValidationUtils.checkArgument(dataWriteConfig.isMetadataTableEnabled());
+
 Set partitionsToUpdate = getMetadataPartitionsToUpdate();
 Set inflightIndexes = 
getInflightMetadataPartitions(dataMetaClient.getTableConfig());
 // if indexing is inflight then do not trigger table service
 boolean doNotTriggerTableService = 
partitionsToUpdate.stream().anyMatch(inflightIndexes::contains);

Review Comment:
   Do you mean the ValidationUtils.checkArgument? It seems redundant, as this is already checked in the constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192161017


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -619,41 +680,34 @@ private boolean 
anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Opti
 return false;
   }
 
-  private void 
updateInitializedPartitionsInTableConfig(List 
partitionTypes) {
-Set completedPartitions = 
dataMetaClient.getTableConfig().getMetadataPartitions();
-
completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
-
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(),
 String.join(",", completedPartitions));
-HoodieTableConfig.update(dataMetaClient.getFs(), new 
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
-  }
-
-  private HoodieTableMetaClient initializeMetaClient(boolean 
populateMetaFields) throws IOException {
+  private HoodieTableMetaClient initializeMetaClient() throws IOException {
 return HoodieTableMetaClient.withPropertyBuilder()
 .setTableType(HoodieTableType.MERGE_ON_READ)
 .setTableName(tableName)
 .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
 .setPayloadClassName(HoodieMetadataPayload.class.getName())
 .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
 .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
-.setPopulateMetaFields(populateMetaFields)
-
.setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
-.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
+.setPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
+
.setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192160408


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -373,105 +356,92 @@ public List 
getEnabledPartitionTypes() {
 return this.enabledPartitionTypes;
   }
 
-  /**
-   * Initialize the metadata table if it does not exist.
-   * 
-   * If the metadata table does not exist, then file and partition listing is 
used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata   Action metadata types extending Avro 
generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on 
the dataset. This instant is ignored
-   * while deciding to initialize the metadata 
table.
-   */
-  protected abstract  void 
initialize(HoodieEngineContext engineContext,
-Option 
actionMetadata,
-
Option inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-try {
-  if (this.metadata != null) {
-this.metadata.close();
-  }
-  this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(),
-  dataWriteConfig.getBasePath(), 
dataWriteConfig.getSpillableMapBasePath());
-  this.metadataMetaClient = metadata.getMetadataMetaClient();
-} catch (Exception e) {
-  throw new HoodieException("Error initializing metadata table for reads", 
e);
-}
-  }
-
   /**
* Initialize the metadata table if needed.
*
* @param dataMetaClient   - meta client for the data table
* @param actionMetadata   - optional action metadata
* @param inflightInstantTimestamp - timestamp of an instant in progress on 
the dataset
* @param   - action metadata types extending Avro 
generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
*/
-  protected  void 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-   Option 
actionMetadata,
-   
Option inflightInstantTimestamp) throws IOException {
+  protected  boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+  
Option actionMetadata,
+  
Option inflightInstantTimestamp) throws IOException {
 HoodieTimer timer = HoodieTimer.start();
+List partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
 
-boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+try {
+  boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+  if (!exists) {
+// FILES partition is always required
+partitionsToInit.add(MetadataPartitionType.FILES);
+  }
 
-if (!exists) {
-  // Initialize for the first time by listing partitions and files 
directly from the file system
-  if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+  // check if any of the enabled partition types needs to be initialized
+  // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
+  if (!dataWriteConfig.isMetadataAsyncIndex()) {
+Set inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+LOG.info("Async metadata indexing disabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
+this.enabledPartitionTypes.stream()
+.filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
+.forEach(partitionsToInit::add);
   }
-  return;
-}
 
-// if metadata table exists, then check if any of the enabled partition 
types needs to be initialized
-// NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
-if (!dataWriteConfig.isMetadataAsyncIndex()) {
-  Set inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-  LOG.info("Async metadata indexing enabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
-  List partitionsToInit = 
this.enabledPartitionTypes.stream()
-  .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 

[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192159380


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -562,53 +532,144 @@ private  boolean 
isCommitRevertedByInFlightAction(
   /**
* Initialize the Metadata Table by listing files and partitions from the 
file system.
*
-   * @param dataMetaClient   - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime   - Timestamp to use for the commit
+   * @param partitionsToInit - List of MDT partitions to initialize
* @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
*/
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List partitionsToInit,
Option 
inflightInstantTimestamp) throws IOException {
 if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
   return false;
 }
 
-String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-initTableMetadata();
-// if async metadata indexing is enabled,
-// then only initialize files partition as other partitions will be built 
using HoodieIndexer
-List enabledPartitionTypes =  new ArrayList<>();
-if (dataWriteConfig.isMetadataAsyncIndex()) {
-  enabledPartitionTypes.add(MetadataPartitionType.FILES);
-} else {
-  // all enabled ones should be initialized
-  enabledPartitionTypes = this.enabledPartitionTypes;
+// FILES partition is always initialized first
+
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+|| partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+metadataMetaClient = initializeMetaClient();
+
+// Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+List partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+Map> partitionToFilesMap = 
partitionInfoList.stream()
+.map(p -> {
+  String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+  return Pair.of(partitionName, p.getFileNameToSizeMap());
+})
+.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+for (MetadataPartitionType partitionType : partitionsToInit) {
+  // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+  String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+  LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+  Pair> fileGroupCountAndRecordsPair;
+  switch (partitionType) {
+case FILES:
+  fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+  break;
+case BLOOM_FILTERS:
+  fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+  break;
+case COLUMN_STATS:
+  fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+  break;
+default:
+  throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+  }
+
+  // Generate the file groups
+  final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+  ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+  initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);

Review Comment:
   Added documentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192156057


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -562,53 +532,144 @@ private  boolean 
isCommitRevertedByInFlightAction(
   /**
* Initialize the Metadata Table by listing files and partitions from the 
file system.
*
-   * @param dataMetaClient   - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime   - Timestamp to use for the commit
+   * @param partitionsToInit - List of MDT partitions to initialize
* @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
*/
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List partitionsToInit,
Option 
inflightInstantTimestamp) throws IOException {
 if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
   return false;
 }
 
-String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-initTableMetadata();
-// if async metadata indexing is enabled,
-// then only initialize files partition as other partitions will be built 
using HoodieIndexer
-List enabledPartitionTypes =  new ArrayList<>();
-if (dataWriteConfig.isMetadataAsyncIndex()) {
-  enabledPartitionTypes.add(MetadataPartitionType.FILES);
-} else {
-  // all enabled ones should be initialized
-  enabledPartitionTypes = this.enabledPartitionTypes;
+// FILES partition is always initialized first
+
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+|| partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+metadataMetaClient = initializeMetaClient();
+
+// Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+List partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+Map> partitionToFilesMap = 
partitionInfoList.stream()
+.map(p -> {
+  String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+  return Pair.of(partitionName, p.getFileNameToSizeMap());
+})
+.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+for (MetadataPartitionType partitionType : partitionsToInit) {
+  // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+  String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+  LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+  Pair> fileGroupCountAndRecordsPair;
+  switch (partitionType) {
+case FILES:
+  fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+  break;
+case BLOOM_FILTERS:
+  fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+  break;
+case COLUMN_STATS:
+  fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+  break;
+default:
+  throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+  }
+
+  // Generate the file groups
+  final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+  ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+  initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
+
+  // Perform the commit using bulkCommit
+  HoodieData records = 
fileGroupCountAndRecordsPair.getValue();
+  bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
+  metadataMetaClient.reloadActiveTimeline();
+  dataMetaClient = 
HoodieTableMetadataUtil.setMetadataPartitionState(dataMetaClient, 
partitionType, true);
 }
-initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
enabledPartitionTypes);
-initialCommit(createInstantTime, enabledPartitionTypes);
-updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
 return true;
   }
 
-  private String getInitialCommitInstantTime(HoodieTableMetaClient 
dataMetaClient) {
-// If there is no

[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192140304


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java:
##
@@ -100,15 +99,6 @@ public Option execute() {
   // get last completed instant
   Option indexUptoInstant = 
table.getActiveTimeline().getContiguousCompletedWriteTimeline().lastInstant();
   if (indexUptoInstant.isPresent()) {
-// start initializing file groups
-// in case FILES partition itself was not initialized before (i.e. 
metadata was never enabled), this will initialize synchronously
-HoodieTableMetadataWriter metadataWriter = 
table.getMetadataWriter(instantTime)
-.orElseThrow(() -> new HoodieIndexException(String.format("Could 
not get metadata writer to initialize filegroups for indexing for instant: %s", 
instantTime)));
-if 
(!finalPartitionsToIndex.get(0).getPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath()))
 {
-  // initialize metadata partition only if not for FILES partition.
-  metadataWriter.initializeMetadataPartitions(table.getMetaClient(), 
finalPartitionsToIndex, indexUptoInstant.get().getTimestamp());

Review Comment:
   Initialization of file groups happens just before the bulkInsert in 
HoodieBackedTableMetadataWriter::initializeFromFilesystem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192139220


##
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##
@@ -33,10 +33,6 @@ public enum MetadataPartitionType {
   private final String partitionPath;
   // FileId prefix used for all file groups in this partition.
   private final String fileIdPrefix;
-  // Total file groups
-  // TODO fix: enum should not have any mutable aspect as this compromises 
whole idea
-  //  of the enum being static, immutable entity
-  private int fileGroupCount = 1;

Review Comment:
   I didn't see any code that serializes and saves this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192138716


##
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##
@@ -670,17 +670,75 @@ private Long getTableChecksum() {
 return getLong(TABLE_CHECKSUM);
   }
 
-  public List getMetadataPartitionsInflight() {
-return StringUtils.split(
-getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, 
StringUtils.EMPTY_STRING),
-CONFIG_VALUES_DELIMITER
-);
+  public Set getMetadataPartitionsInflight() {
+return new HashSet<>(StringUtils.split(
+getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, 
StringUtils.EMPTY_STRING),
+CONFIG_VALUES_DELIMITER));
   }
 
   public Set getMetadataPartitions() {
 return new HashSet<>(
-StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, 
StringUtils.EMPTY_STRING),
-CONFIG_VALUES_DELIMITER));
+StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, 
StringUtils.EMPTY_STRING),
+CONFIG_VALUES_DELIMITER));
+  }
+
+  /**
+   * @returns true if metadata table has been created and is being used for 
this dataset, else returns false.
+   */
+  public boolean isMetadataTableEnabled() {
+return isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+  }
+
+  /**
+   * Checks if metadata table is enabled and the specified partition has been 
initialized.
+   *
+   * @param partition The partition to check
+   * @returns true if the specific partition has been initialized, else 
returns false.
+   */
+  public boolean isMetadataPartitionEnabled(MetadataPartitionType partition) {
+return getMetadataPartitions().contains(partition.getPartitionPath());
+  }
+
+  /**
+   * Enables or disables the specified metadata table partition.
+   *
+   * @param partition The partition
+   * @param enabled   If true, the partition is enabled, else disabled
+   */
+  public void setMetadataPartitionState(MetadataPartitionType partition, 
boolean enabled) {
+
ValidationUtils.checkArgument(!partition.getPartitionPath().contains(CONFIG_VALUES_DELIMITER),
+"Metadata Table partition path cannot contain a comma: " + 
partition.getPartitionPath());
+Set partitions = getMetadataPartitions();
+Set partitionsInflight = getMetadataPartitionsInflight();
+if (enabled) {
+  partitions.add(partition.getPartitionPath());
+  partitionsInflight.remove(partition.getPartitionPath());
+} else if (partition.equals(MetadataPartitionType.FILES)) {
+  // file listing partition is required for all other partitions to work
+  // Disabling file partition will also disable all partitions
+  partitions.clear();
+  partitionsInflight.clear();
+} else {
+  partitions.remove(partition.getPartitionPath());
+  partitionsInflight.remove(partition.getPartitionPath());
+}
+setValue(TABLE_METADATA_PARTITIONS, 
partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
+setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, 
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
+  }
+
+  /**
+   * Enables the specified metadata table partition as inflight.
+   *
+   * @param partitionTypes The list of partitions to enable as inflight.
+   */
+  public void setMetadataPartitionsAsInflight(List 
partitionTypes) {
+Set partitions = getMetadataPartitionsInflight();

Review Comment:
   Renamed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prashantwason commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192137497


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import scala.Tuple2;
+
+/**
+ * A {@code BulkInsertPartitioner} implementation for Metadata Table to 
improve performance of initialization of metadata
+ * table partition when a very large number of records are inserted.
+ *
+ * This partitioner requires the records to be already tagged with location.
+ */
+public class SparkHoodieMetadataBulkInsertPartitioner implements 
BulkInsertPartitioner> {
+  final int numPartitions;
+  public SparkHoodieMetadataBulkInsertPartitioner(int numPartitions) {
+this.numPartitions = numPartitions;
+  }
+
+  private class FileGroupPartitioner extends Partitioner {
+
+@Override
+public int getPartition(Object key) {
+  return ((Tuple2)key)._1;
+}
+
+@Override
+public int numPartitions() {
+  return numPartitions;
+}
+  }
+
+  // FileIDs for the various partitions
+  private List fileIDPfxs;
+
+  /**
+   * Partition the records by their location. The number of partitions is 
determined by the number of MDT fileGroups being updated rather than the
+   * specific value of outputSparkPartitions.
+   */
+  @Override
+  public JavaRDD repartitionRecords(JavaRDD 
records, int outputSparkPartitions) {
+Comparator> keyComparator =
+(Comparator> & Serializable)(t1, t2) -> 
t1._2.compareTo(t2._2);
+
+// Partition the records by their file group
+JavaRDD partitionedRDD = records
+// key by . The file group index is 
used to partition and the record key is used to sort within the partition.
+.keyBy(r -> {
+  int fileGroupIndex = 
HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId());
+  return new Tuple2<>(fileGroupIndex, r.getRecordKey());
+})
+.repartitionAndSortWithinPartitions(new FileGroupPartitioner(), 
keyComparator)
+.map(t -> t._2);
+
+fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> {
+  // Due to partitioning, all records in the partition should have the same 
fileID, so we only need to get the fileID prefix from the first record.
+  List fileIds = new ArrayList<>(1);
+  if (recordItr.hasNext()) {
+HoodieRecord record = recordItr.next();
+final String fileID = 
HoodieTableMetadataUtil.getFileGroupPrefix(record.getCurrentLocation().getFileId());
+fileIds.add(fileID);
+  } else {
+// Empty partition
+fileIds.add("");

Review Comment:
   Added doc
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org