vinothchandar commented on code in PR #9517:
URL: https://github.com/apache/hudi/pull/9517#discussion_r1323753544


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/HoodieCommitMetadataBasedIndexingCatchup.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.RESTORE_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
+
+/**
+ * Indexing catchup task for commit metadata based indexing.
+ */
+public class HoodieCommitMetadataBasedIndexingCatchup extends 
BaseIndexingCatchupTask {

Review Comment:
   Could we use a simpler name?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/BaseIndexingCatchupTask.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.hudi.table.action.index.RunIndexActionExecutor.TIMELINE_RELOAD_INTERVAL_MILLIS;
+
+/**
+ * Indexing check runs for instants that completed after the base instant (in 
the index plan).
+ * It will check if these later instants have logged updates to metadata table 
or not.
+ * If not, then it will do the update. If a later instant is inflight, it will 
wait until it is completed or the task times out.
+ */
+public abstract class BaseIndexingCatchupTask implements Runnable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseIndexingCatchupTask.class);
+
+  protected final HoodieTableMetadataWriter metadataWriter;
+  protected final List<HoodieInstant> instantsToIndex;
+  protected final Set<String> metadataCompletedInstants;
+  protected final HoodieTableMetaClient metaClient;
+  protected final HoodieTableMetaClient metadataMetaClient;
+  protected final TransactionManager transactionManager;
+  protected final HoodieEngineContext engineContext;
+  protected String currentCaughtupInstant;
+
+  public BaseIndexingCatchupTask(HoodieTableMetadataWriter metadataWriter,
+                                 List<HoodieInstant> instantsToIndex,
+                                 Set<String> metadataCompletedInstants,
+                                 HoodieTableMetaClient metaClient,
+                                 HoodieTableMetaClient metadataMetaClient,
+                                 TransactionManager transactionManager,
+                                 String currentCaughtupInstant,
+                                 HoodieEngineContext engineContext) {
+    this.metadataWriter = metadataWriter;
+    this.instantsToIndex = instantsToIndex;
+    this.metadataCompletedInstants = metadataCompletedInstants;
+    this.metaClient = metaClient;
+    this.metadataMetaClient = metadataMetaClient;
+    this.transactionManager = transactionManager;
+    this.currentCaughtupInstant = currentCaughtupInstant;
+    this.engineContext = engineContext;
+  }
+
+  @Override
+  public abstract void run();
+
+  HoodieInstant getHoodieInstant(HoodieInstant instant) {
+    if (!metadataCompletedInstants.isEmpty() && 
metadataCompletedInstants.contains(instant.getTimestamp())) {
+      currentCaughtupInstant = instant.getTimestamp();
+      return null;
+    }
+    while (!instant.isCompleted()) {
+      try {
+        LOG.warn("instant not completed, reloading timeline " + instant);
+        // reload timeline and fetch instant details again wait until timeout
+        String instantTime = instant.getTimestamp();
+        Option<HoodieInstant> currentInstant = 
metaClient.reloadActiveTimeline()

Review Comment:
   Can we please push the wait inside reloadActiveTimeline()?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/HoodieRecordBasedIndexingCatchup.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.RESTORE_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles;
+
+/**
+ * Indexing catchup task for record level indexing.
+ */
+public class HoodieRecordBasedIndexingCatchup extends BaseIndexingCatchupTask {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRecordBasedIndexingCatchup.class);
+
+  public HoodieRecordBasedIndexingCatchup(HoodieTableMetadataWriter 
metadataWriter,
+                                          List<HoodieInstant> instantsToIndex,
+                                          Set<String> 
metadataCompletedInstants,
+                                          HoodieTableMetaClient metaClient,
+                                          HoodieTableMetaClient 
metadataMetaClient,
+                                          String currentCaughtupInstant,
+                                          TransactionManager 
transactionManager,
+                                          HoodieEngineContext engineContext) {
+    super(metadataWriter, instantsToIndex, metadataCompletedInstants, 
metaClient, metadataMetaClient, transactionManager, currentCaughtupInstant, 
engineContext);
+  }
+
+  @Override
+  public void run() {
+    for (HoodieInstant instant : instantsToIndex) {
+      // metadata index already updated for this instant
+      instant = getHoodieInstant(instant);
+      if (instant == null) {
+        continue;
+      }
+      // if instant completed, ensure that there was metadata commit, else 
update metadata for this completed instant
+      if (COMPLETED.equals(instant.getState())) {
+        String instantTime = instant.getTimestamp();
+        Option<HoodieInstant> metadataInstant = 
metadataMetaClient.reloadActiveTimeline()
+            .filterCompletedInstants().filter(i -> 
i.getTimestamp().equals(instantTime)).firstInstant();
+        if (metadataInstant.isPresent()) {
+          currentCaughtupInstant = instantTime;
+          continue;
+        }
+        try {
+          // we need take a lock here as inflight writer could also try to 
update the timeline
+          transactionManager.beginTransaction(Option.of(instant), 
Option.empty());
+          LOG.info("Updating metadata table for instant: " + instant);
+          switch (instant.getAction()) {
+            case HoodieTimeline.COMMIT_ACTION:
+            case HoodieTimeline.DELTA_COMMIT_ACTION:
+            case HoodieTimeline.REPLACE_COMMIT_ACTION:
+              HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(
+                  
metaClient.getActiveTimeline().getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
+              HoodieData<HoodieRecord> records = 
readRecordsFromDataFiles(instant);
+              metadataWriter.update(commitMetadata, records, 
instant.getTimestamp());
+              break;
+            case CLEAN_ACTION:
+              HoodieCleanMetadata cleanMetadata = 
CleanerUtils.getCleanerMetadata(metaClient, instant);
+              metadataWriter.update(cleanMetadata, instant.getTimestamp());
+              break;
+            case RESTORE_ACTION:
+              HoodieRestoreMetadata restoreMetadata = 
TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+                  
metaClient.getActiveTimeline().getInstantDetails(instant).get());
+              metadataWriter.update(restoreMetadata, instant.getTimestamp());
+              break;
+            case ROLLBACK_ACTION:
+              HoodieRollbackMetadata rollbackMetadata = 
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+                  
metaClient.getActiveTimeline().getInstantDetails(instant).get());
+              metadataWriter.update(rollbackMetadata, instant.getTimestamp());
+              break;
+            default:
+              throw new IllegalStateException("Unexpected value: " + 
instant.getAction());
+          }
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Could not update 
metadata partition for instant: %s", instant), e);
+        } finally {
+          transactionManager.endTransaction(Option.of(instant));
+        }
+      }
+    }
+  }
+
+  private HoodieData<HoodieRecord> readRecordsFromDataFiles(HoodieInstant 
instant) throws IOException {
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true)
+        .build();
+    HoodieTableMetadata metadata = HoodieTableMetadata.create(engineContext, 
metadataConfig, metaClient.getBasePathV2().toString(), false);
+    HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(metaClient, 
metaClient.getActiveTimeline().filter(i -> i.equals(instant)), metadata);
+    // Collect the list of latest base files present in each partition
+    List<String> partitions = metadata.getAllPartitionPaths();
+    fsView.loadAllPartitions();
+    final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new 
ArrayList<>();
+    for (String partition : partitions) {
+      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+          .map(basefile -> Pair.of(partition, 
basefile)).collect(Collectors.toList()));
+    }
+
+    return readRecordKeysFromBaseFiles(
+        engineContext,
+        partitionBaseFilePairs,

Review Comment:
   Should we be looking at base files alone? This sort of thing gets us into 
trouble: e.g., Flink adds inserts to log files, so reading only base files would miss those record keys.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/IndexingCatchupTaskFactory.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import java.util.List;
+import java.util.Set;
+
+public class IndexingCatchupTaskFactory {
+
+  public static BaseIndexingCatchupTask 
createCatchupTask(List<HoodieIndexPartitionInfo> indexPartitionInfos,
+                                                          
HoodieTableMetadataWriter metadataWriter,
+                                                          List<HoodieInstant> 
instantsToIndex,
+                                                          Set<String> 
metadataCompletedInstants,
+                                                          
HoodieTableMetaClient metaClient,
+                                                          
HoodieTableMetaClient metadataMetaClient,
+                                                          String 
currentCaughtupInstant,
+                                                          TransactionManager 
transactionManager,
+                                                          HoodieEngineContext 
engineContext) {
+    boolean isRecordLevelIndexing = indexPartitionInfos.stream()

Review Comment:
   Rename to `hasRecordLevelIndexing`?
   
   We need to rethink all of this for general-purpose indexing. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java:
##########
@@ -91,7 +85,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I,
   private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
   private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = 
INDEX_COMMIT_METADATA_VERSION_1;
   private static final int MAX_CONCURRENT_INDEXING = 1;
-  private static final int TIMELINE_RELOAD_INTERVAL_MILLIS = 5000;
+  static final int TIMELINE_RELOAD_INTERVAL_MILLIS = 5000;

Review Comment:
   If you want it package-private, can you move it up above the private declarations?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java:
##########
@@ -280,11 +274,15 @@ private void updateTableConfigAndTimeline(HoodieInstant 
indexInstant,
     }
   }
 
-  private void catchupWithInflightWriters(HoodieTableMetadataWriter 
metadataWriter, List<HoodieInstant> instantsToIndex,
-                                          HoodieTableMetaClient 
metadataMetaClient, Set<String> metadataCompletedTimestamps) {
+  private void catchupWithInflightWriters(HoodieTableMetadataWriter 
metadataWriter,

Review Comment:
   Can we leave the formatting as we found it? That will make this easier to review. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -922,6 +883,19 @@ public void update(HoodieCommitMetadata commitMetadata, 
HoodieData<WriteStatus>
     closeInternal();
   }
 
+  @Override

Review Comment:
   Is `records` the entire record from the base file? I suspect not. Is this a 
`HoodieRecord` for the metadata table (MT) instead? 



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1694,4 +1696,52 @@ public static HoodieRecordGlobalLocation 
getLocationFromRecordIndexInfo(
     final java.util.Date instantDate = new java.util.Date(instantTime);
     return new HoodieRecordGlobalLocation(partition, 
HoodieActiveTimeline.formatDate(instantDate), fileId);
   }
+
+  /**
+   * Reads the record keys from the base files and returns a {@link 
HoodieData} of {@link HoodieRecord} to be updated in the metadata table.
+   */
+  public static HoodieData<HoodieRecord> 
readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,

Review Comment:
   Can we add a unit test for this method?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/BaseIndexingCatchupTask.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.hudi.table.action.index.RunIndexActionExecutor.TIMELINE_RELOAD_INTERVAL_MILLIS;
+
+/**
+ * Indexing check runs for instants that completed after the base instant (in 
the index plan).
+ * It will check if these later instants have logged updates to metadata table 
or not.
+ * If not, then it will do the update. If a later instant is inflight, it will 
wait until it is completed or the task times out.
+ */
+public abstract class BaseIndexingCatchupTask implements Runnable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseIndexingCatchupTask.class);
+
+  protected final HoodieTableMetadataWriter metadataWriter;
+  protected final List<HoodieInstant> instantsToIndex;
+  protected final Set<String> metadataCompletedInstants;
+  protected final HoodieTableMetaClient metaClient;
+  protected final HoodieTableMetaClient metadataMetaClient;
+  protected final TransactionManager transactionManager;
+  protected final HoodieEngineContext engineContext;
+  protected String currentCaughtupInstant;
+
+  public BaseIndexingCatchupTask(HoodieTableMetadataWriter metadataWriter,
+                                 List<HoodieInstant> instantsToIndex,
+                                 Set<String> metadataCompletedInstants,
+                                 HoodieTableMetaClient metaClient,
+                                 HoodieTableMetaClient metadataMetaClient,
+                                 TransactionManager transactionManager,
+                                 String currentCaughtupInstant,
+                                 HoodieEngineContext engineContext) {
+    this.metadataWriter = metadataWriter;
+    this.instantsToIndex = instantsToIndex;
+    this.metadataCompletedInstants = metadataCompletedInstants;
+    this.metaClient = metaClient;
+    this.metadataMetaClient = metadataMetaClient;
+    this.transactionManager = transactionManager;
+    this.currentCaughtupInstant = currentCaughtupInstant;
+    this.engineContext = engineContext;
+  }
+
+  @Override
+  public abstract void run();
+
+  HoodieInstant getHoodieInstant(HoodieInstant instant) {

Review Comment:
   Please rename this and add better docs on what it is computing.



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/index/TestIndexingCatchupTask.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.hudi.table.action.index.RunIndexActionExecutor.TIMELINE_RELOAD_INTERVAL_MILLIS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestIndexingCatchupTask {

Review Comment:
   Are there more scenarios we should test?



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/index/TestIndexingCatchupTask.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.hudi.table.action.index.RunIndexActionExecutor.TIMELINE_RELOAD_INTERVAL_MILLIS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestIndexingCatchupTask {
+
+  @Mock
+  private HoodieTableMetadataWriter metadataWriter;
+  @Mock
+  private HoodieTableMetaClient metaClient;
+  @Mock
+  private HoodieTableMetaClient metadataMetaClient;
+  @Mock
+  private TransactionManager transactionManager;
+  @Mock
+  private HoodieEngineContext engineContext;
+
+  @BeforeEach
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /**
+   * Mock out the behavior of the method to mimic a regular successful run
+   */
+  @Test
+  public void testTaskSuccessful() {
+    List<HoodieInstant> instants = Collections.singletonList(new 
HoodieInstant(HoodieInstant.State.REQUESTED, "commit", "001"));
+    Set<String> metadataCompletedInstants = new HashSet<>();
+    BaseIndexingCatchupTask task = new DummyIndexingCatchupTask(
+        metadataWriter,
+        instants,
+        metadataCompletedInstants,
+        metaClient,
+        metadataMetaClient,
+        transactionManager,
+        "001",
+        engineContext);
+
+    task.run();
+    assertEquals("001", task.currentCaughtupInstant);
+  }
+
+  /**
+   * Instant never gets completed, and we interrupt the task to see if it 
throws the expected HoodieIndexException.
+   */
+  @Test
+  public void testTaskInterrupted() {
+    HoodieInstant neverCompletedInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, "commit", "001");
+    HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
+    HoodieActiveTimeline filteredTimeline = mock(HoodieActiveTimeline.class);
+    HoodieActiveTimeline furtherFilteredTimeline = 
mock(HoodieActiveTimeline.class);
+
+    when(metaClient.reloadActiveTimeline()).thenReturn(activeTimeline);
+    
when(activeTimeline.filterCompletedInstants()).thenReturn(filteredTimeline);
+    when(filteredTimeline.filter(any())).thenReturn(furtherFilteredTimeline);
+    AtomicInteger callCount = new AtomicInteger(0);
+    when(furtherFilteredTimeline.firstInstant()).thenAnswer(invocation -> {
+      if (callCount.incrementAndGet() > 3) {
+        throw new InterruptedException("Simulated interruption");
+      }
+      return Option.empty();
+    });
+
+    BaseIndexingCatchupTask task = new DummyIndexingCatchupTask(
+        metadataWriter,
+        Collections.singletonList(neverCompletedInstant),
+        new HashSet<>(),
+        metaClient,
+        metadataMetaClient,
+        transactionManager,
+        "001",
+        engineContext);
+
+    // Interrupt the task after a delay to simulate the negative path
+    Thread thread = new Thread(task);
+    thread.start();
+    try {
+      Thread.sleep(TIMELINE_RELOAD_INTERVAL_MILLIS / 2);

Review Comment:
   Can we write this test without sleeps? Could sleep-based timing make it flaky 
due to non-determinism?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/HoodieRecordBasedIndexingCatchup.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.RESTORE_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles;
+
+/**
+ * Indexing catchup task for record level indexing.
+ */
+public class HoodieRecordBasedIndexingCatchup extends BaseIndexingCatchupTask {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRecordBasedIndexingCatchup.class);
+
+  public HoodieRecordBasedIndexingCatchup(HoodieTableMetadataWriter 
metadataWriter,
+                                          List<HoodieInstant> instantsToIndex,
+                                          Set<String> 
metadataCompletedInstants,
+                                          HoodieTableMetaClient metaClient,
+                                          HoodieTableMetaClient 
metadataMetaClient,
+                                          String currentCaughtupInstant,
+                                          TransactionManager 
transactionManager,
+                                          HoodieEngineContext engineContext) {
+    super(metadataWriter, instantsToIndex, metadataCompletedInstants, 
metaClient, metadataMetaClient, transactionManager, currentCaughtupInstant, 
engineContext);
+  }
+
+  @Override
+  public void run() {
+    for (HoodieInstant instant : instantsToIndex) {
+      // metadata index already updated for this instant
+      instant = getHoodieInstant(instant);
+      if (instant == null) {
+        continue;
+      }
+      // if instant completed, ensure that there was metadata commit, else 
update metadata for this completed instant
+      if (COMPLETED.equals(instant.getState())) {
+        String instantTime = instant.getTimestamp();
+        Option<HoodieInstant> metadataInstant = 
metadataMetaClient.reloadActiveTimeline()
+            .filterCompletedInstants().filter(i -> 
i.getTimestamp().equals(instantTime)).firstInstant();
+        if (metadataInstant.isPresent()) {
+          currentCaughtupInstant = instantTime;
+          continue;
+        }
+        try {
+          // we need take a lock here as inflight writer could also try to 
update the timeline
+          transactionManager.beginTransaction(Option.of(instant), 
Option.empty());

Review Comment:
   There is a lot of duplication between the two Catchup classes. Can we avoid
that? It almost feels like the record-based vs. commit-metadata-based decision
should be an `if` in the first case block, rather than a class hierarchy.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/HoodieCommitMetadataBasedIndexingCatchup.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.RESTORE_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
+
+/**
+ * Indexing catchup task for commit metadata based indexing.
+ */
+public class HoodieCommitMetadataBasedIndexingCatchup extends 
BaseIndexingCatchupTask {

Review Comment:
   Should the class name also end with `...CatchupTask`, the same as the base class?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/HoodieRecordBasedIndexingCatchup.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.RESTORE_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.readRecordKeysFromBaseFiles;
+
+/**
+ * Indexing catchup task for record level indexing.
+ */
+public class HoodieRecordBasedIndexingCatchup extends BaseIndexingCatchupTask {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieRecordBasedIndexingCatchup.class);
+
+  public HoodieRecordBasedIndexingCatchup(HoodieTableMetadataWriter 
metadataWriter,
+                                          List<HoodieInstant> instantsToIndex,
+                                          Set<String> 
metadataCompletedInstants,
+                                          HoodieTableMetaClient metaClient,
+                                          HoodieTableMetaClient 
metadataMetaClient,
+                                          String currentCaughtupInstant,
+                                          TransactionManager 
transactionManager,
+                                          HoodieEngineContext engineContext) {
+    super(metadataWriter, instantsToIndex, metadataCompletedInstants, 
metaClient, metadataMetaClient, transactionManager, currentCaughtupInstant, 
engineContext);
+  }
+
+  @Override
+  public void run() {
+    for (HoodieInstant instant : instantsToIndex) {
+      // metadata index already updated for this instant
+      instant = getHoodieInstant(instant);
+      if (instant == null) {
+        continue;
+      }
+      // if instant completed, ensure that there was metadata commit, else 
update metadata for this completed instant
+      if (COMPLETED.equals(instant.getState())) {
+        String instantTime = instant.getTimestamp();
+        Option<HoodieInstant> metadataInstant = 
metadataMetaClient.reloadActiveTimeline()
+            .filterCompletedInstants().filter(i -> 
i.getTimestamp().equals(instantTime)).firstInstant();
+        if (metadataInstant.isPresent()) {
+          currentCaughtupInstant = instantTime;
+          continue;
+        }
+        try {
+          // we need take a lock here as inflight writer could also try to 
update the timeline
+          transactionManager.beginTransaction(Option.of(instant), 
Option.empty());
+          LOG.info("Updating metadata table for instant: " + instant);
+          switch (instant.getAction()) {
+            case HoodieTimeline.COMMIT_ACTION:
+            case HoodieTimeline.DELTA_COMMIT_ACTION:
+            case HoodieTimeline.REPLACE_COMMIT_ACTION:
+              HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(
+                  
metaClient.getActiveTimeline().getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
+              HoodieData<HoodieRecord> records = 
readRecordsFromDataFiles(instant);
+              metadataWriter.update(commitMetadata, records, 
instant.getTimestamp());
+              break;
+            case CLEAN_ACTION:
+              HoodieCleanMetadata cleanMetadata = 
CleanerUtils.getCleanerMetadata(metaClient, instant);
+              metadataWriter.update(cleanMetadata, instant.getTimestamp());
+              break;
+            case RESTORE_ACTION:
+              HoodieRestoreMetadata restoreMetadata = 
TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+                  
metaClient.getActiveTimeline().getInstantDetails(instant).get());
+              metadataWriter.update(restoreMetadata, instant.getTimestamp());
+              break;
+            case ROLLBACK_ACTION:
+              HoodieRollbackMetadata rollbackMetadata = 
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+                  
metaClient.getActiveTimeline().getInstantDetails(instant).get());
+              metadataWriter.update(rollbackMetadata, instant.getTimestamp());
+              break;
+            default:
+              throw new IllegalStateException("Unexpected value: " + 
instant.getAction());
+          }
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Could not update 
metadata partition for instant: %s", instant), e);
+        } finally {
+          transactionManager.endTransaction(Option.of(instant));
+        }
+      }
+    }
+  }
+
+  private HoodieData<HoodieRecord> readRecordsFromDataFiles(HoodieInstant 
instant) throws IOException {

Review Comment:
   "Base" vs. "Data" in the method name? Inside the method, aren't we only looking at base files?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to