[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-13 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1227895765


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import scala.Tuple2;
+
+/**
+ * A {@code BulkInsertPartitioner} implementation for the Metadata Table to improve the performance of initializing
+ * metadata table partitions when a very large number of records are inserted.
+ *
+ * This partitioner requires the records to be already tagged with location.
+ */
+public class SparkHoodieMetadataBulkInsertPartitioner implements BulkInsertPartitioner<JavaRDD<HoodieRecord>> {
+  final int numPartitions;
+
+  public SparkHoodieMetadataBulkInsertPartitioner(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+
+  private class FileGroupPartitioner extends Partitioner {
+
+    @Override
+    public int getPartition(Object key) {
+      return ((Tuple2<Integer, String>) key)._1;
+    }
+
+    @Override
+    public int numPartitions() {
+      return numPartitions;
+    }
+  }
+
+  // FileIDs for the various partitions
+  private List<String> fileIDPfxs;
+
+  /**
+   * Partition the records by their location. The number of partitions is determined by the number of MDT fileGroups
+   * being updated rather than the specific value of outputSparkPartitions.
+   */
+  @Override
+  public JavaRDD<HoodieRecord> repartitionRecords(JavaRDD<HoodieRecord> records, int outputSparkPartitions) {
+    Comparator<Tuple2<Integer, String>> keyComparator =
+        (Comparator<Tuple2<Integer, String>> & Serializable) (t1, t2) -> t1._2.compareTo(t2._2);
+
+    // Partition the records by their file group
+    JavaRDD<HoodieRecord> partitionedRDD = records
+        // key by <fileGroupIndex, recordKey>. The file group index is used to partition and the record key is used to sort within the partition.
+        .keyBy(r -> {
+          int fileGroupIndex = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId());
+          return new Tuple2<>(fileGroupIndex, r.getRecordKey());
+        })
+        .repartitionAndSortWithinPartitions(new FileGroupPartitioner(), keyComparator)
+        .map(t -> t._2);
+
+    fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> {
+      // Due to the partitioning, all records in a partition should have the same fileID, so we only need to get the fileID prefix from the first record.
+      List<String> fileIds = new ArrayList<>(1);
+      if (recordItr.hasNext()) {
+        HoodieRecord record = recordItr.next();
+        final String fileID = HoodieTableMetadataUtil.getFileGroupPrefix(record.getCurrentLocation().getFileId());
+        fileIds.add(fileID);
+      } else {
+        // FileGroupPartitioner returns a fixed number of partitions as part of numPartitions(). In the special case that recordsRDD has fewer
+        // records than fileGroupCount, some of these partitions (corresponding to fileGroups) will not have any data.
+        // But we still need to return a fileID for use within {@code BulkInsertMapFunction}
+        fileIds.add("");
+      }
+      return fileIds.iterator();
+    }, true).collect();
+    ValidationUtils.checkArgument(partitionedRDD.getNumPartitions() == fileIDPfxs.size(),
+        String.format("Generated fileIDPfxs (%d) do not match the number of partitions (%d)", fileIDPfxs.size(), partitionedRDD.getNumPartitions()));
+
+    return partitionedRDD;
+  }
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+    return true;
+  }
+
+  @Override
+  public String getFileIdPfx(int partitionId) {
+    return fileIDPfxs.get(partitionId);

Review Comment:
   > automatic estimation of the shard counts for each partition, that can be enhanced
   
   This may be a solution if we can make an accurate estimation of the file group size.
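
For intuition, the correctness constraint behind this exchange: every record key must map deterministically to one of a fixed number of file groups, so an inaccurate file group count silently routes keys to the wrong shard. A minimal hash-based sketch of such a mapping, assuming a plain Java string hash rather than Hudi's actual HoodieTableMetadataUtil internals:

import java.util.stream.IntStream;

public class FileGroupMappingSketch {
  // Hypothetical key-to-file-group mapping; illustrative, not Hudi's implementation.
  static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGroups) {
    int hash = recordKey.hashCode();
    // Mask off the sign bit so the index is always non-negative.
    return (hash & Integer.MAX_VALUE) % numFileGroups;
  }

  public static void main(String[] args) {
    // Changing numFileGroups after records were written would route the same key
    // to a different file group, which is why the estimate must be stable.
    IntStream.of(4, 8).forEach(n ->
        System.out.println("key-42 -> file group " + mapRecordKeyToFileGroupIndex("key-42", n)));
  }
}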




[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-10 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1225702442


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java:
##
(Quoted diff identical to the one in the first message above; omitted.)

Review Comment:
   No one can be aware that the file group number is pertinent to correctness. It is a bug, not a usability issue.




[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-10 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1225695785


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java:
##
(Quoted diff identical to the one in the first message above; omitted.)

Review Comment:
   This may not be right when one HFile in one file group is too large to write to; there is an upper threshold for each base file handle, `HoodieAvroHFileWrite#canWrite` may return false, then the

[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-08 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1223719557


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -1052,51 +1091,81 @@ protected HoodieData<HoodieRecord> prepRecords(Map<MetadataPartitionType, Hoodie
+   * <p>
+   * Don't perform optimization if there are inflight operations on the dataset. This is for two reasons:
+   * - The compaction will contain the correct data as all failed operations have been rolled back.
+   * - Clean/compaction etc. will have the highest timestamp on the MDT and we won't be adding new operations
+   * with smaller timestamps to metadata table (makes for easier debugging)
+   * <p>
+   * This adds the limitation that long-running async operations (clustering, etc.) may cause delay in such MDT
+   * optimizations. We will relax this after MDT code has been hardened.
    */
-  protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) {
-    // finish off any pending compactions if any from previous attempt.
-    writeClient.runAnyPendingCompactions();
-
-    String latestDeltaCommitTimeInMetadataTable = metadataMetaClient.reloadActiveTimeline()
-        .getDeltaCommitTimeline()
-        .filterCompletedInstants()
-        .lastInstant().orElseThrow(() -> new HoodieMetadataException("No completed deltacommit in metadata table"))
-        .getTimestamp();
-    // we need to find if there are any inflights in data table timeline before or equal to the latest delta commit in metadata table.
-    // Whenever you want to change this logic, please ensure all below scenarios are considered.
-    // a. There could be a chance that latest delta commit in MDT is committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
-    // b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, latest compaction instant time in MDT represents
-    // any instants before that is already synced with metadata table.
-    // c. Do consider out of order commits. For eg, c4 from DT could complete before c3. and we can't trigger compaction in MDT with c4 as base instant time, until every
-    // instant before c4 is synced with metadata table.
-    List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
-        .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
+  @Override
+  public void performTableServices(Option<String> inFlightInstantTimestamp) {
+    HoodieTimer metadataTableServicesTimer = HoodieTimer.start();
+    boolean allTableServicesExecutedSuccessfullyOrSkipped = true;
+    try {
+      BaseHoodieWriteClient writeClient = getWriteClient();
+      // Run any pending table services operations.
+      runPendingTableServicesOperations(writeClient);
+
+      // Check and run clean operations.
+      String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline()
+          .filterCompletedInstants()
+          .lastInstant().get()
+          .getTimestamp();
+      LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + ", running clean operations.");
+      cleanIfNecessary(writeClient, latestDeltacommitTime);
+
+      // Do timeline validation before scheduling compaction/logcompaction operations.
+      if (!validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime)) {
+        return;

Review Comment:
   > unless compaction in MDT kicks in, archival might not have anything to do after last time it was able to archive something.
   
   Then archiving will always be blocked by the compaction.
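
The validateTimelineBeforeSchedulingCompaction step in the diff is what creates this coupling: MDT compaction is only scheduled when no data-table instant at or before the latest MDT deltacommit is still pending. A simplified standalone sketch of that guard, assuming instants compare by their string timestamps (the method name below is illustrative, not the Hudi API):

import java.util.Arrays;
import java.util.List;

public class CompactionGuardSketch {
  // Compaction must wait until no data-table instant at or before the latest MDT
  // deltacommit is pending; otherwise the compacted base file could bake in
  // records from a commit that later gets rolled back.
  static boolean canScheduleCompaction(List<String> pendingDataInstants, String latestMdtDeltacommit) {
    return pendingDataInstants.stream().noneMatch(ts -> ts.compareTo(latestMdtDeltacommit) <= 0);
  }

  public static void main(String[] args) {
    System.out.println(canScheduleCompaction(Arrays.asList("003"), "005")); // false: 003 blocks compaction
    System.out.println(canScheduleCompaction(Arrays.asList("007"), "005")); // true: a later inflight is fine
  }
}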






[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-08 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1222772904


##
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java:
##
@@ -104,40 +106,19 @@ protected void initRegistry() {
   }
 
   @Override
-  protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
-                                                           Option<T> actionMetadata,
-                                                           Option<String> inflightInstantTimestamp) {
-    try {
-      if (enabled) {
-        initializeIfNeeded(dataMetaClient, actionMetadata, inflightInstantTimestamp);
-      }
-    } catch (IOException e) {
-      LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
-      enabled = false;
-    }
-  }
-
-  @Override
-  protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap,
-                        boolean canTriggerTableService) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
+  protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
     ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");
     HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
     List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
 
-    try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
+    // Flink engine does not optimize initialCommit to MDT as bulk insert is not yet supported
+    try (HoodieFlinkWriteClient writeClient = (HoodieFlinkWriteClient) getWriteClient()) {
       // rollback partially failed writes if any.
       if (writeClient.rollbackFailedWrites()) {
        metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
       }
 
-      if (canTriggerTableService) {
-        // trigger compaction before doing the delta commit. this is to ensure, if this delta commit succeeds in metadata table, but failed in data table,
-        // we would have compacted metadata table and so could have included uncommitted data which will never be ignored while reading from metadata
-        // table (since reader will filter out only from delta commits)
-        compactIfNecessary(writeClient, instantTime);
-      }

Review Comment:
   You are right, we should trigger the metadata table compaction on each commit.
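
Triggering the check on each commit does not mean compacting on each commit; the trigger is typically count-based. A sketch of such a policy, assuming the threshold comes from hoodie.metadata.compact.max.delta.commits (the method below is illustrative, not the writer's actual code):

public class MdtCompactionPolicySketch {
  // Compact only once enough deltacommits have accumulated since the last compaction.
  static boolean shouldScheduleCompaction(long deltaCommitsSinceLastCompaction, long maxDeltaCommits) {
    return deltaCommitsSinceLastCompaction >= maxDeltaCommits;
  }

  public static void main(String[] args) {
    System.out.println(shouldScheduleCompaction(9, 10));  // false: keep appending deltacommits
    System.out.println(shouldScheduleCompaction(10, 10)); // true: schedule compaction
  }
}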






[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-08 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1222548105


##
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##
@@ -1453,7 +1453,11 @@ public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMeta
    * @return The fileID
    */
   public static String getFileIDForFileGroup(MetadataPartitionType partitionType, int index) {
-    return String.format("%s%04d", partitionType.getFileIdPrefix(), index);
+    if (partitionType == MetadataPartitionType.FILES) {
+      return String.format("%s%04d-%d", partitionType.getFileIdPrefix(), index, 0);
+    } else {

Review Comment:
   Should we fix all the partitions or just the `FILES` partition?






[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-08 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1222533305


##
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##
(Quoted diff identical to the one in the previous message; omitted.)

Review Comment:
   Discussed offline: this is a fix for consistency between the base file's fileGroup id and the log files, because the bulk_insert create handle appends a suffix (starting from `-0`) to each file it creates.
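
To make the suffix concrete: the fileID is the file group prefix plus a four-digit index, and the create handle's write token adds the trailing `-0`. A small illustrative round-trip, assuming "files-" as the FILES partition's fileIdPrefix:

public class MetadataFileIdSketch {
  public static void main(String[] args) {
    // Format used by getFileIDForFileGroup after the fix; "files-" is assumed here.
    String fileId = String.format("%s%04d-%d", "files-", 3, 0); // "files-0003-0"
    // Base and log files must round-trip to the same prefix and index so they
    // land in the same file group.
    int lastDash = fileId.lastIndexOf('-');
    String prefixAndIndex = fileId.substring(0, lastDash);      // "files-0003"
    int index = Integer.parseInt(prefixAndIndex.substring(prefixAndIndex.length() - 4));
    System.out.println(prefixAndIndex + " -> index " + index);
  }
}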









[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-06-06 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1220774838


##
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##
(Quoted diff identical to the one in the messages above; omitted.)

Review Comment:
   What is the purpose of the extra suffix `-0` for the `FILES` file group?






[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-24 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1203914496


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##
@@ -338,14 +338,20 @@ protected void initMetadataTable(Option<String> instantTime) {
    * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization
    */
   private void initializeMetadataTable(Option<String> inFlightInstantTimestamp) {
-    if (config.isMetadataTableEnabled()) {
-      HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
-          context, Option.empty(), inFlightInstantTimestamp);
-      try {
-        writer.close();
-      } catch (Exception e) {
-        throw new HoodieException("Failed to instantiate Metadata table ", e);
+    if (!config.isMetadataTableEnabled()) {
+      LOG.error("11");
+      return;
+    }
+
+    try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
+        context, Option.empty(), inFlightInstantTimestamp)) {
+      if (writer.isInitialized()) {
+        writer.performTableServices(inFlightInstantTimestamp);
+      } else {
+        throw new HoodieException((".22"));

Review Comment:
   Ditto: a meaningless exception msg.



##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -626,8 +657,7 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
 
     HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
         .onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), metadataPartition.getPartitionPath()))
-        .withFileId(fileGroupFileId)
-        .overBaseCommit(instantTime)
+        .withFileId(fileGroupFileId).overBaseCommit(instantTime)
         .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)

Review Comment:
   Unnecessary change.






[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-24 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1203913275


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##
(Quoted diff identical to the previous message's; this comment is anchored at the `LOG.error("11")` line.)

Review Comment:
   Why a meaningless log?






[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-24 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1203785303


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -564,53 +532,147 @@ private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit,
                                            Option<String> inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
 
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+        || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()

Review Comment:
   Thanks for the clarification, makes sense to me.
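
The key optimization quoted above: once the FILES partition exists, later MDT partitions bootstrap from it instead of re-listing the whole lake. A tiny sketch of that selection, with illustrative types; the Supplier keeps the expensive listing lazy:

import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class BootstrapSourceSketch {
  // Prefer the cheap source (MDT FILES partition) when available; fall back to
  // a full filesystem listing otherwise.
  static List<String> listPartitions(boolean filesPartitionAvailable,
                                     Supplier<List<String>> fromMdt,
                                     Supplier<List<String>> fromFs) {
    return filesPartitionAvailable ? fromMdt.get() : fromFs.get();
  }

  public static void main(String[] args) {
    List<String> p = listPartitions(true,
        () -> Collections.singletonList("2023/06/01"),
        () -> { throw new IllegalStateException("should not list filesystem"); });
    System.out.println(p);
  }
}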






[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-24 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194736640


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##
@@ -337,15 +337,33 @@ protected void initMetadataTable(Option<String> instantTime) {
    *
    * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization
    */
-  private void initializeMetadataTable(Option<String> inFlightInstantTimestamp) {
-    if (config.isMetadataTableEnabled()) {
-      HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
-          context, Option.empty(), inFlightInstantTimestamp);
-      try {
-        writer.close();
-      } catch (Exception e) {
-        throw new HoodieException("Failed to instantiate Metadata table ", e);
+  private void initializeMetadataTable(WriteOperationType operationType, Option<String> inFlightInstantTimestamp) {
+    if (!config.isMetadataTableEnabled()) {
+      return;
+    }
+
+    try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
+        context, Option.empty(), inFlightInstantTimestamp)) {
+      if (writer.isInitialized()) {
+        // Optimize the metadata table which involves compaction, cleaning, etc. This should only be called from writers.
+        switch (operationType) {
+          case INSERT:
+          case INSERT_PREPPED:
+          case UPSERT:
+          case UPSERT_PREPPED:
+          case BULK_INSERT:
+          case BULK_INSERT_PREPPED:
+          case DELETE:

Review Comment:
   Enumerating the write operations is really hard to maintain; can we trigger the table service whatever the operation is?
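
One way to express the reviewer's point: state the exclusions once as data instead of maintaining a fall-through switch in the write path. A sketch with a local stand-in enum (not Hudi's org.apache.hudi.common.model.WriteOperationType):

import java.util.EnumSet;
import java.util.Set;

public class TableServiceTriggerSketch {
  enum WriteOperationType { INSERT, UPSERT, BULK_INSERT, DELETE, COMPACT, CLUSTER }

  // Operations that should NOT trigger table services, listed once.
  private static final Set<WriteOperationType> NON_TRIGGERS =
      EnumSet.of(WriteOperationType.COMPACT, WriteOperationType.CLUSTER);

  static boolean shouldRunTableServices(WriteOperationType op) {
    return !NON_TRIGGERS.contains(op);
  }

  public static void main(String[] args) {
    System.out.println(shouldRunTableServices(WriteOperationType.UPSERT));  // true
    System.out.println(shouldRunTableServices(WriteOperationType.COMPACT)); // false
  }
}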






[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-16 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1194731645


##
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##
@@ -694,17 +695,75 @@ private Long getTableChecksum() {
     return getLong(TABLE_CHECKSUM);
   }
 
-  public List<String> getMetadataPartitionsInflight() {
-    return StringUtils.split(
-        getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING),
-        CONFIG_VALUES_DELIMITER
-    );
+  public Set<String> getMetadataPartitionsInflight() {
+    return new HashSet<>(StringUtils.split(
+        getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING),
+        CONFIG_VALUES_DELIMITER));
   }
 
   public Set<String> getMetadataPartitions() {
     return new HashSet<>(
-        StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING),
-        CONFIG_VALUES_DELIMITER));
+        StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING),
+            CONFIG_VALUES_DELIMITER));
+  }
+
+  /**
+   * @returns true if metadata table has been created and is being used for this dataset, else returns false.
+   */
+  public boolean isMetadataTableEnabled() {
+    return isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+  }
+
+  /**
+   * Checks if metadata table is enabled and the specified partition has been initialized.
+   *
+   * @param partition The partition to check
+   * @returns true if the specific partition has been initialized, else returns false.
+   */
+  public boolean isMetadataPartitionEnabled(MetadataPartitionType partition) {
+    return getMetadataPartitions().contains(partition.getPartitionPath());
+  }
+
+  /**
+   * Enables or disables the specified metadata table partition.
+   *
+   * @param partition The partition
+   * @param enabled   If true, the partition is enabled, else disabled
+   */
+  public void setMetadataPartitionState(MetadataPartitionType partition, boolean enabled) {
+    ValidationUtils.checkArgument(!partition.getPartitionPath().contains(CONFIG_VALUES_DELIMITER),
+        "Metadata Table partition path cannot contain a comma: " + partition.getPartitionPath());
+    Set<String> partitions = getMetadataPartitions();
+    Set<String> partitionsInflight = getMetadataPartitionsInflight();
+    if (enabled) {
+      partitions.add(partition.getPartitionPath());
+      partitionsInflight.remove(partition.getPartitionPath());
+    } else if (partition.equals(MetadataPartitionType.FILES)) {
+      // file listing partition is required for all other partitions to work
+      // Disabling file partition will also disable all partitions
+      partitions.clear();
+      partitionsInflight.clear();
+    } else {
+      partitions.remove(partition.getPartitionPath());
+      partitionsInflight.remove(partition.getPartitionPath());
+    }
+    setValue(TABLE_METADATA_PARTITIONS, partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
+    setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));

Review Comment:
   Do we need to persist these options?
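
For reference, the persistence these setters rely on is just a sorted, comma-joined string value written back to the table config (hoodie.properties). A standalone sketch of the round-trip, with illustrative partition names:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class PartitionsConfigRoundTripSketch {
  public static void main(String[] args) {
    Set<String> partitions = new HashSet<>(Arrays.asList("files", "column_stats"));
    // Serialize the way setMetadataPartitionState does: sorted, comma-joined.
    String value = partitions.stream().sorted().collect(Collectors.joining(","));
    // Deserialize the way getMetadataPartitions does.
    Set<String> parsed = new HashSet<>(Arrays.asList(value.split(",")));
    System.out.println(value + " -> " + parsed); // e.g. column_stats,files
  }
}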



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##
(Quoted diff identical to the SparkRDDWriteClient diff in the message above; omitted.)

Review Comment:
   Enumerating the write operations is really hard to maintain; can we trigger the table service whatever the operation is?




[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192914114


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -824,25 +863,22 @@ private interface ConvertMetadataFunction {
   /**
    * Processes commit metadata from data table and commits to metadata table.
    *
-   * @param instantTime instant time of interest.
+   * @param instantTime             instant time of interest.
    * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table.
-   * @param <T> type of commit metadata.
-   * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
-  private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (!dataWriteConfig.isMetadataTableEnabled()) {
-      return;
-    }
+  private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) {
+    ValidationUtils.checkArgument(dataWriteConfig.isMetadataTableEnabled());
+
     Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
     Set<String> inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig());
     // if indexing is inflight then do not trigger table service
     boolean doNotTriggerTableService = partitionsToUpdate.stream().anyMatch(inflightIndexes::contains);

Review Comment:
   I mean `doNotTriggerTableService`; it seems not to be used anywhere.









[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-12 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1192913868


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##
@@ -118,46 +123,32 @@ protected void initRegistry() {
   }
 
   @Override
-  protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
-                                                           Option<T> actionMetadata,
-                                                           Option<String> inflightInstantTimestamp) {
-    try {
-      metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
-        if (registry instanceof DistributedRegistry) {
-          HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;
-          ((DistributedRegistry) registry).register(sparkEngineContext.getJavaSparkContext());
-        }
-      });
+  protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
+    commitInternal(instantTime, partitionRecordsMap, Option.empty());
+  }
 
-      if (enabled) {
-        initializeIfNeeded(dataMetaClient, actionMetadata, inflightInstantTimestamp);
-      }
-    } catch (IOException e) {
-      LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
-      enabled = false;
-    }
+  protected void bulkCommit(
+      String instantTime, MetadataPartitionType partitionType, HoodieData<HoodieRecord> records,
+      int fileGroupCount) {
+    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = new HashMap<>();
+    partitionRecordsMap.put(partitionType, records);
+    SparkHoodieMetadataBulkInsertPartitioner partitioner = new SparkHoodieMetadataBulkInsertPartitioner(fileGroupCount);
Review Comment:
   You are right, thanks for the clarification.






[GitHub] [hudi] danny0405 commented on a diff in pull request #8684: [HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

2023-05-11 Thread via GitHub


danny0405 commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1190914223


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
@@ -619,41 +680,34 @@ private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Opti
     return false;
   }
 
-  private void updateInitializedPartitionsInTableConfig(List<MetadataPartitionType> partitionTypes) {
-    Set<String> completedPartitions = dataMetaClient.getTableConfig().getMetadataPartitions();
-    completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
-    dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
-    HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
-  }
-
-  private HoodieTableMetaClient initializeMetaClient(boolean populateMetaFields) throws IOException {
+  private HoodieTableMetaClient initializeMetaClient() throws IOException {
     return HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(HoodieTableType.MERGE_ON_READ)
         .setTableName(tableName)
         .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
         .setPayloadClassName(HoodieMetadataPayload.class.getName())
         .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
         .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
-        .setPopulateMetaFields(populateMetaFields)
-        .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
-        .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
+        .setPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
+        .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+            .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());

Review Comment:
   Fix the indentation.



##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##
(Quoted diff identical to the processAndCommit diff quoted in the 2023-05-12 message above; omitted.)

Review Comment:
   Do we still need this?



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##
(Quoted diff identical to the SparkHoodieBackedTableMetadataWriter diff quoted in the 2023-05-12 message above; the quote is truncated in the archive.)