This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 38e5a35e76e9def6acfa1db5fa2858ecedccc867
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Mon Dec 12 07:28:22 2022 -0800

    [HUDI-5078] Fixing isTableService for replace commits (#7037)
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  2 +-
 .../java/org/apache/hudi/table/HoodieTable.java    | 19 ++++++++++++++++--
 .../hudi/table/action/BaseActionExecutor.java      |  2 +-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  2 +-
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  6 ------
 .../hudi/table/HoodieFlinkMergeOnReadTable.java    |  6 ------
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |  6 ------
 .../hudi/table/HoodieJavaMergeOnReadTable.java     |  6 ------
 .../apache/hudi/client/SparkRDDWriteClient.java    |  2 +-
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  5 -----
 .../hudi/table/HoodieSparkMergeOnReadTable.java    |  5 -----
 .../apache/hudi/io/TestHoodieTimelineArchiver.java | 23 ++++++++++++++++++++++
 .../hudi/common/table/timeline/HoodieInstant.java  |  2 +-
 .../table/timeline/TestHoodieActiveTimeline.java   |  2 +-
 14 files changed, 46 insertions(+), 42 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 609f85e27fe..011235e4369 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -348,7 +348,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
     table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
-        table.isTableServiceAction(actionType)));
+        table.isTableServiceAction(actionType, instantTime)));
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 77b24a38953..7a9211a7c57 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -44,6 +44,7 @@ import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -58,6 +59,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -823,10 +825,23 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
 
   /**
    * Check if action type is a table service.
-   * @param actionType action type of interest.
+   * @param actionType action type of the instant
+   * @param instantTime instant time of the instant.
    * @return true if action represents a table service. false otherwise.
    */
-  public abstract boolean isTableServiceAction(String actionType);
+  public boolean isTableServiceAction(String actionType, String instantTime) {
+    if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(HoodieInstant.State.NIL, actionType, instantTime));
+      // only clustering is table service with replace commit action
+      return instantPlan.isPresent();
+    } else {
+      if (this.metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
+        return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
+      } else {
+        return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
+      }
+    }
+  }
 
   /**
    * Get Table metadata writer.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index f893b4ccd5c..d2b2cef2f60 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -58,7 +58,7 @@ public abstract class BaseActionExecutor<T extends HoodieRecordPayload, I, K, O,
    */
   protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) {
     table.getMetadataWriter(instantTime).ifPresent(w -> w.update(
-        metadata, instantTime, table.isTableServiceAction(actionType)));
+        metadata, instantTime, table.isTableServiceAction(actionType, instantTime)));
   }
 
   /**
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 33148cddb11..b4003712c56 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -289,7 +289,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     // the schema expects to be immutable for SQL jobs but may be not for non-SQL
     // jobs.
     this.metadataWriter.initTableMetadata();
-    this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
+    this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
   }
 
   /**
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 2c8a3c4e49c..543751a0410 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -38,7 +38,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
@@ -93,11 +92,6 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-  }
-
   /**
    * Upsert a batch of new records into Hoodie table at the supplied instantTime.
    *
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index aa8adde7353..9b7d3447177 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -58,11 +57,6 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsert(
       HoodieEngineContext context,
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 88921334980..342c018e5a2 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -38,7 +38,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -90,11 +89,6 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
                                                        String instantTime,
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index 32d30f704ec..5af29502a95 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -43,11 +42,6 @@ public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends H
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
                                                               String instantTime,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index ea7a660c379..fa568a7a6fe 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -425,7 +425,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
 
   private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
                                    HoodieInstant hoodieInstant) {
-    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
+    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp());
     // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
     // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
     table.getMetadataWriter(hoodieInstant.getTimestamp())
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index a88ca65c35a..743aff51a12 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -103,11 +103,6 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index efc667af297..c9d74246319 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -79,11 +79,6 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 76b51e6970b..72aba1f1638 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -281,6 +281,29 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     }
   }
 
+  @Test
+  public void testArchiveTableWithReplaceCommits() throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 4, 2);
+    for (int i = 1; i < 7; i++) {
+      if (i < 3) {
+        testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(),
+            Arrays.asList("p1", "p2"), 2);
+      } else {
+        testTable.doWriteOperation("0000000" + i, WriteOperationType.INSERT_OVERWRITE, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      }
+      // trigger archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      if (i == 6) {
+        // after all rounds, only 3 should be left in active timeline. 4,5,6
+        assertEquals(originalCommits, commitsAfterArchival);
+        assertEquals(3, originalCommits.size());
+      }
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 8b1cb875c09..bd29e2d6a2f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -68,7 +68,7 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
     // Committed instant
     COMPLETED,
     // Invalid instant
-    INVALID
+    NIL
   }
 
   private State state = State.COMPLETED;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index e40341a2033..b418c89f949 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -663,7 +663,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
     List<HoodieInstant> allInstants = new ArrayList<>();
     long instantTime = 1;
     for (State state : State.values()) {
-      if (state == State.INVALID) {
+      if (state == State.NIL) {
         continue;
       }
       for (String action : HoodieTimeline.VALID_ACTIONS_IN_TIMELINE) {

Reply via email to