This is an automated email from the ASF dual-hosted git repository.
baiyangtx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 55fb785c9 [Improvement]: Extract TableProcessStore from AmoroProcess interface. (#4116)
55fb785c9 is described below
commit 55fb785c94c4ff95d06b477bb2194a91c6171c25
Author: baiyangtx <[email protected]>
AuthorDate: Thu Mar 12 21:40:45 2026 +0800
[Improvement]: Extract TableProcessStore from AmoroProcess interface. (#4116)
* refactor(process): extract process framework changes
* CI
* Update amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
Co-authored-by: Xu Bai <[email protected]>
* Update amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
Co-authored-by: Xu Bai <[email protected]>
* Fix comments
---------
Co-authored-by: zhangyongxiang.alpha <[email protected]>
Co-authored-by: Aime <[email protected]>
Co-authored-by: ZhouJinsong <[email protected]>
Co-authored-by: Xu Bai <[email protected]>
---
.../apache/amoro/server/AmoroServiceContainer.java | 3 +
.../server/process/ActionCoordinatorScheduler.java | 10 +-
.../server/process/DefaultTableProcessStore.java | 1 +
.../amoro/server/process/ProcessService.java | 241 ++++++++++-----------
.../amoro/server/process/TableProcessMeta.java | 21 ++
.../process/executor/TableProcessExecutor.java | 125 +++++------
.../server/scheduler/PeriodicTableScheduler.java | 2 +
.../amoro/server/table/DefaultTableRuntime.java | 11 +-
.../server/table/DefaultTableRuntimeFactory.java | 11 +-
.../amoro/server/table/DefaultTableService.java | 1 +
.../amoro/server/TestDefaultProcessService.java | 68 +++---
.../server/process/MockActionCoordinator.java | 25 +--
.../amoro/server/process/MockExecuteEngine.java | 7 +-
.../amoro/server/process/MockTableProcess.java | 55 +++--
.../inline/TestPeriodicTableSchedulerCleanup.java | 2 +-
.../inline/TestTableRuntimeRefreshExecutor.java | 2 +-
.../amoro/server/table/TestTableManagerQuery.java | 2 +-
.../main/java/org/apache/amoro/TableRuntime.java | 7 +
.../org/apache/amoro/process/AmoroProcess.java | 47 +---
.../java/org/apache/amoro/process}/EngineType.java | 2 +-
.../org/apache/amoro/process}/ExecuteEngine.java | 4 +-
.../org/apache/amoro/process/ProcessFactory.java | 8 +
.../org/apache/amoro/process/TableProcess.java | 52 ++---
.../apache/amoro/table/TableRuntimeFactory.java | 4 +
24 files changed, 363 insertions(+), 348 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 19bf7be98..25de6a7af 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -236,6 +236,7 @@ public class AmoroServiceContainer {
}
public void startOptimizingService() throws Exception {
+
// Load process factories and build action coordinators from default table
runtime factory.
TableProcessFactoryManager tableProcessFactoryManager = new
TableProcessFactoryManager();
tableProcessFactoryManager.initialize();
@@ -246,6 +247,8 @@ public class AmoroServiceContainer {
List<ActionCoordinator> actionCoordinators =
defaultRuntimeFactory.supportedCoordinators();
ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
+ processFactories.forEach(
+ c ->
c.availableExecuteEngines(executeEngineManager.installedPlugins()));
tableService = new DefaultTableService(serviceConfig, catalogManager,
defaultRuntimeFactory);
processService = new ProcessService(tableService, actionCoordinators,
executeEngineManager);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
index daf586941..4e9f79a35 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
@@ -108,16 +108,10 @@ public class ActionCoordinatorScheduler extends
PeriodicTableScheduler {
* @param tableRuntime table runtime
* @param processStore process store
*/
- protected void recover(TableRuntime tableRuntime, TableProcessStore
processStore) {
- TableProcess process = coordinator.recoverTableProcess(tableRuntime,
processStore);
- processService.recover(tableRuntime, process);
+ protected TableProcess recover(TableRuntime tableRuntime, TableProcessStore
processStore) {
+ return coordinator.recoverTableProcess(tableRuntime, processStore);
}
- /**
- * Get executor delay from coordinator.
- *
- * @return delay in milliseconds
- */
@Override
protected long getExecutorDelay() {
return coordinator.getExecutorDelay();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/DefaultTableProcessStore.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/DefaultTableProcessStore.java
index 8a3dc6211..a57d1db21 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/DefaultTableProcessStore.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/DefaultTableProcessStore.java
@@ -63,6 +63,7 @@ public class DefaultTableProcessStore extends PersistentBase
implements TablePro
* @param action action type
*/
public DefaultTableProcessStore(TableRuntime tableRuntime, TableProcessMeta
meta, Action action) {
+ this.processId = meta.getProcessId();
this.meta = meta;
this.tableRuntime = tableRuntime;
this.action = action;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
index 79881c61b..01944e8a2 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
@@ -25,15 +25,15 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.ProcessEvent;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
import org.apache.amoro.server.manager.AbstractPluginManager;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
-import org.apache.amoro.server.process.executor.EngineType;
-import org.apache.amoro.server.process.executor.ExecuteEngine;
import org.apache.amoro.server.process.executor.TableProcessExecutor;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableService;
@@ -61,7 +61,7 @@ public class ProcessService extends PersistentBase {
private final Map<String, ActionCoordinatorScheduler> actionCoordinators =
new ConcurrentHashMap<>();
- private final Map<EngineType, ExecuteEngine> executeEngines = new
ConcurrentHashMap<>();
+ private final Map<String, ExecuteEngine> executeEngines = new
ConcurrentHashMap<>();
private final ExecuteEngineManager executeEngineManager;
private final List<ActionCoordinator> actionCoordinatorList;
@@ -69,7 +69,7 @@ public class ProcessService extends PersistentBase {
private final ThreadPoolExecutor processExecutionPool =
new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new
LinkedBlockingQueue<>());
- private final Map<ServerTableIdentifier, Map<Long, TableProcess>>
activeTableProcess =
+ private final Map<ServerTableIdentifier, Map<Long, TableProcessHolder>>
activeTableProcess =
new ConcurrentHashMap<>();
public ProcessService(TableService tableService) {
@@ -108,40 +108,9 @@ public class ProcessService extends PersistentBase {
process.getAction());
return;
}
- persistTableProcess(process);
- trackTableProcess(tableRuntime.getTableIdentifier(), process);
- executeOrTraceProcess(process);
- }
-
- /**
- * Recover a table process for a given table runtime.
- *
- * @param tableRuntime table runtime
- * @param process table process to recover
- */
- public void recover(TableRuntime tableRuntime, TableProcess process) {
- // TODO: init some status
- trackTableProcess(tableRuntime.getTableIdentifier(), process);
- executeOrTraceProcess(process);
- }
-
- /**
- * Retry a failed table process.
- *
- * @param process process to retry
- */
- public void retry(TableProcess process) {
- executeOrTraceProcess(process);
- }
-
- /**
- * Cancel a table process and release related resources.
- *
- * @param process process to cancel
- */
- public void cancel(TableProcess process) {
- // TODO: init some status
- cancelProcess(process);
+ TableProcessStore store = persistTableProcess(process);
+ trackTableProcess(tableRuntime.getTableIdentifier(), store, process);
+ executeOrTraceProcess(store, process);
}
/** Dispose the service, shutdown engines and clear active processes. */
@@ -164,7 +133,7 @@ public class ProcessService extends PersistentBase {
.installedPlugins()
.forEach(
executeEngine -> {
- executeEngines.put(executeEngine.engineType(), executeEngine);
+ executeEngines.put(executeEngine.name(), executeEngine);
});
recoverProcesses(tableRuntimes);
actionCoordinators.values().forEach(s -> s.initialize(tableRuntimes));
@@ -188,14 +157,16 @@ public class ProcessService extends PersistentBase {
ActionCoordinatorScheduler scheduler =
actionCoordinators.get(processMeta.getProcessType());
if (tableRuntime != null && scheduler != null) {
- scheduler.recover(
- tableRuntime,
+ DefaultTableProcessStore store =
new DefaultTableProcessStore(
processMeta.getProcessId(),
tableRuntime,
processMeta,
scheduler.getAction(),
- scheduler.PROCESS_MAX_RETRY_NUMBER));
+ processMeta.getRetryNumber());
+ TableProcess process = scheduler.recover(tableRuntime, store);
+ trackTableProcess(tableRuntime.getTableIdentifier(), store,
process);
+ executeOrTraceProcess(store, process);
}
});
}
@@ -205,41 +176,45 @@ public class ProcessService extends PersistentBase {
*
* @param process table process
*/
- private void executeOrTraceProcess(TableProcess process) {
-
- if (!isExecutable(process)) {
+ private void executeOrTraceProcess(TableProcessStore store, TableProcess
process) {
+ if (!isExecutable(store)) {
LOG.info(
"Table process {} with identifier {} may have been in canceling or
canceled, cancel execute process.",
- process.getId(),
- process.getExternalProcessIdentifier());
+ store.getProcessId(),
+ store.getExternalProcessIdentifier());
return;
}
- ExecuteEngine executeEngine =
-
executeEngines.get(EngineType.of(process.store().getExecutionEngine()));
+ ExecuteEngine executeEngine =
executeEngines.get(store.getExecutionEngine());
+ if (executeEngine == null) {
+ LOG.error(
+ "Can't found execution engine:{} for process:{}, table:{}",
+ store.getExecutionEngine(),
+ store.getAction(),
+ process.getTableIdentifier());
+ return;
+ }
- TableProcessExecutor executor = new TableProcessExecutor(process,
executeEngine);
+ TableProcessExecutor executor = new TableProcessExecutor(process, store,
executeEngine);
executor.onProcessFinished(
() -> {
ActionCoordinatorScheduler scheduler =
- actionCoordinators.get(process.store().getAction().getName());
+ actionCoordinators.get(store.getAction().getName());
if (scheduler != null
- && process.getStatus() == ProcessStatus.FAILED
- && process.store().getRetryNumber() <
scheduler.PROCESS_MAX_RETRY_NUMBER
+ && store.getStatus() == ProcessStatus.FAILED
+ && store.getRetryNumber() <
ActionCoordinatorScheduler.PROCESS_MAX_RETRY_NUMBER
&& process.getTableRuntime() != null) {
- process
- .store()
- .tryTransitState(
- ProcessStatus.PENDING,
- ProcessEvent.RETRY_REQUESTED,
- process.getExternalProcessIdentifier(),
- "Regular Retry.",
- process.getProcessParameters(),
- process.getSummary());
- executeOrTraceProcess(process);
+ store.tryTransitState(
+ ProcessStatus.PENDING,
+ ProcessEvent.RETRY_REQUESTED,
+ store.getExternalProcessIdentifier(),
+ "Regular Retry.",
+ process.getProcessParameters(),
+ process.getSummary());
+ executeOrTraceProcess(store, process);
} else {
untrackTableProcessInstance(
- process.getTableRuntime().getTableIdentifier(),
process.getId());
+ process.getTableRuntime().getTableIdentifier(),
store.getProcessId());
}
});
@@ -249,7 +224,7 @@ public class ProcessService extends PersistentBase {
"Submit table process {} to engine {}, process id:{}",
process,
executeEngine.engineType(),
- process.getId());
+ store.getProcessId());
}
/**
@@ -257,29 +232,28 @@ public class ProcessService extends PersistentBase {
*
* @param process table process
*/
- private void cancelProcess(TableProcess process) {
-
- process
- .store()
- .tryTransitState(
- ProcessStatus.CANCELED,
- ProcessEvent.CANCEL_REQUESTED,
- process.getExternalProcessIdentifier(),
- "Gracefully Canceled.",
- process.getProcessParameters(),
- process.getSummary());
-
untrackTableProcessInstance(process.getTableRuntime().getTableIdentifier(),
process.getId());
-
- ExecuteEngine executeEngine =
-
executeEngines.get(EngineType.of(process.store().getExecutionEngine()));
-
- executeEngine.tryCancelTableProcess(process,
process.getExternalProcessIdentifier());
-
- LOG.info(
- "Cancel table process {} in engine {}, process id:{}",
- process,
- executeEngine.engineType(),
- process.getId());
+ private void cancelProcess(TableProcessStore store, TableProcess process) {
+
+ store.tryTransitState(
+ ProcessStatus.CANCELED,
+ ProcessEvent.CANCEL_REQUESTED,
+ store.getExternalProcessIdentifier(),
+ "Gracefully Canceled.",
+ process.getProcessParameters(),
+ process.getSummary());
+ untrackTableProcessInstance(
+ process.getTableRuntime().getTableIdentifier(), store.getProcessId());
+
+ ExecuteEngine executeEngine =
executeEngines.get(store.getExecutionEngine());
+
+ if (executeEngine != null) {
+ executeEngine.tryCancelTableProcess(process,
store.getExternalProcessIdentifier());
+ LOG.info(
+ "Cancel table process {} in engine {}, process id:{}",
+ process,
+ executeEngine.name(),
+ store.getProcessId());
+ }
}
/**
@@ -288,7 +262,7 @@ public class ProcessService extends PersistentBase {
* @param process table process
* @return true if executable
*/
- private boolean isExecutable(TableProcess process) {
+ private boolean isExecutable(TableProcessStore process) {
if (process.getStatus() == ProcessStatus.CANCELING
|| process.getStatus() == ProcessStatus.CANCELED) {
return false;
@@ -305,19 +279,24 @@ public class ProcessService extends PersistentBase {
* @return true if exists
*/
private boolean hasAliveTableProcess(TableRuntime tableRuntime, Action
action) {
- List<TableProcess> processes =
+ List<TableProcessHolder> processes =
getTableProcessInstances(tableRuntime.getTableIdentifier()).values().stream()
.filter(
tableProcess ->
-
tableProcess.getAction().getName().equalsIgnoreCase(action.getName()))
+ tableProcess
+ .getStore()
+ .getAction()
+ .getName()
+ .equalsIgnoreCase(action.getName()))
.collect(Collectors.toList());
+
return processes.stream()
.anyMatch(
process -> {
return (process != null
- && (process.getStatus() == ProcessStatus.RUNNING
- || process.getStatus() == ProcessStatus.SUBMITTED
- || process.getStatus() == ProcessStatus.PENDING));
+ && (process.getStore().getStatus() == ProcessStatus.RUNNING
+ || process.getStore().getStatus() ==
ProcessStatus.SUBMITTED
+ || process.getStore().getStatus() ==
ProcessStatus.PENDING));
});
}
@@ -327,8 +306,8 @@ public class ProcessService extends PersistentBase {
* @param process table process
* @return metadata snapshot
*/
- public TableProcessMeta persistTableProcess(TableProcess process) {
- TableProcessMeta processMeta =
TableProcessMeta.fromTableProcessStore(process.store());
+ protected DefaultTableProcessStore persistTableProcess(TableProcess process)
{
+ TableProcessMeta processMeta = TableProcessMeta.createProcessMeta(process);
doAs(
TableProcessMapper.class,
mapper ->
@@ -344,7 +323,12 @@ public class ProcessService extends PersistentBase {
processMeta.getCreateTime(),
processMeta.getProcessParameters(),
processMeta.getSummary()));
- return processMeta;
+ return new DefaultTableProcessStore(
+ processMeta.getProcessId(),
+ process.getTableRuntime(),
+ processMeta,
+ process.getAction(),
+ processMeta.getRetryNumber());
}
/**
@@ -363,7 +347,7 @@ public class ProcessService extends PersistentBase {
* @return engines map
*/
@VisibleForTesting
- public Map<EngineType, ExecuteEngine> getExecuteEngines() {
+ public Map<String, ExecuteEngine> getExecuteEngines() {
return executeEngines;
}
@@ -373,24 +357,10 @@ public class ProcessService extends PersistentBase {
* @return active process map
*/
@VisibleForTesting
- public Map<ServerTableIdentifier, Map<Long, TableProcess>>
getActiveTableProcess() {
+ public Map<ServerTableIdentifier, Map<Long, TableProcessHolder>>
getActiveTableProcess() {
return activeTableProcess;
}
- /**
- * Get a table process instance by table identifier and process id.
- *
- * @param serverTableIdentifier table identifier
- * @param processId process id
- * @return table process instance or null
- */
- @VisibleForTesting
- public TableProcess getTableProcessInstance(
- ServerTableIdentifier serverTableIdentifier, long processId) {
- Map<Long, TableProcess> inner =
activeTableProcess.get(serverTableIdentifier);
- return inner != null ? inner.get(processId) : null;
- }
-
/**
* Get all table process instances for the given table identifier.
*
@@ -398,9 +368,9 @@ public class ProcessService extends PersistentBase {
* @return unmodifiable map of processes
*/
@VisibleForTesting
- public Map<Long, TableProcess> getTableProcessInstances(
+ public Map<Long, TableProcessHolder> getTableProcessInstances(
ServerTableIdentifier serverTableIdentifier) {
- Map<Long, TableProcess> inner =
activeTableProcess.get(serverTableIdentifier);
+ Map<Long, TableProcessHolder> inner =
activeTableProcess.get(serverTableIdentifier);
if (inner == null || inner.isEmpty()) {
return Collections.emptyMap();
}
@@ -414,10 +384,12 @@ public class ProcessService extends PersistentBase {
* @param tableProcess process instance
*/
private void trackTableProcess(
- ServerTableIdentifier serverTableIdentifier, TableProcess tableProcess) {
+ ServerTableIdentifier serverTableIdentifier,
+ TableProcessStore store,
+ TableProcess tableProcess) {
activeTableProcess
.computeIfAbsent(serverTableIdentifier, key -> new
ConcurrentHashMap<>())
- .put(tableProcess.getId(), tableProcess);
+ .put(store.getProcessId(), new TableProcessHolder(tableProcess,
store));
}
/**
@@ -430,15 +402,15 @@ public class ProcessService extends PersistentBase {
@VisibleForTesting
public TableProcess untrackTableProcessInstance(
ServerTableIdentifier serverTableIdentifier, long processId) {
- Map<Long, TableProcess> inner =
activeTableProcess.get(serverTableIdentifier);
+ Map<Long, TableProcessHolder> inner =
activeTableProcess.get(serverTableIdentifier);
if (inner == null) {
return null;
}
- TableProcess removed = inner.remove(processId);
+ TableProcessHolder removed = inner.remove(processId);
if (inner.isEmpty()) {
activeTableProcess.remove(serverTableIdentifier, inner);
}
- return removed;
+ return removed != null ? removed.getProcess() : null;
}
@VisibleForTesting
@@ -455,7 +427,7 @@ public class ProcessService extends PersistentBase {
@VisibleForTesting
public void installExecuteEngine(ExecuteEngine executeEngine) {
- this.executeEngines.put(executeEngine.engineType(), executeEngine);
+ this.executeEngines.put(executeEngine.name(), executeEngine);
}
@VisibleForTesting
@@ -519,12 +491,12 @@ public class ProcessService extends PersistentBase {
@Override
protected void handleTableRemoved(TableRuntime tableRuntime) {
actionCoordinators.values().forEach(s ->
s.handleTableRemoved(tableRuntime));
- List<TableProcess> processes =
+ List<TableProcessHolder> processes =
getTableProcessInstances(tableRuntime.getTableIdentifier()).values().stream()
.collect(Collectors.toList());
- for (TableProcess process : processes) {
- if (process != null) {
- cancel(process);
+ for (TableProcessHolder holder : processes) {
+ if (holder != null) {
+ cancelProcess(holder.getStore(), holder.getProcess());
}
}
}
@@ -546,6 +518,25 @@ public class ProcessService extends PersistentBase {
}
}
+ @VisibleForTesting
+ public static class TableProcessHolder {
+ private final TableProcess process;
+ private final TableProcessStore store;
+
+ public TableProcessHolder(TableProcess process, TableProcessStore store) {
+ this.process = process;
+ this.store = store;
+ }
+
+ public TableProcess getProcess() {
+ return process;
+ }
+
+ public TableProcessStore getStore() {
+ return store;
+ }
+ }
+
/** Manager for {@link ExecuteEngine} plugins. */
public static class ExecuteEngineManager extends
AbstractPluginManager<ExecuteEngine> {
public ExecuteEngineManager() {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
index 6890c25c1..09f257b0f 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
@@ -19,12 +19,15 @@
package org.apache.amoro.server.process;
import org.apache.amoro.process.ProcessStatus;
+import org.apache.amoro.process.TableProcess;
import org.apache.amoro.process.TableProcessStore;
+import org.apache.amoro.server.utils.SnowflakeIdGenerator;
import java.util.HashMap;
import java.util.Map;
public class TableProcessMeta {
+ private static final SnowflakeIdGenerator idGenerator = new
SnowflakeIdGenerator();
private long processId;
private long tableId;
private volatile String externalProcessIdentifier;
@@ -188,6 +191,24 @@ public class TableProcessMeta {
return tableProcessMeta;
}
+ public static TableProcessMeta createProcessMeta(TableProcess process) {
+ TableProcessMeta tableProcessMeta = new TableProcessMeta();
+ tableProcessMeta.setProcessId(idGenerator.generateId());
+ tableProcessMeta.setTableId(process.getTableIdentifier().getId());
+ tableProcessMeta.setExternalProcessIdentifier("");
+ tableProcessMeta.setStatus(ProcessStatus.PENDING);
+ tableProcessMeta.setProcessType(process.getAction().getName());
+ tableProcessMeta.setProcessStage(process.getProcessStage());
+ tableProcessMeta.setExecutionEngine(process.getExecutionEngine());
+ tableProcessMeta.setRetryNumber(0);
+ tableProcessMeta.setCreateTime(System.currentTimeMillis());
+ tableProcessMeta.setFinishTime(-1);
+ tableProcessMeta.setFailMessage("");
+ tableProcessMeta.setProcessParameters(process.getProcessParameters());
+ tableProcessMeta.setSummary(process.getSummary());
+ return tableProcessMeta;
+ }
+
public static TableProcessMeta of(
long processId,
long tableId,
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/TableProcessExecutor.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/TableProcessExecutor.java
index 138ee1a3d..83cd66a3f 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/TableProcessExecutor.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/TableProcessExecutor.java
@@ -18,9 +18,11 @@
package org.apache.amoro.server.process.executor;
+import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.ProcessEvent;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.shade.guava32.com.google.common.base.Strings;
import org.slf4j.Logger;
@@ -36,6 +38,7 @@ public class TableProcessExecutor extends PersistentBase
implements Runnable {
private static final long DEFAULT_POLL_INTERVAL_MS = 5000L;
public ExecuteEngine executeEngine;
protected TableProcess tableProcess;
+ private final TableProcessStore store;
private Runnable finishedCallback;
/**
@@ -44,9 +47,11 @@ public class TableProcessExecutor extends PersistentBase
implements Runnable {
* @param tableProcess table process
* @param executeEngine execute engine
*/
- public TableProcessExecutor(TableProcess tableProcess, ExecuteEngine
executeEngine) {
+ public TableProcessExecutor(
+ TableProcess tableProcess, TableProcessStore store, ExecuteEngine
executeEngine) {
this.tableProcess = tableProcess;
this.executeEngine = executeEngine;
+ this.store = store;
}
/** Submit or recover the process to engine, poll status and update store. */
@@ -56,46 +61,44 @@ public class TableProcessExecutor extends PersistentBase
implements Runnable {
ProcessStatus status;
String message = "";
- if (isTableProcessCanceling(tableProcess.getStatus())) {
+ if (isTableProcessCanceling(store.getStatus())) {
LOG.info(
"Table process {} with identifier {} may have been in canceling,
exit submit process.",
- tableProcess.getId(),
+ store.getProcessId(),
externalProcessIdentifier);
return;
}
try {
- if (tableProcess.getStatus() == ProcessStatus.UNKNOWN
- || tableProcess.getStatus() == ProcessStatus.PENDING
- ||
Strings.isNullOrEmpty(tableProcess.getExternalProcessIdentifier())) {
+ if (store.getStatus() == ProcessStatus.UNKNOWN
+ || store.getStatus() == ProcessStatus.PENDING
+ || Strings.isNullOrEmpty(store.getExternalProcessIdentifier())) {
externalProcessIdentifier =
executeEngine.submitTableProcess(tableProcess);
LOG.info(
"Submit table process {} to engine {} success, external process
identifier is {}",
- tableProcess.getId(),
+ store.getProcessId(),
executeEngine.engineType(),
externalProcessIdentifier);
} else {
- externalProcessIdentifier =
tableProcess.getExternalProcessIdentifier();
+ externalProcessIdentifier = store.getExternalProcessIdentifier();
}
validateIdentifier(externalProcessIdentifier);
status = executeEngine.getStatus(externalProcessIdentifier);
- tableProcess
- .store()
- .tryTransitState(
- status,
- ProcessEvent.SUBMIT_REQUESTED,
- externalProcessIdentifier,
- "Complete Submitted.",
- tableProcess.getProcessParameters(),
- tableProcess.getSummary());
+ store.tryTransitState(
+ status,
+ ProcessEvent.SUBMIT_REQUESTED,
+ externalProcessIdentifier,
+ "Complete Submitted.",
+ tableProcess.getProcessParameters(),
+ tableProcess.getSummary());
while (isTableProcessExecuting(status)) {
- if (isTableProcessCanceling(tableProcess.getStatus())) {
+ if (isTableProcessCanceling(store.getStatus())) {
LOG.info(
"Table process {} with identifier {} may have been in canceling,
exit submit process.",
- tableProcess.getId(),
+ store.getProcessId(),
externalProcessIdentifier);
return;
}
@@ -110,7 +113,7 @@ public class TableProcessExecutor extends PersistentBase
implements Runnable {
if (t instanceof InterruptedException) {
LOG.info(
"Table process {} with identifier {} may have been interrupted by
process service disposing, exit submit process.",
- tableProcess.getId(),
+ store.getProcessId(),
externalProcessIdentifier);
return;
} else {
@@ -121,57 +124,47 @@ public class TableProcessExecutor extends PersistentBase
implements Runnable {
LOG.info("The process {} is finished with status {}", tableProcess,
status);
if (status == ProcessStatus.KILLED) {
- tableProcess
- .store()
- .tryTransitState(
- status,
- ProcessEvent.KILL_REQUESTED,
- tableProcess.getExternalProcessIdentifier(),
- "Gracefully Killed.",
- tableProcess.getProcessParameters(),
- tableProcess.getSummary());
+ store.tryTransitState(
+ status,
+ ProcessEvent.KILL_REQUESTED,
+ store.getExternalProcessIdentifier(),
+ "Gracefully Killed.",
+ tableProcess.getProcessParameters(),
+ tableProcess.getSummary());
} else if (status == ProcessStatus.CANCELED) {
- tableProcess
- .store()
- .tryTransitState(
- status,
- ProcessEvent.CANCEL_REQUESTED,
- tableProcess.getExternalProcessIdentifier(),
- "Gracefully Cancelled.",
- tableProcess.getProcessParameters(),
- tableProcess.getSummary());
+ store.tryTransitState(
+ status,
+ ProcessEvent.CANCEL_REQUESTED,
+ store.getExternalProcessIdentifier(),
+ "Gracefully Cancelled.",
+ tableProcess.getProcessParameters(),
+ tableProcess.getSummary());
} else if (status == ProcessStatus.CLOSED) {
- tableProcess
- .store()
- .tryTransitState(
- status,
- ProcessEvent.KILL_REQUESTED,
- tableProcess.getExternalProcessIdentifier(),
- "Gracefully Closed.",
- tableProcess.getProcessParameters(),
- tableProcess.getSummary());
+ store.tryTransitState(
+ status,
+ ProcessEvent.KILL_REQUESTED,
+ store.getExternalProcessIdentifier(),
+ "Gracefully Closed.",
+ tableProcess.getProcessParameters(),
+ tableProcess.getSummary());
} else if (status == ProcessStatus.FAILED) {
- tableProcess
- .store()
- .tryTransitState(
- status,
- ProcessEvent.COMPLETE_FAILED,
- tableProcess.getExternalProcessIdentifier(),
- message,
- tableProcess.getProcessParameters(),
- tableProcess.getSummary());
+ store.tryTransitState(
+ status,
+ ProcessEvent.COMPLETE_FAILED,
+ store.getExternalProcessIdentifier(),
+ message,
+ tableProcess.getProcessParameters(),
+ tableProcess.getSummary());
} else if (status == ProcessStatus.SUCCESS) {
- tableProcess
- .store()
- .tryTransitState(
- status,
- ProcessEvent.COMPLETE_SUCCESS,
- tableProcess.getExternalProcessIdentifier(),
- "Complete Success",
- tableProcess.getProcessParameters(),
- tableProcess.getSummary());
+ store.tryTransitState(
+ status,
+ ProcessEvent.COMPLETE_SUCCESS,
+ store.getExternalProcessIdentifier(),
+ "Complete Success",
+ tableProcess.getProcessParameters(),
+ tableProcess.getSummary());
} else {
- LOG.warn("Un expected terminal status: {} for process: {}.", status,
tableProcess.getId());
+ LOG.warn("Un expected terminal status: {} for process: {}.", status,
store.getProcessId());
}
if (finishedCallback != null) {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
index ef7419325..1f19187e9 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
@@ -130,6 +130,8 @@ public abstract class PeriodicTableScheduler extends
RuntimeHandlerChain {
// so you need to perform the update operation separately for each
table.
persistUpdatingCleanupTime(tableRuntime);
}
+ } catch (Exception e) {
+ logger.error("exception when schedule for table: {}",
tableRuntime.getTableIdentifier(), e);
} finally {
scheduledTables.remove(tableRuntime.getTableIdentifier());
scheduleIfNecessary(tableRuntime, getNextExecutingTime(tableRuntime));
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 002e7b0f2..ee23ab4d3 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -57,6 +57,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Supplier;
/** Default table runtime implementation. */
public class DefaultTableRuntime extends AbstractTableRuntime {
@@ -93,13 +94,16 @@ public class DefaultTableRuntime extends
AbstractTableRuntime {
private volatile OptimizingProcess optimizingProcess;
private final List<TaskRuntime.TaskQuota> taskQuotas = new
CopyOnWriteArrayList<>();
- public DefaultTableRuntime(TableRuntimeStore store) {
+ private final Supplier<AmoroTable<?>> loader;
+
+ public DefaultTableRuntime(TableRuntimeStore store, Supplier<AmoroTable<?>>
loader) {
super(store);
this.optimizingMetrics =
new TableOptimizingMetrics(store.getTableIdentifier(),
store.getGroupName());
this.orphanFilesCleaningMetrics =
new TableOrphanFilesCleaningMetrics(store.getTableIdentifier());
this.tableSummaryMetrics = new
TableSummaryMetrics(store.getTableIdentifier());
+ this.loader = loader;
}
public void recover(OptimizingProcess optimizingProcess) {
@@ -469,6 +473,11 @@ public class DefaultTableRuntime extends
AbstractTableRuntime {
super.dispose();
}
+ @Override
+ public AmoroTable<?> loadTable() {
+ return loader.get();
+ }
+
/**
* Check if operation are blocked now.
*
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
index 83050799a..c155f8d02 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
@@ -19,6 +19,7 @@
package org.apache.amoro.server.table;
import org.apache.amoro.Action;
+import org.apache.amoro.AmoroTable;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
@@ -36,6 +37,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
/**
* Default {@link TableRuntimeFactory} implementation used by AMS.
@@ -52,6 +54,8 @@ public class DefaultTableRuntimeFactory implements
TableRuntimeFactory {
/** Coordinators derived from all installed process factories. */
private final List<ActionCoordinator> supportedCoordinators =
Lists.newArrayList();
+ private Function<ServerTableIdentifier, AmoroTable<?>> loader;
+
@Override
public List<ActionCoordinator> supportedCoordinators() {
return Collections.unmodifiableList(supportedCoordinators);
@@ -88,6 +92,11 @@ public class DefaultTableRuntimeFactory implements
TableRuntimeFactory {
}
}
+ @Override
+ public void withTableLoader(Function<ServerTableIdentifier, AmoroTable<?>>
loader) {
+ this.loader = loader;
+ }
+
@Override
public Optional<TableRuntimeCreator> accept(
ServerTableIdentifier tableIdentifier, Map<String, String>
tableProperties) {
@@ -149,7 +158,7 @@ public class DefaultTableRuntimeFactory implements
TableRuntimeFactory {
@Override
public TableRuntime create(TableRuntimeStore store) {
- return new DefaultTableRuntime(store);
+ return new DefaultTableRuntime(store, () ->
loader.apply(store.getTableIdentifier()));
}
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index a247b1804..fa0ee873c 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -99,6 +99,7 @@ public class DefaultTableService extends PersistentBase
implements TableService
configuration.get(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL).toMillis();
this.serverConfiguration = configuration;
this.tableRuntimeFactory = tableRuntimeFactory;
+ this.tableRuntimeFactory.withTableLoader(this::loadTable);
}
@Override
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
index fe76a2242..5b719aa73 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
@@ -25,9 +25,10 @@ import org.apache.amoro.TableTestHelper;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
import org.apache.amoro.process.ProcessStatus;
-import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
import org.apache.amoro.server.process.MockActionCoordinator;
import org.apache.amoro.server.process.MockExecuteEngine;
+import org.apache.amoro.server.process.ProcessService;
import org.apache.amoro.server.table.AMSTableTestBase;
import org.junit.After;
import org.junit.Assert;
@@ -74,11 +75,11 @@ public class TestDefaultProcessService extends
AMSTableTestBase {
public void prepare() {
createDatabase();
- MockActionCoordinator mockActionCoordinator = new MockActionCoordinator();
- processServiceService().installActionCoordinator(mockActionCoordinator);
-
MockExecuteEngine mockExecuteEngine = new MockExecuteEngine();
processServiceService().installExecuteEngine(mockExecuteEngine);
+
+ MockActionCoordinator mockActionCoordinator = new
MockActionCoordinator(mockExecuteEngine);
+ processServiceService().installActionCoordinator(mockActionCoordinator);
}
/** Clear resources after tests. */
@@ -111,15 +112,15 @@ public class TestDefaultProcessService extends
AMSTableTestBase {
awaitActiveInstances(executeEngine);
// Get the current active TableProcess
- TableProcess tableProcess = getAnyActiveTableProcess();
+ TableProcessStore store = getAnyActiveTableProcess();
// Wait again for active instances to preserve the original semantics
awaitActiveInstances(executeEngine);
// Assert status and engine instance
- Assert.assertEquals(ProcessStatus.RUNNING, tableProcess.getStatus());
+ Assert.assertEquals(ProcessStatus.RUNNING, store.getStatus());
Future<?> future =
-
executeEngine.getActiveInstances().get(tableProcess.getExternalProcessIdentifier());
+
executeEngine.getActiveInstances().get(store.getExternalProcessIdentifier());
Assert.assertNotNull(future);
Assert.assertFalse(future.isDone());
dropTable();
@@ -136,18 +137,19 @@ public class TestDefaultProcessService extends
AMSTableTestBase {
try {
awaitActiveInstances(executeEngine);
- TableProcess tableProcess = getAnyActiveTableProcess();
+ ProcessService.TableProcessHolder holder =
getAnyActiveTableProcessHolder();
+ ServerTableIdentifier tableIdentifier =
+ holder.getProcess().getTableRuntime().getTableIdentifier();
+ TableProcessStore store = holder.getStore();
dropTable();
// Wait until both active and canceling queues are empty
awaitEngineDrained(executeEngine);
Assert.assertTrue(
- processServiceService()
-
.getTableProcessInstances(tableProcess.getTableRuntime().getTableIdentifier())
- .isEmpty());
+
processServiceService().getTableProcessInstances(tableIdentifier).isEmpty());
Assert.assertTrue(executeEngine.getActiveInstances().isEmpty());
- Assert.assertEquals(ProcessStatus.CANCELED, tableProcess.getStatus());
+ Assert.assertEquals(ProcessStatus.CANCELED, store.getStatus());
} catch (Throwable t) {
if (!processServiceService().getActiveTableProcess().isEmpty()) {
throw new IllegalStateException(
@@ -172,20 +174,18 @@ public class TestDefaultProcessService extends
AMSTableTestBase {
awaitActiveInstances(executeEngine);
- TableProcess tableProcess = getAnyActiveTableProcess();
+ ProcessService.TableProcessHolder holder =
getAnyActiveTableProcessHolder();
+ TableProcessStore store = holder.getStore();
+ org.apache.amoro.TableRuntime tableRuntime =
holder.getProcess().getTableRuntime();
- awaitEngineStatus(
- executeEngine, tableProcess.getExternalProcessIdentifier(),
ProcessStatus.RUNNING);
-
- Assert.assertEquals(ProcessStatus.RUNNING, tableProcess.getStatus());
+ awaitEngineStatus(executeEngine, store.getExternalProcessIdentifier(),
ProcessStatus.RUNNING);
+ Assert.assertEquals(ProcessStatus.RUNNING, store.getStatus());
processServiceService()
- .untrackTableProcessInstance(
- tableProcess.getTableRuntime().getTableIdentifier(),
tableProcess.getId());
+ .untrackTableProcessInstance(tableRuntime.getTableIdentifier(),
store.getProcessId());
processServiceService()
- .recoverProcesses(
- new
ArrayList<>(Collections.singletonList(tableProcess.getTableRuntime())));
+ .recoverProcesses(new
ArrayList<>(Collections.singletonList(tableRuntime)));
// Wait for the active table process to reappear
awaitCondition(
@@ -193,14 +193,13 @@ public class TestDefaultProcessService extends
AMSTableTestBase {
WAIT_TIMEOUT_MS,
POLL_INTERVAL_MS);
- tableProcess = getAnyActiveTableProcess();
-
- awaitEngineStatus(
- executeEngine, tableProcess.getExternalProcessIdentifier(),
ProcessStatus.RUNNING);
+ holder = getAnyActiveTableProcessHolder();
+ store = holder.getStore();
- Assert.assertEquals(ProcessStatus.RUNNING, tableProcess.getStatus());
+ awaitEngineStatus(executeEngine, store.getExternalProcessIdentifier(),
ProcessStatus.RUNNING);
+ Assert.assertEquals(ProcessStatus.RUNNING, store.getStatus());
Future<?> future =
-
executeEngine.getActiveInstances().get(tableProcess.getExternalProcessIdentifier());
+
executeEngine.getActiveInstances().get(store.getExternalProcessIdentifier());
Assert.assertNotNull(future);
Assert.assertFalse(future.isDone());
@@ -260,24 +259,29 @@ public class TestDefaultProcessService extends
AMSTableTestBase {
POLL_INTERVAL_MS);
}
- /** Get any active TableProcess; throw a clear error if none exists. */
- private TableProcess getAnyActiveTableProcess() {
- Map<ServerTableIdentifier, Map<Long, TableProcess>> active =
+ /** Get any active table process holder; throw a clear error if none exists.
*/
+ private ProcessService.TableProcessHolder getAnyActiveTableProcessHolder() {
+ Map<ServerTableIdentifier, Map<Long, ProcessService.TableProcessHolder>>
active =
processServiceService().getActiveTableProcess();
if (active == null || active.isEmpty()) {
throw new IllegalStateException("No active table process present");
}
- Map<?, TableProcess> inner =
active.values().stream().findFirst().orElse(null);
+ Map<?, ProcessService.TableProcessHolder> inner =
+ active.values().stream().findFirst().orElse(null);
if (inner == null || inner.isEmpty()) {
throw new IllegalStateException("No active table process present");
}
- TableProcess tp = inner.values().stream().findFirst().orElse(null);
+ ProcessService.TableProcessHolder tp =
inner.values().stream().findFirst().orElse(null);
if (tp == null) {
throw new IllegalStateException("No active table process present");
}
return tp;
}
+ private TableProcessStore getAnyActiveTableProcess() {
+ return getAnyActiveTableProcessHolder().getStore();
+ }
+
/** Wait until the given externalProcessIdentifier reaches the specified
status. */
private void awaitEngineStatus(MockExecuteEngine engine, String externalId,
ProcessStatus status)
throws InterruptedException {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
index 9d0f89b6b..d610c1805 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
@@ -22,11 +22,10 @@ import org.apache.amoro.Action;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.process.TableProcessStore;
-import org.apache.amoro.server.utils.SnowflakeIdGenerator;
-import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -34,7 +33,12 @@ import java.util.Optional;
public class MockActionCoordinator implements ActionCoordinator {
public static final int PROCESS_MAX_POOL_SIZE = 1000;
public static final Action DEFAULT_ACTION =
Action.register("default_action");
- public static final SnowflakeIdGenerator SNOWFLAKE_ID_GENERATOR = new
SnowflakeIdGenerator();
+
+ private final ExecuteEngine executeEngine;
+
+ public MockActionCoordinator(ExecuteEngine executeEngine) {
+ this.executeEngine = executeEngine;
+ }
/**
* Whether the format is supported.
@@ -89,18 +93,7 @@ public class MockActionCoordinator implements
ActionCoordinator {
*/
@Override
public Optional<TableProcess> trigger(TableRuntime tableRuntime) {
- TableProcessMeta tableProcessMeta =
- TableProcessMeta.of(
- SNOWFLAKE_ID_GENERATOR.generateId(),
- tableRuntime.getTableIdentifier().getId(),
- action().getName(),
- "default",
- new HashMap<>());
- TableProcessStore tableProcessStore =
- new DefaultTableProcessStore(
- tableProcessMeta.getProcessId(), tableRuntime, tableProcessMeta,
action(), 3);
- MockTableProcess mockTableProcess = new MockTableProcess(tableRuntime,
tableProcessStore);
- return Optional.of(mockTableProcess);
+ return Optional.of(new MockTableProcess(tableRuntime, executeEngine,
action()));
}
/**
@@ -113,7 +106,7 @@ public class MockActionCoordinator implements
ActionCoordinator {
@Override
public TableProcess recoverTableProcess(
TableRuntime tableRuntime, TableProcessStore processStore) {
- return new MockTableProcess(tableRuntime, processStore);
+ return new MockTableProcess(tableRuntime, executeEngine, action());
}
/** Open plugin. */
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockExecuteEngine.java
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockExecuteEngine.java
index 50fff80d9..ec28a31c6 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockExecuteEngine.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockExecuteEngine.java
@@ -18,11 +18,11 @@
package org.apache.amoro.server.process;
+import org.apache.amoro.process.EngineType;
+import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.server.persistence.PersistentBase;
-import org.apache.amoro.server.process.executor.EngineType;
-import org.apache.amoro.server.process.executor.ExecuteEngine;
import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.parquet.Strings;
import org.slf4j.Logger;
@@ -48,6 +48,7 @@ public class MockExecuteEngine implements ExecuteEngine {
private final ThreadPoolExecutor executionPool =
new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new
LinkedBlockingQueue<>());
+ public static final EngineType MOCK_TYPE = EngineType.of("mock");
private final Map<String, Future<?>> activeInstances = new
ConcurrentHashMap<>();
private final Map<String, Future<?>> cancelingInstances = new
ConcurrentHashMap<>();
@@ -55,7 +56,7 @@ public class MockExecuteEngine implements ExecuteEngine {
/** Engine type of this mock engine. */
@Override
public EngineType engineType() {
- return EngineType.of("default");
+ return MOCK_TYPE;
}
/**
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockTableProcess.java
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockTableProcess.java
index e19ca69c0..9d8712680 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockTableProcess.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockTableProcess.java
@@ -18,33 +18,52 @@
package org.apache.amoro.server.process;
+import org.apache.amoro.Action;
import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.TableProcess;
-import org.apache.amoro.process.TableProcessStore;
+
+import java.util.Collections;
+import java.util.Map;
/** Mock table process for tests. */
public class MockTableProcess extends TableProcess {
- /**
- * Construct with runtime only.
- *
- * @param tableRuntime table runtime
- */
- MockTableProcess(TableRuntime tableRuntime) {
- super(tableRuntime);
+ private final Action action;
+ private final Map<String, String> processParameters;
+ private final Map<String, String> summary;
+
+ MockTableProcess(TableRuntime tableRuntime, ExecuteEngine executeEngine,
Action action) {
+ this(tableRuntime, executeEngine, action, Collections.emptyMap(),
Collections.emptyMap());
}
- /**
- * Construct with runtime and store.
- *
- * @param tableRuntime table runtime
- * @param tableProcessStore process store
- */
- MockTableProcess(TableRuntime tableRuntime, TableProcessStore
tableProcessStore) {
- super(tableRuntime, tableProcessStore);
+ MockTableProcess(
+ TableRuntime tableRuntime,
+ ExecuteEngine executeEngine,
+ Action action,
+ Map<String, String> processParameters,
+ Map<String, String> summary) {
+ super(tableRuntime, executeEngine);
+ this.action = action;
+ this.processParameters =
+ processParameters == null
+ ? Collections.emptyMap()
+ : Collections.unmodifiableMap(processParameters);
+ this.summary = summary == null ? Collections.emptyMap() :
Collections.unmodifiableMap(summary);
+ }
+
+ @Override
+ public Action getAction() {
+ return action;
}
- /** Close mock process. */
@Override
- protected void closeInternal() {}
+ public Map<String, String> getProcessParameters() {
+ return processParameters;
+ }
+
+ @Override
+ public Map<String, String> getSummary() {
+ return summary;
+ }
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
index c401c88d8..de25eac21 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
@@ -100,7 +100,7 @@ public class TestPeriodicTableSchedulerCleanup extends
PersistentBase {
new DefaultTableRuntimeStore(
identifier, meta, DefaultTableRuntime.REQUIRED_STATES,
Collections.emptyList());
- return new DefaultTableRuntime(store);
+ return new DefaultTableRuntime(store, () -> null);
}
private void cleanUpTableRuntimeData(List<Long> tableIds) {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
index e75d9e16d..88910376e 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
@@ -93,7 +93,7 @@ public class TestTableRuntimeRefreshExecutor extends
AMSTableTestBase {
private OptimizingConfig testOptimizingConfig;
TestTableRuntime(TableRuntimeStore store, OptimizingConfig
optimizingConfig) {
- super(store);
+ super(store, () -> null);
this.testOptimizingConfig = optimizingConfig;
}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManagerQuery.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManagerQuery.java
index d5d5ff741..5f97386be 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManagerQuery.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManagerQuery.java
@@ -318,7 +318,7 @@ public class TestTableManagerQuery extends AMSTableTestBase
{
DefaultTableRuntimeStore store =
new DefaultTableRuntimeStore(
identifier, meta, DefaultTableRuntime.REQUIRED_STATES,
Collections.emptyList());
- return new DefaultTableRuntime(store);
+ return new DefaultTableRuntime(store, () -> null);
}
}
}
diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
index ae2f610cc..1d96553fe 100644
--- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
+++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
@@ -59,6 +59,13 @@ public interface TableRuntime {
*/
ServerTableIdentifier getTableIdentifier();
+ /**
+ * Load the current table instance for this runtime.
+ *
+ * <p>This method is mainly intended for in-AMS processes.
+ */
+ AmoroTable<?> loadTable();
+
/**
* Get the table configuration. @Deprecated use {@link #getTableConfig()}
instead.
*
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
index 6ff5b6cfb..be95ce3c7 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
@@ -47,11 +47,14 @@ public interface AmoroProcess {
SimpleFuture getCompleteFuture();
/**
- * Get {@link TableProcessStore} of the process
+ * Get the {@link Action} of the process
*
- * @return the state of the process
+ * @return the action of the process
*/
- TableProcessStore store();
+ Action getAction();
+
+ /** Get execution engine name. */
+ String getExecutionEngine();
/**
* Get the string encoded process params of the process, this could be a
simple description or a
@@ -59,44 +62,12 @@ public interface AmoroProcess {
*
* @return the params of the process
*/
- default Map<String, String> getProcessParameters() {
- return store().getProcessParameters();
- }
+ Map<String, String> getProcessParameters();
/**
- * Get the string encoded summary of the process, this could be a simple
description or a POJO
- * encoded by JSON
+ * Get the string encoded summary of the process, called when the process has
succeeded.
*
* @return the summary of the process
*/
- default Map<String, String> getSummary() {
- return store().getSummary();
- }
-
- /**
- * Get {@link ProcessStatus} of the process
- *
- * @return the status of the process
- */
- default ProcessStatus getStatus() {
- return store().getStatus();
- }
-
- /**
- * Get the id of the process
- *
- * @return the id of the process
- */
- default long getId() {
- return store().getProcessId();
- }
-
- /**
- * Get the {@link Action} of the process
- *
- * @return the action of the process
- */
- default Action getAction() {
- return store().getAction();
- }
+ Map<String, String> getSummary();
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/EngineType.java
b/amoro-common/src/main/java/org/apache/amoro/process/EngineType.java
similarity index 97%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/process/executor/EngineType.java
rename to amoro-common/src/main/java/org/apache/amoro/process/EngineType.java
index f6d834aae..b37dda0a2 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/EngineType.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/EngineType.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.amoro.server.process.executor;
+package org.apache.amoro.process;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngine.java
b/amoro-common/src/main/java/org/apache/amoro/process/ExecuteEngine.java
similarity index 93%
rename from
amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngine.java
rename to amoro-common/src/main/java/org/apache/amoro/process/ExecuteEngine.java
index 1b77cf2e9..8723b70ef 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngine.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/ExecuteEngine.java
@@ -16,11 +16,9 @@
* limitations under the License.
*/
-package org.apache.amoro.server.process.executor;
+package org.apache.amoro.process;
import org.apache.amoro.ActivePlugin;
-import org.apache.amoro.process.ProcessStatus;
-import org.apache.amoro.process.TableProcess;
public interface ExecuteEngine extends ActivePlugin {
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
index c2e5c838a..275550165 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
@@ -25,6 +25,7 @@ import org.apache.amoro.TableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.table.StateKey;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -49,6 +50,13 @@ public interface ProcessFactory extends ActivePlugin {
return ProcessTriggerStrategy.METADATA_TRIGGER;
}
+ /**
+ * Receives all registered execute engines.
+ *
+ * @param allAvailableEngines all registered execute engines
+ */
+ default void availableExecuteEngines(Collection<ExecuteEngine>
allAvailableEngines) {}
+
/**
* Try trigger a process for the action.
*
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
b/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
index 4b578d7fc..f155ae102 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
@@ -18,6 +18,7 @@
package org.apache.amoro.process;
+import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,56 +28,41 @@ public abstract class TableProcess implements AmoroProcess {
public static final Logger LOG = LoggerFactory.getLogger(TableProcess.class);
- protected final TableRuntime tableRuntime;
- protected final TableProcessStore tableProcessStore;
- protected final int maxRetryTime;
private final SimpleFuture submitFuture = new SimpleFuture();
private final SimpleFuture completeFuture = new SimpleFuture();
- protected TableProcess(TableRuntime tableRuntime) {
- this(tableRuntime, null, 1);
- }
-
- protected TableProcess(TableRuntime tableRuntime, TableProcessStore
tableProcessStore) {
- this(tableRuntime, tableProcessStore, 1);
- }
+ protected final TableRuntime tableRuntime;
+ private final ExecuteEngine executeEngine;
- protected TableProcess(
- TableRuntime tableRuntime, TableProcessStore tableProcessStore, int
maxRetryTime) {
+ protected TableProcess(TableRuntime tableRuntime, ExecuteEngine engine) {
this.tableRuntime = tableRuntime;
- this.tableProcessStore = tableProcessStore;
- this.maxRetryTime = maxRetryTime;
+ this.executeEngine = engine;
}
- public TableRuntime getTableRuntime() {
- return tableRuntime;
+ @Override
+ public SimpleFuture getSubmitFuture() {
+ return submitFuture.or(completeFuture);
}
- public String getExternalProcessIdentifier() {
- // TODO: Add a new field to process meta to store external process
identifier.(e.g. flink job id
- // or yarn app id)
- return tableProcessStore.getExternalProcessIdentifier();
+ @Override
+ public SimpleFuture getCompleteFuture() {
+ return completeFuture;
}
- @Override
- public TableProcessStore store() {
- return tableProcessStore;
+ public ServerTableIdentifier getTableIdentifier() {
+ return tableRuntime.getTableIdentifier();
}
@Override
- public ProcessStatus getStatus() {
- return tableProcessStore.getStatus();
+ public String getExecutionEngine() {
+ return executeEngine.name();
}
- protected abstract void closeInternal();
-
- @Override
- public SimpleFuture getSubmitFuture() {
- return submitFuture.or(completeFuture);
+ public TableRuntime getTableRuntime() {
+ return tableRuntime;
}
- @Override
- public SimpleFuture getCompleteFuture() {
- return completeFuture;
+ public String getProcessStage() {
+ return "default";
}
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
index 5799166c5..bb242b094 100644
--- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
+++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
@@ -18,6 +18,7 @@
package org.apache.amoro.table;
+import org.apache.amoro.AmoroTable;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.process.ActionCoordinator;
@@ -26,6 +27,7 @@ import org.apache.amoro.process.ProcessFactory;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
/** Table runtime factory. */
public interface TableRuntimeFactory {
@@ -34,6 +36,8 @@ public interface TableRuntimeFactory {
void initialize(List<ProcessFactory> factories);
+ void withTableLoader(Function<ServerTableIdentifier, AmoroTable<?>> loader);
+
Optional<TableRuntimeCreator> accept(
ServerTableIdentifier tableIdentifier, Map<String, String>
tableProperties);