This is an automated email from the ASF dual-hosted git repository.

nathanma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 9933215e7  [AMORO-3232] Redefine action and stage for process api 
(#3233)
9933215e7 is described below

commit 9933215e7749b224e2ba353386f899d1a421f5c2
Author: majin1102 <[email protected]>
AuthorDate: Mon Oct 14 15:30:25 2024 +0800

     [AMORO-3232] Redefine action and stage for process api (#3233)
    
    * Redefine action and stage for process api
    
    * Apply spotless
    
    * Modify IDLE weight as 1
    
    * Add some comments
    
    * Add new comments
    
    * Apply spotless
    
    * Rename RUNNING to EXECUTING
    
    * Move minor, major and  full to stages
    
    * Optimize code and comments
    
    ---------
    
    Co-authored-by: majin.nathan <[email protected]>
    Co-authored-by: ZhouJinsong <[email protected]>
---
 .../src/main/java/org/apache/amoro/Action.java     | 127 +++++++++++++++------
 .../org/apache/amoro/process/OptimizingStage.java  |  71 ------------
 .../org/apache/amoro/process/OptimizingStages.java |  65 +++++++++++
 .../org/apache/amoro/process/OptimizingState.java  |  10 +-
 .../org/apache/amoro/process/ProcessStage.java     |  45 ++++++++
 .../apache/amoro/process/TableProcessState.java    |   2 +-
 6 files changed, 205 insertions(+), 115 deletions(-)

diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java 
b/amoro-common/src/main/java/org/apache/amoro/Action.java
index 879e0c7a8..9f0747350 100644
--- a/amoro-common/src/main/java/org/apache/amoro/Action.java
+++ b/amoro-common/src/main/java/org/apache/amoro/Action.java
@@ -18,62 +18,113 @@
 
 package org.apache.amoro;
 
-import org.apache.amoro.process.TableProcess;
-import org.apache.amoro.process.TableProcessState;
-import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.shade.thrift.org.apache.commons.lang3.tuple.Pair;
 
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
-public enum Action {
-  MINOR_OPTIMIZING("minor-optimizing", 0),
-  MAJOR_OPTIMIZING("major-optimizing", 1),
-  EXTERNAL_OPTIMIZING("external-optimizing", 2),
-  // refresh all metadata including snapshots, watermark, configurations, 
schema, etc.
-  REFRESH_METADATA("refresh-metadata", 10),
-  // expire all metadata and data files necessarily.
-  EXPIRE_DATA("expire-data", 11),
-  DELETE_ORPHAN_FILES("delete-orphan-files", 12),
-  SYNC_HIVE_COMMIT("sync-hive-commit", 13);
+public final class Action {
 
+  private static final Map<Integer, Action> ACTIONS = Maps.newConcurrentMap();
+
+  private static final TableFormat[] DEFAULT_FORMATS =
+      new TableFormat[] {TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, 
TableFormat.MIXED_HIVE};
+
+  static {
+    // default optimizing action
+    register(1, 10, "rewrite");
+    // expire all metadata and data files necessarily.
+    register(4, 1, "expire-data");
+    // delete orphan files
+    register(5, 2, "delete-orphans");
+    // sync optimizing commit to hive
+    register(6, 3, "sync-hive");
+  }
+
+  /** supported table formats of this action */
+  private final TableFormat[] formats;
+  /**
+   * storage code of this action, normally this code should be identical 
within supported formats
+   */
+  private final int code;
   /**
-   * Arbitrary actions are actions that can be handled by a single optimizer. 
The processes they
-   * related to like refreshing, expiring, cleaning and syncing all share the 
same basic
-   * implementations which are {@link TableProcess} and {@link 
TableProcessState} and they won't
-   * have any spitted stages like optimizing processes(plan, execute, commit), 
so they can be easily
-   * triggered and managed. If you want to add a new action which is handled 
stand-alone, you should
-   * add it to this set, and you would find it's easy to implement the process 
and state.
+   * the weight number of this action, the bigger the weight number, the 
higher positions of
+   * schedulers or front pages
    */
-  public static final Set<Action> ARBITRARY_ACTIONS =
-      Collections.unmodifiableSet(
-          Sets.newHashSet(REFRESH_METADATA, EXPIRE_DATA, DELETE_ORPHAN_FILES, 
SYNC_HIVE_COMMIT));
+  private final int weight;
+  /** description of this action, will be shown in front pages */
+  private final String desc;
 
-  public static boolean isArbitrary(Action action) {
-    return ARBITRARY_ACTIONS.contains(action);
+  private Action(TableFormat[] formats, int code, int weight, String desc) {
+    this.formats = formats;
+    this.code = code;
+    this.desc = desc;
+    this.weight = weight;
   }
 
-  private final String description;
-  private final int code;
+  public static Action valueOf(int code) {
+    return ACTIONS.get(code);
+  }
 
-  Action(String description, int dbValue) {
-    this.description = description;
-    this.code = dbValue;
+  public static synchronized void register(int code, int weight, String desc) {
+    register(DEFAULT_FORMATS, code, weight, desc);
   }
 
-  public String getDescription() {
-    return description;
+  public static synchronized void register(
+      TableFormat[] formats, int code, int weight, String desc) {
+    Map<TableFormat, Set<String>> format2Actions = buildMapFromActions();
+    for (TableFormat format : formats) {
+      if (format2Actions.get(format).contains(desc)) {
+        throw new IllegalArgumentException("Duplicated action: " + desc + " in 
format: " + format);
+      }
+    }
+    if (ACTIONS.put(code, new Action(formats, code, weight, desc)) != null) {
+      throw new IllegalArgumentException("Duplicated action code: " + code);
+    }
+  }
+
+  private static Map<TableFormat, Set<String>> buildMapFromActions() {
+    return ACTIONS.values().stream()
+        .flatMap(
+            action -> Arrays.stream(action.formats).map(format -> 
Pair.of(format, action.desc)))
+        .collect(
+            Collectors.groupingBy(
+                Pair::getKey, Collectors.mapping(Pair::getValue, 
Collectors.toSet())));
   }
 
   public int getCode() {
     return code;
   }
 
-  public static Action of(int code) {
-    for (Action action : Action.values()) {
-      if (action.code == code) {
-        return action;
-      }
+  public String getDesc() {
+    return desc;
+  }
+
+  public int getWeight() {
+    return weight;
+  }
+
+  public TableFormat[] supportedFormats() {
+    return formats;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
     }
-    throw new IllegalArgumentException("No action with code: " + code);
+    Action action = (Action) o;
+    return code == action.code;
+  }
+
+  @Override
+  public int hashCode() {
+    return code;
   }
 }
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingStage.java 
b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingStage.java
deleted file mode 100644
index 81d5b5274..000000000
--- a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingStage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.process;
-
-/** The stage of the optimizing process. */
-public enum OptimizingStage {
-
-  /** Full optimizing executing phase */
-  FULL_OPTIMIZING("full", true),
-
-  /** Major optimizing executing phase */
-  MAJOR_OPTIMIZING("major", true),
-
-  /** Minor optimizing executing phase */
-  MINOR_OPTIMIZING("minor", true),
-
-  /** Committing phase of optimizing */
-  COMMITTING("committing", true),
-
-  /** Planning phase of optimizing */
-  PLANNING("planning", false),
-
-  /** When input data has been collected but waiting for quota available(not 
scheduled yet) */
-  PENDING("pending", false),
-
-  /** When waiting for input data */
-  IDLE("idle", false),
-
-  /** When the process has been scheduled but being waiting for quota 
available */
-  SUSPENDING("suspending", false),
-
-  /** Mainly for external process submitting to external resources */
-  SUBMITTING("submitting", false);
-
-  /** The display description of the stage. */
-  private final String displayValue;
-
-  /*
-   * Whether the stage is an optimizing executing stage.
-   */
-  private final boolean isOptimizing;
-
-  OptimizingStage(String displayValue, boolean isProcessing) {
-    this.displayValue = displayValue;
-    this.isOptimizing = isProcessing;
-  }
-
-  public boolean isOptimizing() {
-    return isOptimizing;
-  }
-
-  public String displayValue() {
-    return displayValue;
-  }
-}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingStages.java 
b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingStages.java
new file mode 100644
index 000000000..6049621cd
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingStages.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.process;
+
+import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** The stage of the optimizing process. */
+public class OptimizingStages {
+
+  /** minor optimizing executing phase */
+  public static final ProcessStage MINOR = new ProcessStage("minor", 13);
+  /** major optimizing executing phase */
+  public static final ProcessStage MAJOR = new ProcessStage("major", 14);
+  /** full optimizing executing phase */
+  public static final ProcessStage FULL = new ProcessStage("full", 15);
+  /** Committing phase of optimizing */
+  public static final ProcessStage COMMITTING = new ProcessStage("committing", 
18);
+  /** Planning phase of optimizing */
+  public static final ProcessStage PLANNING = new ProcessStage("planning", 17);
+  /** evaluating phase of optimizing */
+  public static final ProcessStage EVALUATING = new ProcessStage("evaluating", 
16);
+  /** When input data has been collected but waiting for quota available(not 
scheduled yet) */
+  public static final ProcessStage PENDING = new ProcessStage("pending", 9);
+  /** When the process has been scheduled but being waiting for quota 
available */
+  public static final ProcessStage SUSPENDING = new ProcessStage("suspending", 
9);
+  /** When waiting for input data */
+  public static final ProcessStage IDLE = new ProcessStage("idle", 1);
+
+  private static final Map<String, ProcessStage> STAGES =
+      ImmutableMap.<String, ProcessStage>builder()
+          .put(MINOR.getDesc(), MINOR)
+          .put(MINOR.getDesc(), MAJOR)
+          .put(FULL.getDesc(), FULL)
+          .put(COMMITTING.getDesc(), COMMITTING)
+          .put(PLANNING.getDesc(), PLANNING)
+          .put(EVALUATING.getDesc(), EVALUATING)
+          .put(PENDING.getDesc(), PENDING)
+          .put(SUSPENDING.getDesc(), SUSPENDING)
+          .put(IDLE.getDesc(), IDLE)
+          .build();
+
+  public static ProcessStage of(String desc) {
+    return Optional.ofNullable(STAGES.get(desc))
+        .orElseThrow(() -> new IllegalArgumentException("No optimizing stage 
with desc: " + desc));
+  }
+}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java 
b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java
index 5e9a0cbe6..99691cf84 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java
@@ -27,7 +27,7 @@ public abstract class OptimizingState extends 
TableProcessState {
 
   @StateField private volatile long targetSnapshotId;
   @StateField private volatile long watermark;
-  @StateField private volatile OptimizingStage stage;
+  @StateField private volatile ProcessStage stage;
   @StateField private volatile long currentStageStartTime;
 
   public OptimizingState(Action action, ServerTableIdentifier tableIdentifier) 
{
@@ -38,12 +38,12 @@ public abstract class OptimizingState extends 
TableProcessState {
     super(id, action, tableIdentifier);
   }
 
-  protected void setStage(OptimizingStage stage) {
+  protected void setStage(ProcessStage stage) {
     this.stage = stage;
     this.currentStageStartTime = System.currentTimeMillis();
   }
 
-  protected void setStage(OptimizingStage stage, long stageStartTime) {
+  protected void setStage(ProcessStage stage, long stageStartTime) {
     this.stage = stage;
     this.currentStageStartTime = stageStartTime;
   }
@@ -60,7 +60,7 @@ public abstract class OptimizingState extends 
TableProcessState {
     return watermark;
   }
 
-  public OptimizingStage getStage() {
+  public ProcessStage getStage() {
     return stage;
   }
 
@@ -74,6 +74,6 @@ public abstract class OptimizingState extends 
TableProcessState {
 
   @Override
   public String getName() {
-    return stage.displayValue();
+    return stage.getDesc();
   }
 }
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/ProcessStage.java 
b/amoro-common/src/main/java/org/apache/amoro/process/ProcessStage.java
new file mode 100644
index 000000000..47be213ce
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessStage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.process;
+
+public class ProcessStage {
+
+  /**
+   * Action Stage description value, normally this value should be identical 
within certain actions
+   */
+  private final String desc;
+  /**
+   * the weight number of this action, the bigger the weight number, the 
higher position on front
+   * pages
+   */
+  private final int weight;
+
+  public ProcessStage(String desc, int weight) {
+    this.desc = desc;
+    this.weight = weight;
+  }
+
+  public String getDesc() {
+    return desc;
+  }
+
+  public int getWeight() {
+    return weight;
+  }
+}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java 
b/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
index 024484ddc..c58a6f4d0 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
@@ -53,7 +53,7 @@ public class TableProcessState implements ProcessState {
   }
 
   public String getName() {
-    return action.getDescription();
+    return action.getDesc();
   }
 
   public Action getAction() {

Reply via email to