This is an automated email from the ASF dual-hosted git repository.
nathanma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 9933215e7 [AMORO-3232] Redefine action and stage for process api
(#3233)
9933215e7 is described below
commit 9933215e7749b224e2ba353386f899d1a421f5c2
Author: majin1102 <[email protected]>
AuthorDate: Mon Oct 14 15:30:25 2024 +0800
[AMORO-3232] Redefine action and stage for process api (#3233)
* Redefine action and stage for process api
* Apply spotless
* Modify IDLE weight as 1
* Add some comments
* Add new comments
* Apply spotless
* Rename RUNNING to EXECUTING
* Move minor, major and full to stages
* Optimize code and comments
---------
Co-authored-by: majin.nathan <[email protected]>
Co-authored-by: ZhouJinsong <[email protected]>
---
.../src/main/java/org/apache/amoro/Action.java | 127 +++++++++++++++------
.../org/apache/amoro/process/OptimizingStage.java | 71 ------------
.../org/apache/amoro/process/OptimizingStages.java | 65 +++++++++++
.../org/apache/amoro/process/OptimizingState.java | 10 +-
.../org/apache/amoro/process/ProcessStage.java | 45 ++++++++
.../apache/amoro/process/TableProcessState.java | 2 +-
6 files changed, 205 insertions(+), 115 deletions(-)
diff --git a/amoro-common/src/main/java/org/apache/amoro/Action.java
b/amoro-common/src/main/java/org/apache/amoro/Action.java
index 879e0c7a8..9f0747350 100644
--- a/amoro-common/src/main/java/org/apache/amoro/Action.java
+++ b/amoro-common/src/main/java/org/apache/amoro/Action.java
@@ -18,62 +18,113 @@
package org.apache.amoro;
-import org.apache.amoro.process.TableProcess;
-import org.apache.amoro.process.TableProcessState;
-import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.shade.thrift.org.apache.commons.lang3.tuple.Pair;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
-public enum Action {
- MINOR_OPTIMIZING("minor-optimizing", 0),
- MAJOR_OPTIMIZING("major-optimizing", 1),
- EXTERNAL_OPTIMIZING("external-optimizing", 2),
- // refresh all metadata including snapshots, watermark, configurations,
schema, etc.
- REFRESH_METADATA("refresh-metadata", 10),
- // expire all metadata and data files necessarily.
- EXPIRE_DATA("expire-data", 11),
- DELETE_ORPHAN_FILES("delete-orphan-files", 12),
- SYNC_HIVE_COMMIT("sync-hive-commit", 13);
+public final class Action {
+ private static final Map<Integer, Action> ACTIONS = Maps.newConcurrentMap();
+
+ private static final TableFormat[] DEFAULT_FORMATS =
+ new TableFormat[] {TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG,
TableFormat.MIXED_HIVE};
+
+ static {
+ // default optimizing action
+ register(1, 10, "rewrite");
+ // expire all metadata and data files necessarily.
+ register(4, 1, "expire-data");
+ // delete orphan files
+ register(5, 2, "delete-orphans");
+ // sync optimizing commit to hive
+ register(6, 3, "sync-hive");
+ }
+
+ /** supported table formats of this action */
+ private final TableFormat[] formats;
+ /**
+ * storage code of this action, normally this code should be identical
within supported formats
+ */
+ private final int code;
/**
- * Arbitrary actions are actions that can be handled by a single optimizer.
The processes they
- * related to like refreshing, expiring, cleaning and syncing all share the
same basic
- * implementations which are {@link TableProcess} and {@link
TableProcessState} and they won't
- * have any spitted stages like optimizing processes(plan, execute, commit),
so they can be easily
- * triggered and managed. If you want to add a new action which is handled
stand-alone, you should
- * add it to this set, and you would find it's easy to implement the process
and state.
+ * the weight of this action; the larger the weight, the higher the action is positioned in
+ * schedulers and on front pages
*/
- public static final Set<Action> ARBITRARY_ACTIONS =
- Collections.unmodifiableSet(
- Sets.newHashSet(REFRESH_METADATA, EXPIRE_DATA, DELETE_ORPHAN_FILES,
SYNC_HIVE_COMMIT));
+ private final int weight;
+ /** description of this action, will be shown in front pages */
+ private final String desc;
- public static boolean isArbitrary(Action action) {
- return ARBITRARY_ACTIONS.contains(action);
+ private Action(TableFormat[] formats, int code, int weight, String desc) {
+ this.formats = formats;
+ this.code = code;
+ this.desc = desc;
+ this.weight = weight;
}
- private final String description;
- private final int code;
+ public static Action valueOf(int code) {
+ return ACTIONS.get(code);
+ }
- Action(String description, int dbValue) {
- this.description = description;
- this.code = dbValue;
+ public static synchronized void register(int code, int weight, String desc) {
+ register(DEFAULT_FORMATS, code, weight, desc);
}
- public String getDescription() {
- return description;
+ /**
+  * Registers a new action for the given table formats.
+  *
+  * <p>A rejected registration leaves the registry untouched.
+  *
+  * @param formats table formats supported by the action
+  * @param code storage code of the action
+  * @param weight weight of the action
+  * @param desc description of the action
+  * @throws IllegalArgumentException if an action with the same description is already registered
+  *     for one of the formats, or if the code is already registered
+  */
+ public static synchronized void register(
+     TableFormat[] formats, int code, int weight, String desc) {
+   Map<TableFormat, Set<String>> format2Actions = buildMapFromActions();
+   for (TableFormat format : formats) {
+     // A format without any registered action has no entry in the map (this is always the case
+     // for the very first registration), so the lookup result must be null-checked.
+     Set<String> registeredDescs = format2Actions.get(format);
+     if (registeredDescs != null && registeredDescs.contains(desc)) {
+       throw new IllegalArgumentException("Duplicated action: " + desc + " in format: " + format);
+     }
+   }
+   // putIfAbsent keeps the already registered action in place when the code is duplicated,
+   // whereas put would replace it before the exception is thrown.
+   if (ACTIONS.putIfAbsent(code, new Action(formats, code, weight, desc)) != null) {
+     throw new IllegalArgumentException("Duplicated action code: " + code);
+   }
+ }
+
+ /**
+  * Groups the descriptions of all registered actions by the table formats they support.
+  *
+  * @return a map from table format to the descriptions of the actions registered for it; a
+  *     format that no registered action supports has no entry
+  */
+ private static Map<TableFormat, Set<String>> buildMapFromActions() {
+   return ACTIONS.values().stream()
+       .flatMap(
+           registered ->
+               Arrays.stream(registered.formats)
+                   .map(supported -> Pair.of(supported, registered.desc)))
+       .collect(
+           Collectors.groupingBy(
+               entry -> entry.getKey(),
+               Collectors.mapping(entry -> entry.getValue(), Collectors.toSet())));
}
public int getCode() {
return code;
}
- public static Action of(int code) {
- for (Action action : Action.values()) {
- if (action.code == code) {
- return action;
- }
+ public String getDesc() {
+ return desc;
+ }
+
+ public int getWeight() {
+ return weight;
+ }
+
+ /**
+  * Returns the table formats supported by this action.
+  *
+  * @return a copy of the supported formats; changes to the returned array do not affect this
+  *     action
+  */
+ public TableFormat[] supportedFormats() {
+   // The internal array is also read when checking for duplicated descriptions on registration,
+   // so it must not be handed out directly.
+   return formats.clone();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
}
- throw new IllegalArgumentException("No action with code: " + code);
+ Action action = (Action) o;
+ return code == action.code;
+ }
+
+ @Override
+ public int hashCode() {
+ return code;
}
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingStage.java
b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingStage.java
deleted file mode 100644
index 81d5b5274..000000000
--- a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingStage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.process;
-
-/** The stage of the optimizing process. */
-public enum OptimizingStage {
-
- /** Full optimizing executing phase */
- FULL_OPTIMIZING("full", true),
-
- /** Major optimizing executing phase */
- MAJOR_OPTIMIZING("major", true),
-
- /** Minor optimizing executing phase */
- MINOR_OPTIMIZING("minor", true),
-
- /** Committing phase of optimizing */
- COMMITTING("committing", true),
-
- /** Planning phase of optimizing */
- PLANNING("planning", false),
-
- /** When input data has been collected but waiting for quota available(not
scheduled yet) */
- PENDING("pending", false),
-
- /** When waiting for input data */
- IDLE("idle", false),
-
- /** When the process has been scheduled but being waiting for quota
available */
- SUSPENDING("suspending", false),
-
- /** Mainly for external process submitting to external resources */
- SUBMITTING("submitting", false);
-
- /** The display description of the stage. */
- private final String displayValue;
-
- /*
- * Whether the stage is an optimizing executing stage.
- */
- private final boolean isOptimizing;
-
- OptimizingStage(String displayValue, boolean isProcessing) {
- this.displayValue = displayValue;
- this.isOptimizing = isProcessing;
- }
-
- public boolean isOptimizing() {
- return isOptimizing;
- }
-
- public String displayValue() {
- return displayValue;
- }
-}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingStages.java
b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingStages.java
new file mode 100644
index 000000000..6049621cd
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingStages.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.process;
+
+import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * The stages of the optimizing process.
+ *
+ * <p>Every stage is exposed as a shared {@link ProcessStage} constant and can be looked up by its
+ * description through {@link #of(String)}.
+ */
+public class OptimizingStages {
+
+  /** Minor optimizing executing phase */
+  public static final ProcessStage MINOR = new ProcessStage("minor", 13);
+  /** Major optimizing executing phase */
+  public static final ProcessStage MAJOR = new ProcessStage("major", 14);
+  /** Full optimizing executing phase */
+  public static final ProcessStage FULL = new ProcessStage("full", 15);
+  /** Committing phase of optimizing */
+  public static final ProcessStage COMMITTING = new ProcessStage("committing", 18);
+  /** Planning phase of optimizing */
+  public static final ProcessStage PLANNING = new ProcessStage("planning", 17);
+  /** Evaluating phase of optimizing */
+  public static final ProcessStage EVALUATING = new ProcessStage("evaluating", 16);
+  /** When input data has been collected but is waiting for available quota (not scheduled yet) */
+  public static final ProcessStage PENDING = new ProcessStage("pending", 9);
+  /** When the process has been scheduled but is waiting for available quota */
+  public static final ProcessStage SUSPENDING = new ProcessStage("suspending", 9);
+  /** When waiting for input data */
+  public static final ProcessStage IDLE = new ProcessStage("idle", 1);
+
+  /** Lookup table from stage description to the stage constant. */
+  private static final Map<String, ProcessStage> STAGES =
+      ImmutableMap.<String, ProcessStage>builder()
+          .put(MINOR.getDesc(), MINOR)
+          // Each stage must be keyed by its own description: the builder rejects duplicate keys
+          // when the map is built.
+          .put(MAJOR.getDesc(), MAJOR)
+          .put(FULL.getDesc(), FULL)
+          .put(COMMITTING.getDesc(), COMMITTING)
+          .put(PLANNING.getDesc(), PLANNING)
+          .put(EVALUATING.getDesc(), EVALUATING)
+          .put(PENDING.getDesc(), PENDING)
+          .put(SUSPENDING.getDesc(), SUSPENDING)
+          .put(IDLE.getDesc(), IDLE)
+          .build();
+
+  /**
+   * Resolves an optimizing stage from its description.
+   *
+   * @param desc description of the stage, e.g. {@code "minor"}
+   * @return the stage constant registered under the description
+   * @throws IllegalArgumentException if no optimizing stage has the given description
+   */
+  public static ProcessStage of(String desc) {
+    return Optional.ofNullable(STAGES.get(desc))
+        .orElseThrow(() -> new IllegalArgumentException("No optimizing stage with desc: " + desc));
+  }
+}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java
b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java
index 5e9a0cbe6..99691cf84 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/OptimizingState.java
@@ -27,7 +27,7 @@ public abstract class OptimizingState extends
TableProcessState {
@StateField private volatile long targetSnapshotId;
@StateField private volatile long watermark;
- @StateField private volatile OptimizingStage stage;
+ @StateField private volatile ProcessStage stage;
@StateField private volatile long currentStageStartTime;
public OptimizingState(Action action, ServerTableIdentifier tableIdentifier)
{
@@ -38,12 +38,12 @@ public abstract class OptimizingState extends
TableProcessState {
super(id, action, tableIdentifier);
}
- protected void setStage(OptimizingStage stage) {
+ protected void setStage(ProcessStage stage) {
this.stage = stage;
this.currentStageStartTime = System.currentTimeMillis();
}
- protected void setStage(OptimizingStage stage, long stageStartTime) {
+ protected void setStage(ProcessStage stage, long stageStartTime) {
this.stage = stage;
this.currentStageStartTime = stageStartTime;
}
@@ -60,7 +60,7 @@ public abstract class OptimizingState extends
TableProcessState {
return watermark;
}
- public OptimizingStage getStage() {
+ public ProcessStage getStage() {
return stage;
}
@@ -74,6 +74,6 @@ public abstract class OptimizingState extends
TableProcessState {
@Override
public String getName() {
- return stage.displayValue();
+ return stage.getDesc();
}
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/ProcessStage.java
b/amoro-common/src/main/java/org/apache/amoro/process/ProcessStage.java
new file mode 100644
index 000000000..47be213ce
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessStage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.process;
+
+/** A stage of a table process, carrying a display description and a weight. */
+public class ProcessStage {
+
+  /**
+   * Action Stage description value, normally this value should be identical within certain actions
+   */
+  private final String desc;
+  /**
+   * the weight number of this stage, the bigger the weight number, the higher position on front
+   * pages
+   */
+  private final int weight;
+
+  /**
+   * Creates a stage.
+   *
+   * @param desc description of the stage
+   * @param weight weight of the stage
+   */
+  public ProcessStage(String desc, int weight) {
+    this.desc = desc;
+    this.weight = weight;
+  }
+
+  /** Returns the description of this stage. */
+  public String getDesc() {
+    return desc;
+  }
+
+  /** Returns the weight of this stage. */
+  public int getWeight() {
+    return weight;
+  }
+
+  /**
+   * Returns a readable form for logs and debugging; the default {@code Object#toString()} would
+   * only show the class name and an identity hash.
+   */
+  @Override
+  public String toString() {
+    return "ProcessStage{desc='" + desc + "', weight=" + weight + '}';
+  }
+}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
b/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
index 024484ddc..c58a6f4d0 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/TableProcessState.java
@@ -53,7 +53,7 @@ public class TableProcessState implements ProcessState {
}
public String getName() {
- return action.getDescription();
+ return action.getDesc();
}
public Action getAction() {