This is an automated email from the ASF dual-hosted git repository.

baiyangtx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 55fb785c9 [Improvement]: Extract TableProcessStore from AmoroProcess interface. (#4116)
55fb785c9 is described below

commit 55fb785c94c4ff95d06b477bb2194a91c6171c25
Author: baiyangtx <[email protected]>
AuthorDate: Thu Mar 12 21:40:45 2026 +0800

    [Improvement]: Extract TableProcessStore from AmoroProcess interface. (#4116)
    
    * refactor(process): extract process framework changes
    
    * CI
    
    * Update amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
    
    Co-authored-by: Xu Bai <[email protected]>
    
    * Update amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
    
    Co-authored-by: Xu Bai <[email protected]>
    
    * Fix comments
    
    ---------
    
    Co-authored-by: zhangyongxiang.alpha <[email protected]>
    Co-authored-by: Aime <[email protected]>
    Co-authored-by: ZhouJinsong <[email protected]>
    Co-authored-by: Xu Bai <[email protected]>
---
 .../apache/amoro/server/AmoroServiceContainer.java |   3 +
 .../server/process/ActionCoordinatorScheduler.java |  10 +-
 .../server/process/DefaultTableProcessStore.java   |   1 +
 .../amoro/server/process/ProcessService.java       | 241 ++++++++++-----------
 .../amoro/server/process/TableProcessMeta.java     |  21 ++
 .../process/executor/TableProcessExecutor.java     | 125 +++++------
 .../server/scheduler/PeriodicTableScheduler.java   |   2 +
 .../amoro/server/table/DefaultTableRuntime.java    |  11 +-
 .../server/table/DefaultTableRuntimeFactory.java   |  11 +-
 .../amoro/server/table/DefaultTableService.java    |   1 +
 .../amoro/server/TestDefaultProcessService.java    |  68 +++---
 .../server/process/MockActionCoordinator.java      |  25 +--
 .../amoro/server/process/MockExecuteEngine.java    |   7 +-
 .../amoro/server/process/MockTableProcess.java     |  55 +++--
 .../inline/TestPeriodicTableSchedulerCleanup.java  |   2 +-
 .../inline/TestTableRuntimeRefreshExecutor.java    |   2 +-
 .../amoro/server/table/TestTableManagerQuery.java  |   2 +-
 .../main/java/org/apache/amoro/TableRuntime.java   |   7 +
 .../org/apache/amoro/process/AmoroProcess.java     |  47 +---
 .../java/org/apache/amoro/process}/EngineType.java |   2 +-
 .../org/apache/amoro/process}/ExecuteEngine.java   |   4 +-
 .../org/apache/amoro/process/ProcessFactory.java   |   8 +
 .../org/apache/amoro/process/TableProcess.java     |  52 ++---
 .../apache/amoro/table/TableRuntimeFactory.java    |   4 +
 24 files changed, 363 insertions(+), 348 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 19bf7be98..25de6a7af 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -236,6 +236,7 @@ public class AmoroServiceContainer {
   }
 
   public void startOptimizingService() throws Exception {
+
     // Load process factories and build action coordinators from default table 
runtime factory.
     TableProcessFactoryManager tableProcessFactoryManager = new 
TableProcessFactoryManager();
     tableProcessFactoryManager.initialize();
@@ -246,6 +247,8 @@ public class AmoroServiceContainer {
 
     List<ActionCoordinator> actionCoordinators = 
defaultRuntimeFactory.supportedCoordinators();
     ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
+    processFactories.forEach(
+        c -> 
c.availableExecuteEngines(executeEngineManager.installedPlugins()));
 
     tableService = new DefaultTableService(serviceConfig, catalogManager, 
defaultRuntimeFactory);
     processService = new ProcessService(tableService, actionCoordinators, 
executeEngineManager);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
index daf586941..4e9f79a35 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java
@@ -108,16 +108,10 @@ public class ActionCoordinatorScheduler extends 
PeriodicTableScheduler {
    * @param tableRuntime table runtime
    * @param processStore process store
    */
-  protected void recover(TableRuntime tableRuntime, TableProcessStore 
processStore) {
-    TableProcess process = coordinator.recoverTableProcess(tableRuntime, 
processStore);
-    processService.recover(tableRuntime, process);
+  protected TableProcess recover(TableRuntime tableRuntime, TableProcessStore 
processStore) {
+    return coordinator.recoverTableProcess(tableRuntime, processStore);
   }
 
-  /**
-   * Get executor delay from coordinator.
-   *
-   * @return delay in milliseconds
-   */
   @Override
   protected long getExecutorDelay() {
     return coordinator.getExecutorDelay();
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/DefaultTableProcessStore.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/DefaultTableProcessStore.java
index 8a3dc6211..a57d1db21 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/DefaultTableProcessStore.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/DefaultTableProcessStore.java
@@ -63,6 +63,7 @@ public class DefaultTableProcessStore extends PersistentBase 
implements TablePro
    * @param action action type
    */
   public DefaultTableProcessStore(TableRuntime tableRuntime, TableProcessMeta 
meta, Action action) {
+    this.processId = meta.getProcessId();
     this.meta = meta;
     this.tableRuntime = tableRuntime;
     this.action = action;
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
index 79881c61b..01944e8a2 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
@@ -25,15 +25,15 @@ import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ExecuteEngine;
 import org.apache.amoro.process.ProcessEvent;
 import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
 import org.apache.amoro.server.manager.AbstractPluginManager;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
-import org.apache.amoro.server.process.executor.EngineType;
-import org.apache.amoro.server.process.executor.ExecuteEngine;
 import org.apache.amoro.server.process.executor.TableProcessExecutor;
 import org.apache.amoro.server.table.RuntimeHandlerChain;
 import org.apache.amoro.server.table.TableService;
@@ -61,7 +61,7 @@ public class ProcessService extends PersistentBase {
 
   private final Map<String, ActionCoordinatorScheduler> actionCoordinators =
       new ConcurrentHashMap<>();
-  private final Map<EngineType, ExecuteEngine> executeEngines = new 
ConcurrentHashMap<>();
+  private final Map<String, ExecuteEngine> executeEngines = new 
ConcurrentHashMap<>();
 
   private final ExecuteEngineManager executeEngineManager;
   private final List<ActionCoordinator> actionCoordinatorList;
@@ -69,7 +69,7 @@ public class ProcessService extends PersistentBase {
   private final ThreadPoolExecutor processExecutionPool =
       new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new 
LinkedBlockingQueue<>());
 
-  private final Map<ServerTableIdentifier, Map<Long, TableProcess>> 
activeTableProcess =
+  private final Map<ServerTableIdentifier, Map<Long, TableProcessHolder>> 
activeTableProcess =
       new ConcurrentHashMap<>();
 
   public ProcessService(TableService tableService) {
@@ -108,40 +108,9 @@ public class ProcessService extends PersistentBase {
           process.getAction());
       return;
     }
-    persistTableProcess(process);
-    trackTableProcess(tableRuntime.getTableIdentifier(), process);
-    executeOrTraceProcess(process);
-  }
-
-  /**
-   * Recover a table process for a given table runtime.
-   *
-   * @param tableRuntime table runtime
-   * @param process table process to recover
-   */
-  public void recover(TableRuntime tableRuntime, TableProcess process) {
-    // TODO: init some status
-    trackTableProcess(tableRuntime.getTableIdentifier(), process);
-    executeOrTraceProcess(process);
-  }
-
-  /**
-   * Retry a failed table process.
-   *
-   * @param process process to retry
-   */
-  public void retry(TableProcess process) {
-    executeOrTraceProcess(process);
-  }
-
-  /**
-   * Cancel a table process and release related resources.
-   *
-   * @param process process to cancel
-   */
-  public void cancel(TableProcess process) {
-    // TODO: init some status
-    cancelProcess(process);
+    TableProcessStore store = persistTableProcess(process);
+    trackTableProcess(tableRuntime.getTableIdentifier(), store, process);
+    executeOrTraceProcess(store, process);
   }
 
   /** Dispose the service, shutdown engines and clear active processes. */
@@ -164,7 +133,7 @@ public class ProcessService extends PersistentBase {
         .installedPlugins()
         .forEach(
             executeEngine -> {
-              executeEngines.put(executeEngine.engineType(), executeEngine);
+              executeEngines.put(executeEngine.name(), executeEngine);
             });
     recoverProcesses(tableRuntimes);
     actionCoordinators.values().forEach(s -> s.initialize(tableRuntimes));
@@ -188,14 +157,16 @@ public class ProcessService extends PersistentBase {
           ActionCoordinatorScheduler scheduler =
               actionCoordinators.get(processMeta.getProcessType());
           if (tableRuntime != null && scheduler != null) {
-            scheduler.recover(
-                tableRuntime,
+            DefaultTableProcessStore store =
                 new DefaultTableProcessStore(
                     processMeta.getProcessId(),
                     tableRuntime,
                     processMeta,
                     scheduler.getAction(),
-                    scheduler.PROCESS_MAX_RETRY_NUMBER));
+                    processMeta.getRetryNumber());
+            TableProcess process = scheduler.recover(tableRuntime, store);
+            trackTableProcess(tableRuntime.getTableIdentifier(), store, 
process);
+            executeOrTraceProcess(store, process);
           }
         });
   }
@@ -205,41 +176,45 @@ public class ProcessService extends PersistentBase {
    *
    * @param process table process
    */
-  private void executeOrTraceProcess(TableProcess process) {
-
-    if (!isExecutable(process)) {
+  private void executeOrTraceProcess(TableProcessStore store, TableProcess 
process) {
+    if (!isExecutable(store)) {
       LOG.info(
           "Table process {} with identifier {} may have been in canceling or 
canceled, cancel execute process.",
-          process.getId(),
-          process.getExternalProcessIdentifier());
+          store.getProcessId(),
+          store.getExternalProcessIdentifier());
       return;
     }
 
-    ExecuteEngine executeEngine =
-        
executeEngines.get(EngineType.of(process.store().getExecutionEngine()));
+    ExecuteEngine executeEngine = 
executeEngines.get(store.getExecutionEngine());
+    if (executeEngine == null) {
+      LOG.error(
+          "Can't found execution engine:{} for process:{}, table:{}",
+          store.getExecutionEngine(),
+          store.getAction(),
+          process.getTableIdentifier());
+      return;
+    }
 
-    TableProcessExecutor executor = new TableProcessExecutor(process, 
executeEngine);
+    TableProcessExecutor executor = new TableProcessExecutor(process, store, 
executeEngine);
     executor.onProcessFinished(
         () -> {
           ActionCoordinatorScheduler scheduler =
-              actionCoordinators.get(process.store().getAction().getName());
+              actionCoordinators.get(store.getAction().getName());
           if (scheduler != null
-              && process.getStatus() == ProcessStatus.FAILED
-              && process.store().getRetryNumber() < 
scheduler.PROCESS_MAX_RETRY_NUMBER
+              && store.getStatus() == ProcessStatus.FAILED
+              && store.getRetryNumber() < 
ActionCoordinatorScheduler.PROCESS_MAX_RETRY_NUMBER
               && process.getTableRuntime() != null) {
-            process
-                .store()
-                .tryTransitState(
-                    ProcessStatus.PENDING,
-                    ProcessEvent.RETRY_REQUESTED,
-                    process.getExternalProcessIdentifier(),
-                    "Regular Retry.",
-                    process.getProcessParameters(),
-                    process.getSummary());
-            executeOrTraceProcess(process);
+            store.tryTransitState(
+                ProcessStatus.PENDING,
+                ProcessEvent.RETRY_REQUESTED,
+                store.getExternalProcessIdentifier(),
+                "Regular Retry.",
+                process.getProcessParameters(),
+                process.getSummary());
+            executeOrTraceProcess(store, process);
           } else {
             untrackTableProcessInstance(
-                process.getTableRuntime().getTableIdentifier(), 
process.getId());
+                process.getTableRuntime().getTableIdentifier(), 
store.getProcessId());
           }
         });
 
@@ -249,7 +224,7 @@ public class ProcessService extends PersistentBase {
         "Submit table process {} to engine {}, process id:{}",
         process,
         executeEngine.engineType(),
-        process.getId());
+        store.getProcessId());
   }
 
   /**
@@ -257,29 +232,28 @@ public class ProcessService extends PersistentBase {
    *
    * @param process table process
    */
-  private void cancelProcess(TableProcess process) {
-
-    process
-        .store()
-        .tryTransitState(
-            ProcessStatus.CANCELED,
-            ProcessEvent.CANCEL_REQUESTED,
-            process.getExternalProcessIdentifier(),
-            "Gracefully Canceled.",
-            process.getProcessParameters(),
-            process.getSummary());
-    
untrackTableProcessInstance(process.getTableRuntime().getTableIdentifier(), 
process.getId());
-
-    ExecuteEngine executeEngine =
-        
executeEngines.get(EngineType.of(process.store().getExecutionEngine()));
-
-    executeEngine.tryCancelTableProcess(process, 
process.getExternalProcessIdentifier());
-
-    LOG.info(
-        "Cancel table process {} in engine {}, process id:{}",
-        process,
-        executeEngine.engineType(),
-        process.getId());
+  private void cancelProcess(TableProcessStore store, TableProcess process) {
+
+    store.tryTransitState(
+        ProcessStatus.CANCELED,
+        ProcessEvent.CANCEL_REQUESTED,
+        store.getExternalProcessIdentifier(),
+        "Gracefully Canceled.",
+        process.getProcessParameters(),
+        process.getSummary());
+    untrackTableProcessInstance(
+        process.getTableRuntime().getTableIdentifier(), store.getProcessId());
+
+    ExecuteEngine executeEngine = 
executeEngines.get(store.getExecutionEngine());
+
+    if (executeEngine != null) {
+      executeEngine.tryCancelTableProcess(process, 
store.getExternalProcessIdentifier());
+      LOG.info(
+          "Cancel table process {} in engine {}, process id:{}",
+          process,
+          executeEngine.name(),
+          store.getProcessId());
+    }
   }
 
   /**
@@ -288,7 +262,7 @@ public class ProcessService extends PersistentBase {
    * @param process table process
    * @return true if executable
    */
-  private boolean isExecutable(TableProcess process) {
+  private boolean isExecutable(TableProcessStore process) {
     if (process.getStatus() == ProcessStatus.CANCELING
         || process.getStatus() == ProcessStatus.CANCELED) {
       return false;
@@ -305,19 +279,24 @@ public class ProcessService extends PersistentBase {
    * @return true if exists
    */
   private boolean hasAliveTableProcess(TableRuntime tableRuntime, Action 
action) {
-    List<TableProcess> processes =
+    List<TableProcessHolder> processes =
         
getTableProcessInstances(tableRuntime.getTableIdentifier()).values().stream()
             .filter(
                 tableProcess ->
-                    
tableProcess.getAction().getName().equalsIgnoreCase(action.getName()))
+                    tableProcess
+                        .getStore()
+                        .getAction()
+                        .getName()
+                        .equalsIgnoreCase(action.getName()))
             .collect(Collectors.toList());
+
     return processes.stream()
         .anyMatch(
             process -> {
               return (process != null
-                  && (process.getStatus() == ProcessStatus.RUNNING
-                      || process.getStatus() == ProcessStatus.SUBMITTED
-                      || process.getStatus() == ProcessStatus.PENDING));
+                  && (process.getStore().getStatus() == ProcessStatus.RUNNING
+                      || process.getStore().getStatus() == 
ProcessStatus.SUBMITTED
+                      || process.getStore().getStatus() == 
ProcessStatus.PENDING));
             });
   }
 
@@ -327,8 +306,8 @@ public class ProcessService extends PersistentBase {
    * @param process table process
    * @return metadata snapshot
    */
-  public TableProcessMeta persistTableProcess(TableProcess process) {
-    TableProcessMeta processMeta = 
TableProcessMeta.fromTableProcessStore(process.store());
+  protected DefaultTableProcessStore persistTableProcess(TableProcess process) 
{
+    TableProcessMeta processMeta = TableProcessMeta.createProcessMeta(process);
     doAs(
         TableProcessMapper.class,
         mapper ->
@@ -344,7 +323,12 @@ public class ProcessService extends PersistentBase {
                 processMeta.getCreateTime(),
                 processMeta.getProcessParameters(),
                 processMeta.getSummary()));
-    return processMeta;
+    return new DefaultTableProcessStore(
+        processMeta.getProcessId(),
+        process.getTableRuntime(),
+        processMeta,
+        process.getAction(),
+        processMeta.getRetryNumber());
   }
 
   /**
@@ -363,7 +347,7 @@ public class ProcessService extends PersistentBase {
    * @return engines map
    */
   @VisibleForTesting
-  public Map<EngineType, ExecuteEngine> getExecuteEngines() {
+  public Map<String, ExecuteEngine> getExecuteEngines() {
     return executeEngines;
   }
 
@@ -373,24 +357,10 @@ public class ProcessService extends PersistentBase {
    * @return active process map
    */
   @VisibleForTesting
-  public Map<ServerTableIdentifier, Map<Long, TableProcess>> 
getActiveTableProcess() {
+  public Map<ServerTableIdentifier, Map<Long, TableProcessHolder>> 
getActiveTableProcess() {
     return activeTableProcess;
   }
 
-  /**
-   * Get a table process instance by table identifier and process id.
-   *
-   * @param serverTableIdentifier table identifier
-   * @param processId process id
-   * @return table process instance or null
-   */
-  @VisibleForTesting
-  public TableProcess getTableProcessInstance(
-      ServerTableIdentifier serverTableIdentifier, long processId) {
-    Map<Long, TableProcess> inner = 
activeTableProcess.get(serverTableIdentifier);
-    return inner != null ? inner.get(processId) : null;
-  }
-
   /**
    * Get all table process instances for the given table identifier.
    *
@@ -398,9 +368,9 @@ public class ProcessService extends PersistentBase {
    * @return unmodifiable map of processes
    */
   @VisibleForTesting
-  public Map<Long, TableProcess> getTableProcessInstances(
+  public Map<Long, TableProcessHolder> getTableProcessInstances(
       ServerTableIdentifier serverTableIdentifier) {
-    Map<Long, TableProcess> inner = 
activeTableProcess.get(serverTableIdentifier);
+    Map<Long, TableProcessHolder> inner = 
activeTableProcess.get(serverTableIdentifier);
     if (inner == null || inner.isEmpty()) {
       return Collections.emptyMap();
     }
@@ -414,10 +384,12 @@ public class ProcessService extends PersistentBase {
    * @param tableProcess process instance
    */
   private void trackTableProcess(
-      ServerTableIdentifier serverTableIdentifier, TableProcess tableProcess) {
+      ServerTableIdentifier serverTableIdentifier,
+      TableProcessStore store,
+      TableProcess tableProcess) {
     activeTableProcess
         .computeIfAbsent(serverTableIdentifier, key -> new 
ConcurrentHashMap<>())
-        .put(tableProcess.getId(), tableProcess);
+        .put(store.getProcessId(), new TableProcessHolder(tableProcess, 
store));
   }
 
   /**
@@ -430,15 +402,15 @@ public class ProcessService extends PersistentBase {
   @VisibleForTesting
   public TableProcess untrackTableProcessInstance(
       ServerTableIdentifier serverTableIdentifier, long processId) {
-    Map<Long, TableProcess> inner = 
activeTableProcess.get(serverTableIdentifier);
+    Map<Long, TableProcessHolder> inner = 
activeTableProcess.get(serverTableIdentifier);
     if (inner == null) {
       return null;
     }
-    TableProcess removed = inner.remove(processId);
+    TableProcessHolder removed = inner.remove(processId);
     if (inner.isEmpty()) {
       activeTableProcess.remove(serverTableIdentifier, inner);
     }
-    return removed;
+    return removed != null ? removed.getProcess() : null;
   }
 
   @VisibleForTesting
@@ -455,7 +427,7 @@ public class ProcessService extends PersistentBase {
 
   @VisibleForTesting
   public void installExecuteEngine(ExecuteEngine executeEngine) {
-    this.executeEngines.put(executeEngine.engineType(), executeEngine);
+    this.executeEngines.put(executeEngine.name(), executeEngine);
   }
 
   @VisibleForTesting
@@ -519,12 +491,12 @@ public class ProcessService extends PersistentBase {
     @Override
     protected void handleTableRemoved(TableRuntime tableRuntime) {
       actionCoordinators.values().forEach(s -> 
s.handleTableRemoved(tableRuntime));
-      List<TableProcess> processes =
+      List<TableProcessHolder> processes =
           
getTableProcessInstances(tableRuntime.getTableIdentifier()).values().stream()
               .collect(Collectors.toList());
-      for (TableProcess process : processes) {
-        if (process != null) {
-          cancel(process);
+      for (TableProcessHolder holder : processes) {
+        if (holder != null) {
+          cancelProcess(holder.getStore(), holder.getProcess());
         }
       }
     }
@@ -546,6 +518,25 @@ public class ProcessService extends PersistentBase {
     }
   }
 
+  @VisibleForTesting
+  public static class TableProcessHolder {
+    private final TableProcess process;
+    private final TableProcessStore store;
+
+    public TableProcessHolder(TableProcess process, TableProcessStore store) {
+      this.process = process;
+      this.store = store;
+    }
+
+    public TableProcess getProcess() {
+      return process;
+    }
+
+    public TableProcessStore getStore() {
+      return store;
+    }
+  }
+
   /** Manager for {@link ExecuteEngine} plugins. */
   public static class ExecuteEngineManager extends 
AbstractPluginManager<ExecuteEngine> {
     public ExecuteEngineManager() {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
index 6890c25c1..09f257b0f 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessMeta.java
@@ -19,12 +19,15 @@
 package org.apache.amoro.server.process;
 
 import org.apache.amoro.process.ProcessStatus;
+import org.apache.amoro.process.TableProcess;
 import org.apache.amoro.process.TableProcessStore;
+import org.apache.amoro.server.utils.SnowflakeIdGenerator;
 
 import java.util.HashMap;
 import java.util.Map;
 
 public class TableProcessMeta {
+  private static final SnowflakeIdGenerator idGenerator = new 
SnowflakeIdGenerator();
   private long processId;
   private long tableId;
   private volatile String externalProcessIdentifier;
@@ -188,6 +191,24 @@ public class TableProcessMeta {
     return tableProcessMeta;
   }
 
+  public static TableProcessMeta createProcessMeta(TableProcess process) {
+    TableProcessMeta tableProcessMeta = new TableProcessMeta();
+    tableProcessMeta.setProcessId(idGenerator.generateId());
+    tableProcessMeta.setTableId(process.getTableIdentifier().getId());
+    tableProcessMeta.setExternalProcessIdentifier("");
+    tableProcessMeta.setStatus(ProcessStatus.PENDING);
+    tableProcessMeta.setProcessType(process.getAction().getName());
+    tableProcessMeta.setProcessStage(process.getProcessStage());
+    tableProcessMeta.setExecutionEngine(process.getExecutionEngine());
+    tableProcessMeta.setRetryNumber(0);
+    tableProcessMeta.setCreateTime(System.currentTimeMillis());
+    tableProcessMeta.setFinishTime(-1);
+    tableProcessMeta.setFailMessage("");
+    tableProcessMeta.setProcessParameters(process.getProcessParameters());
+    tableProcessMeta.setSummary(process.getSummary());
+    return tableProcessMeta;
+  }
+
   public static TableProcessMeta of(
       long processId,
       long tableId,
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/TableProcessExecutor.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/TableProcessExecutor.java
index 138ee1a3d..83cd66a3f 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/TableProcessExecutor.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/TableProcessExecutor.java
@@ -18,9 +18,11 @@
 
 package org.apache.amoro.server.process.executor;
 
+import org.apache.amoro.process.ExecuteEngine;
 import org.apache.amoro.process.ProcessEvent;
 import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.shade.guava32.com.google.common.base.Strings;
 import org.slf4j.Logger;
@@ -36,6 +38,7 @@ public class TableProcessExecutor extends PersistentBase 
implements Runnable {
   private static final long DEFAULT_POLL_INTERVAL_MS = 5000L;
   public ExecuteEngine executeEngine;
   protected TableProcess tableProcess;
+  private final TableProcessStore store;
   private Runnable finishedCallback;
 
   /**
@@ -44,9 +47,11 @@ public class TableProcessExecutor extends PersistentBase 
implements Runnable {
    * @param tableProcess table process
    * @param executeEngine execute engine
    */
-  public TableProcessExecutor(TableProcess tableProcess, ExecuteEngine 
executeEngine) {
+  public TableProcessExecutor(
+      TableProcess tableProcess, TableProcessStore store, ExecuteEngine 
executeEngine) {
     this.tableProcess = tableProcess;
     this.executeEngine = executeEngine;
+    this.store = store;
   }
 
   /** Submit or recover the process to engine, poll status and update store. */
@@ -56,46 +61,44 @@ public class TableProcessExecutor extends PersistentBase 
implements Runnable {
     ProcessStatus status;
     String message = "";
 
-    if (isTableProcessCanceling(tableProcess.getStatus())) {
+    if (isTableProcessCanceling(store.getStatus())) {
       LOG.info(
           "Table process {} with identifier {} may have been in canceling, 
exit submit process.",
-          tableProcess.getId(),
+          store.getProcessId(),
           externalProcessIdentifier);
       return;
     }
 
     try {
-      if (tableProcess.getStatus() == ProcessStatus.UNKNOWN
-          || tableProcess.getStatus() == ProcessStatus.PENDING
-          || 
Strings.isNullOrEmpty(tableProcess.getExternalProcessIdentifier())) {
+      if (store.getStatus() == ProcessStatus.UNKNOWN
+          || store.getStatus() == ProcessStatus.PENDING
+          || Strings.isNullOrEmpty(store.getExternalProcessIdentifier())) {
         externalProcessIdentifier = 
executeEngine.submitTableProcess(tableProcess);
         LOG.info(
             "Submit table process {} to engine {} success, external process 
identifier is {}",
-            tableProcess.getId(),
+            store.getProcessId(),
             executeEngine.engineType(),
             externalProcessIdentifier);
       } else {
-        externalProcessIdentifier = 
tableProcess.getExternalProcessIdentifier();
+        externalProcessIdentifier = store.getExternalProcessIdentifier();
       }
 
       validateIdentifier(externalProcessIdentifier);
 
       status = executeEngine.getStatus(externalProcessIdentifier);
-      tableProcess
-          .store()
-          .tryTransitState(
-              status,
-              ProcessEvent.SUBMIT_REQUESTED,
-              externalProcessIdentifier,
-              "Complete Submitted.",
-              tableProcess.getProcessParameters(),
-              tableProcess.getSummary());
+      store.tryTransitState(
+          status,
+          ProcessEvent.SUBMIT_REQUESTED,
+          externalProcessIdentifier,
+          "Complete Submitted.",
+          tableProcess.getProcessParameters(),
+          tableProcess.getSummary());
 
       while (isTableProcessExecuting(status)) {
-        if (isTableProcessCanceling(tableProcess.getStatus())) {
+        if (isTableProcessCanceling(store.getStatus())) {
           LOG.info(
               "Table process {} with identifier {} may have been in canceling, 
exit submit process.",
-              tableProcess.getId(),
+              store.getProcessId(),
               externalProcessIdentifier);
           return;
         }
@@ -110,7 +113,7 @@ public class TableProcessExecutor extends PersistentBase 
implements Runnable {
       if (t instanceof InterruptedException) {
         LOG.info(
             "Table process {} with identifier {} may have been interrupted by 
process service disposing, exit submit process.",
-            tableProcess.getId(),
+            store.getProcessId(),
             externalProcessIdentifier);
         return;
       } else {
@@ -121,57 +124,47 @@ public class TableProcessExecutor extends PersistentBase 
implements Runnable {
 
     LOG.info("The process {} is finished with status {}", tableProcess, 
status);
     if (status == ProcessStatus.KILLED) {
-      tableProcess
-          .store()
-          .tryTransitState(
-              status,
-              ProcessEvent.KILL_REQUESTED,
-              tableProcess.getExternalProcessIdentifier(),
-              "Gracefully Killed.",
-              tableProcess.getProcessParameters(),
-              tableProcess.getSummary());
+      store.tryTransitState(
+          status,
+          ProcessEvent.KILL_REQUESTED,
+          store.getExternalProcessIdentifier(),
+          "Gracefully Killed.",
+          tableProcess.getProcessParameters(),
+          tableProcess.getSummary());
     } else if (status == ProcessStatus.CANCELED) {
-      tableProcess
-          .store()
-          .tryTransitState(
-              status,
-              ProcessEvent.CANCEL_REQUESTED,
-              tableProcess.getExternalProcessIdentifier(),
-              "Gracefully Cancelled.",
-              tableProcess.getProcessParameters(),
-              tableProcess.getSummary());
+      store.tryTransitState(
+          status,
+          ProcessEvent.CANCEL_REQUESTED,
+          store.getExternalProcessIdentifier(),
+          "Gracefully Cancelled.",
+          tableProcess.getProcessParameters(),
+          tableProcess.getSummary());
     } else if (status == ProcessStatus.CLOSED) {
-      tableProcess
-          .store()
-          .tryTransitState(
-              status,
-              ProcessEvent.KILL_REQUESTED,
-              tableProcess.getExternalProcessIdentifier(),
-              "Gracefully Closed.",
-              tableProcess.getProcessParameters(),
-              tableProcess.getSummary());
+      store.tryTransitState(
+          status,
+          ProcessEvent.KILL_REQUESTED,
+          store.getExternalProcessIdentifier(),
+          "Gracefully Closed.",
+          tableProcess.getProcessParameters(),
+          tableProcess.getSummary());
     } else if (status == ProcessStatus.FAILED) {
-      tableProcess
-          .store()
-          .tryTransitState(
-              status,
-              ProcessEvent.COMPLETE_FAILED,
-              tableProcess.getExternalProcessIdentifier(),
-              message,
-              tableProcess.getProcessParameters(),
-              tableProcess.getSummary());
+      store.tryTransitState(
+          status,
+          ProcessEvent.COMPLETE_FAILED,
+          store.getExternalProcessIdentifier(),
+          message,
+          tableProcess.getProcessParameters(),
+          tableProcess.getSummary());
     } else if (status == ProcessStatus.SUCCESS) {
-      tableProcess
-          .store()
-          .tryTransitState(
-              status,
-              ProcessEvent.COMPLETE_SUCCESS,
-              tableProcess.getExternalProcessIdentifier(),
-              "Complete Success",
-              tableProcess.getProcessParameters(),
-              tableProcess.getSummary());
+      store.tryTransitState(
+          status,
+          ProcessEvent.COMPLETE_SUCCESS,
+          store.getExternalProcessIdentifier(),
+          "Complete Success",
+          tableProcess.getProcessParameters(),
+          tableProcess.getSummary());
     } else {
-      LOG.warn("Un expected terminal status: {} for process: {}.", status, 
tableProcess.getId());
+      LOG.warn("Un expected terminal status: {} for process: {}.", status, 
store.getProcessId());
     }
 
     if (finishedCallback != null) {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
index ef7419325..1f19187e9 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java
@@ -130,6 +130,8 @@ public abstract class PeriodicTableScheduler extends 
RuntimeHandlerChain {
         // so you need to perform the update operation separately for each 
table.
         persistUpdatingCleanupTime(tableRuntime);
       }
+    } catch (Exception e) {
+      logger.error("exception when schedule for table: {}", 
tableRuntime.getTableIdentifier(), e);
     } finally {
       scheduledTables.remove(tableRuntime.getTableIdentifier());
       scheduleIfNecessary(tableRuntime, getNextExecutingTime(tableRuntime));
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index 002e7b0f2..ee23ab4d3 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -57,6 +57,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Supplier;
 
 /** Default table runtime implementation. */
 public class DefaultTableRuntime extends AbstractTableRuntime {
@@ -93,13 +94,16 @@ public class DefaultTableRuntime extends 
AbstractTableRuntime {
   private volatile OptimizingProcess optimizingProcess;
   private final List<TaskRuntime.TaskQuota> taskQuotas = new 
CopyOnWriteArrayList<>();
 
-  public DefaultTableRuntime(TableRuntimeStore store) {
+  private final Supplier<AmoroTable<?>> loader;
+
+  public DefaultTableRuntime(TableRuntimeStore store, Supplier<AmoroTable<?>> 
loader) {
     super(store);
     this.optimizingMetrics =
         new TableOptimizingMetrics(store.getTableIdentifier(), 
store.getGroupName());
     this.orphanFilesCleaningMetrics =
         new TableOrphanFilesCleaningMetrics(store.getTableIdentifier());
     this.tableSummaryMetrics = new 
TableSummaryMetrics(store.getTableIdentifier());
+    this.loader = loader;
   }
 
   public void recover(OptimizingProcess optimizingProcess) {
@@ -469,6 +473,11 @@ public class DefaultTableRuntime extends 
AbstractTableRuntime {
     super.dispose();
   }
 
+  @Override
+  public AmoroTable<?> loadTable() {
+    return loader.get();
+  }
+
   /**
    * Check if operation are blocked now.
    *
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
index 83050799a..c155f8d02 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java
@@ -19,6 +19,7 @@
 package org.apache.amoro.server.table;
 
 import org.apache.amoro.Action;
+import org.apache.amoro.AmoroTable;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
@@ -36,6 +37,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 
 /**
  * Default {@link TableRuntimeFactory} implementation used by AMS.
@@ -52,6 +54,8 @@ public class DefaultTableRuntimeFactory implements 
TableRuntimeFactory {
   /** Coordinators derived from all installed process factories. */
   private final List<ActionCoordinator> supportedCoordinators = 
Lists.newArrayList();
 
+  private Function<ServerTableIdentifier, AmoroTable<?>> loader;
+
   @Override
   public List<ActionCoordinator> supportedCoordinators() {
     return Collections.unmodifiableList(supportedCoordinators);
@@ -88,6 +92,11 @@ public class DefaultTableRuntimeFactory implements 
TableRuntimeFactory {
     }
   }
 
+  @Override
+  public void withTableLoader(Function<ServerTableIdentifier, AmoroTable<?>> 
loader) {
+    this.loader = loader;
+  }
+
   @Override
   public Optional<TableRuntimeCreator> accept(
       ServerTableIdentifier tableIdentifier, Map<String, String> 
tableProperties) {
@@ -149,7 +158,7 @@ public class DefaultTableRuntimeFactory implements 
TableRuntimeFactory {
 
     @Override
     public TableRuntime create(TableRuntimeStore store) {
-      return new DefaultTableRuntime(store);
+      return new DefaultTableRuntime(store, () -> 
loader.apply(store.getTableIdentifier()));
     }
   }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index a247b1804..fa0ee873c 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -99,6 +99,7 @@ public class DefaultTableService extends PersistentBase 
implements TableService
         
configuration.get(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL).toMillis();
     this.serverConfiguration = configuration;
     this.tableRuntimeFactory = tableRuntimeFactory;
+    this.tableRuntimeFactory.withTableLoader(this::loadTable);
   }
 
   @Override
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
index fe76a2242..5b719aa73 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultProcessService.java
@@ -25,9 +25,10 @@ import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.CatalogTestHelper;
 import org.apache.amoro.process.ProcessStatus;
-import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
 import org.apache.amoro.server.process.MockActionCoordinator;
 import org.apache.amoro.server.process.MockExecuteEngine;
+import org.apache.amoro.server.process.ProcessService;
 import org.apache.amoro.server.table.AMSTableTestBase;
 import org.junit.After;
 import org.junit.Assert;
@@ -74,11 +75,11 @@ public class TestDefaultProcessService extends 
AMSTableTestBase {
   public void prepare() {
     createDatabase();
 
-    MockActionCoordinator mockActionCoordinator = new MockActionCoordinator();
-    processServiceService().installActionCoordinator(mockActionCoordinator);
-
     MockExecuteEngine mockExecuteEngine = new MockExecuteEngine();
     processServiceService().installExecuteEngine(mockExecuteEngine);
+
+    MockActionCoordinator mockActionCoordinator = new 
MockActionCoordinator(mockExecuteEngine);
+    processServiceService().installActionCoordinator(mockActionCoordinator);
   }
 
   /** Clear resources after tests. */
@@ -111,15 +112,15 @@ public class TestDefaultProcessService extends 
AMSTableTestBase {
       awaitActiveInstances(executeEngine);
 
       // Get the current active TableProcess
-      TableProcess tableProcess = getAnyActiveTableProcess();
+      TableProcessStore store = getAnyActiveTableProcess();
 
       // Wait again for active instances to preserve the original semantics
       awaitActiveInstances(executeEngine);
 
       // Assert status and engine instance
-      Assert.assertEquals(ProcessStatus.RUNNING, tableProcess.getStatus());
+      Assert.assertEquals(ProcessStatus.RUNNING, store.getStatus());
       Future<?> future =
-          
executeEngine.getActiveInstances().get(tableProcess.getExternalProcessIdentifier());
+          
executeEngine.getActiveInstances().get(store.getExternalProcessIdentifier());
       Assert.assertNotNull(future);
       Assert.assertFalse(future.isDone());
       dropTable();
@@ -136,18 +137,19 @@ public class TestDefaultProcessService extends 
AMSTableTestBase {
     try {
       awaitActiveInstances(executeEngine);
 
-      TableProcess tableProcess = getAnyActiveTableProcess();
+      ProcessService.TableProcessHolder holder = 
getAnyActiveTableProcessHolder();
+      ServerTableIdentifier tableIdentifier =
+          holder.getProcess().getTableRuntime().getTableIdentifier();
+      TableProcessStore store = holder.getStore();
       dropTable();
 
       // Wait until both active and canceling queues are empty
       awaitEngineDrained(executeEngine);
 
       Assert.assertTrue(
-          processServiceService()
-              
.getTableProcessInstances(tableProcess.getTableRuntime().getTableIdentifier())
-              .isEmpty());
+          
processServiceService().getTableProcessInstances(tableIdentifier).isEmpty());
       Assert.assertTrue(executeEngine.getActiveInstances().isEmpty());
-      Assert.assertEquals(ProcessStatus.CANCELED, tableProcess.getStatus());
+      Assert.assertEquals(ProcessStatus.CANCELED, store.getStatus());
     } catch (Throwable t) {
       if (!processServiceService().getActiveTableProcess().isEmpty()) {
         throw new IllegalStateException(
@@ -172,20 +174,18 @@ public class TestDefaultProcessService extends 
AMSTableTestBase {
 
       awaitActiveInstances(executeEngine);
 
-      TableProcess tableProcess = getAnyActiveTableProcess();
+      ProcessService.TableProcessHolder holder = 
getAnyActiveTableProcessHolder();
+      TableProcessStore store = holder.getStore();
+      org.apache.amoro.TableRuntime tableRuntime = 
holder.getProcess().getTableRuntime();
 
-      awaitEngineStatus(
-          executeEngine, tableProcess.getExternalProcessIdentifier(), 
ProcessStatus.RUNNING);
-
-      Assert.assertEquals(ProcessStatus.RUNNING, tableProcess.getStatus());
+      awaitEngineStatus(executeEngine, store.getExternalProcessIdentifier(), 
ProcessStatus.RUNNING);
+      Assert.assertEquals(ProcessStatus.RUNNING, store.getStatus());
 
       processServiceService()
-          .untrackTableProcessInstance(
-              tableProcess.getTableRuntime().getTableIdentifier(), 
tableProcess.getId());
+          .untrackTableProcessInstance(tableRuntime.getTableIdentifier(), 
store.getProcessId());
 
       processServiceService()
-          .recoverProcesses(
-              new 
ArrayList<>(Collections.singletonList(tableProcess.getTableRuntime())));
+          .recoverProcesses(new 
ArrayList<>(Collections.singletonList(tableRuntime)));
 
       // Wait for the active table process to reappear
       awaitCondition(
@@ -193,14 +193,13 @@ public class TestDefaultProcessService extends 
AMSTableTestBase {
           WAIT_TIMEOUT_MS,
           POLL_INTERVAL_MS);
 
-      tableProcess = getAnyActiveTableProcess();
-
-      awaitEngineStatus(
-          executeEngine, tableProcess.getExternalProcessIdentifier(), 
ProcessStatus.RUNNING);
+      holder = getAnyActiveTableProcessHolder();
+      store = holder.getStore();
 
-      Assert.assertEquals(ProcessStatus.RUNNING, tableProcess.getStatus());
+      awaitEngineStatus(executeEngine, store.getExternalProcessIdentifier(), 
ProcessStatus.RUNNING);
+      Assert.assertEquals(ProcessStatus.RUNNING, store.getStatus());
       Future<?> future =
-          
executeEngine.getActiveInstances().get(tableProcess.getExternalProcessIdentifier());
+          
executeEngine.getActiveInstances().get(store.getExternalProcessIdentifier());
       Assert.assertNotNull(future);
       Assert.assertFalse(future.isDone());
 
@@ -260,24 +259,29 @@ public class TestDefaultProcessService extends 
AMSTableTestBase {
         POLL_INTERVAL_MS);
   }
 
-  /** Get any active TableProcess; throw a clear error if none exists. */
-  private TableProcess getAnyActiveTableProcess() {
-    Map<ServerTableIdentifier, Map<Long, TableProcess>> active =
+  /** Get any active table process holder; throw a clear error if none exists. 
*/
+  private ProcessService.TableProcessHolder getAnyActiveTableProcessHolder() {
+    Map<ServerTableIdentifier, Map<Long, ProcessService.TableProcessHolder>> 
active =
         processServiceService().getActiveTableProcess();
     if (active == null || active.isEmpty()) {
       throw new IllegalStateException("No active table process present");
     }
-    Map<?, TableProcess> inner = 
active.values().stream().findFirst().orElse(null);
+    Map<?, ProcessService.TableProcessHolder> inner =
+        active.values().stream().findFirst().orElse(null);
     if (inner == null || inner.isEmpty()) {
       throw new IllegalStateException("No active table process present");
     }
-    TableProcess tp = inner.values().stream().findFirst().orElse(null);
+    ProcessService.TableProcessHolder tp = 
inner.values().stream().findFirst().orElse(null);
     if (tp == null) {
       throw new IllegalStateException("No active table process present");
     }
     return tp;
   }
 
+  private TableProcessStore getAnyActiveTableProcess() {
+    return getAnyActiveTableProcessHolder().getStore();
+  }
+
   /** Wait until the given externalProcessIdentifier reaches the specified 
status. */
   private void awaitEngineStatus(MockExecuteEngine engine, String externalId, 
ProcessStatus status)
       throws InterruptedException {
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
index 9d0f89b6b..d610c1805 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockActionCoordinator.java
@@ -22,11 +22,10 @@ import org.apache.amoro.Action;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ExecuteEngine;
 import org.apache.amoro.process.TableProcess;
 import org.apache.amoro.process.TableProcessStore;
-import org.apache.amoro.server.utils.SnowflakeIdGenerator;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
@@ -34,7 +33,12 @@ import java.util.Optional;
 public class MockActionCoordinator implements ActionCoordinator {
   public static final int PROCESS_MAX_POOL_SIZE = 1000;
   public static final Action DEFAULT_ACTION = 
Action.register("default_action");
-  public static final SnowflakeIdGenerator SNOWFLAKE_ID_GENERATOR = new 
SnowflakeIdGenerator();
+
+  private final ExecuteEngine executeEngine;
+
+  public MockActionCoordinator(ExecuteEngine executeEngine) {
+    this.executeEngine = executeEngine;
+  }
 
   /**
    * Whether the format is supported.
@@ -89,18 +93,7 @@ public class MockActionCoordinator implements 
ActionCoordinator {
    */
   @Override
   public Optional<TableProcess> trigger(TableRuntime tableRuntime) {
-    TableProcessMeta tableProcessMeta =
-        TableProcessMeta.of(
-            SNOWFLAKE_ID_GENERATOR.generateId(),
-            tableRuntime.getTableIdentifier().getId(),
-            action().getName(),
-            "default",
-            new HashMap<>());
-    TableProcessStore tableProcessStore =
-        new DefaultTableProcessStore(
-            tableProcessMeta.getProcessId(), tableRuntime, tableProcessMeta, 
action(), 3);
-    MockTableProcess mockTableProcess = new MockTableProcess(tableRuntime, 
tableProcessStore);
-    return Optional.of(mockTableProcess);
+    return Optional.of(new MockTableProcess(tableRuntime, executeEngine, 
action()));
   }
 
   /**
@@ -113,7 +106,7 @@ public class MockActionCoordinator implements 
ActionCoordinator {
   @Override
   public TableProcess recoverTableProcess(
       TableRuntime tableRuntime, TableProcessStore processStore) {
-    return new MockTableProcess(tableRuntime, processStore);
+    return new MockTableProcess(tableRuntime, executeEngine, action());
   }
 
   /** Open plugin. */
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockExecuteEngine.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockExecuteEngine.java
index 50fff80d9..ec28a31c6 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockExecuteEngine.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockExecuteEngine.java
@@ -18,11 +18,11 @@
 
 package org.apache.amoro.server.process;
 
+import org.apache.amoro.process.EngineType;
+import org.apache.amoro.process.ExecuteEngine;
 import org.apache.amoro.process.ProcessStatus;
 import org.apache.amoro.process.TableProcess;
 import org.apache.amoro.server.persistence.PersistentBase;
-import org.apache.amoro.server.process.executor.EngineType;
-import org.apache.amoro.server.process.executor.ExecuteEngine;
 import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.parquet.Strings;
 import org.slf4j.Logger;
@@ -48,6 +48,7 @@ public class MockExecuteEngine implements ExecuteEngine {
   private final ThreadPoolExecutor executionPool =
       new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new 
LinkedBlockingQueue<>());
 
+  public static final EngineType MOCK_TYPE = EngineType.of("mock");
   private final Map<String, Future<?>> activeInstances = new 
ConcurrentHashMap<>();
 
   private final Map<String, Future<?>> cancelingInstances = new 
ConcurrentHashMap<>();
@@ -55,7 +56,7 @@ public class MockExecuteEngine implements ExecuteEngine {
   /** Engine type of this mock engine. */
   @Override
   public EngineType engineType() {
-    return EngineType.of("default");
+    return MOCK_TYPE;
   }
 
   /**
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockTableProcess.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockTableProcess.java
index e19ca69c0..9d8712680 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/MockTableProcess.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/MockTableProcess.java
@@ -18,33 +18,52 @@
 
 package org.apache.amoro.server.process;
 
+import org.apache.amoro.Action;
 import org.apache.amoro.TableRuntime;
+import org.apache.amoro.process.ExecuteEngine;
 import org.apache.amoro.process.TableProcess;
-import org.apache.amoro.process.TableProcessStore;
+
+import java.util.Collections;
+import java.util.Map;
 
 /** Mock table process for tests. */
 public class MockTableProcess extends TableProcess {
 
-  /**
-   * Construct with runtime only.
-   *
-   * @param tableRuntime table runtime
-   */
-  MockTableProcess(TableRuntime tableRuntime) {
-    super(tableRuntime);
+  private final Action action;
+  private final Map<String, String> processParameters;
+  private final Map<String, String> summary;
+
+  MockTableProcess(TableRuntime tableRuntime, ExecuteEngine executeEngine, 
Action action) {
+    this(tableRuntime, executeEngine, action, Collections.emptyMap(), 
Collections.emptyMap());
   }
 
-  /**
-   * Construct with runtime and store.
-   *
-   * @param tableRuntime table runtime
-   * @param tableProcessStore process store
-   */
-  MockTableProcess(TableRuntime tableRuntime, TableProcessStore 
tableProcessStore) {
-    super(tableRuntime, tableProcessStore);
+  MockTableProcess(
+      TableRuntime tableRuntime,
+      ExecuteEngine executeEngine,
+      Action action,
+      Map<String, String> processParameters,
+      Map<String, String> summary) {
+    super(tableRuntime, executeEngine);
+    this.action = action;
+    this.processParameters =
+        processParameters == null
+            ? Collections.emptyMap()
+            : Collections.unmodifiableMap(processParameters);
+    this.summary = summary == null ? Collections.emptyMap() : 
Collections.unmodifiableMap(summary);
+  }
+
+  @Override
+  public Action getAction() {
+    return action;
   }
 
-  /** Close mock process. */
   @Override
-  protected void closeInternal() {}
+  public Map<String, String> getProcessParameters() {
+    return processParameters;
+  }
+
+  @Override
+  public Map<String, String> getSummary() {
+    return summary;
+  }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
index c401c88d8..de25eac21 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java
@@ -100,7 +100,7 @@ public class TestPeriodicTableSchedulerCleanup extends 
PersistentBase {
         new DefaultTableRuntimeStore(
             identifier, meta, DefaultTableRuntime.REQUIRED_STATES, 
Collections.emptyList());
 
-    return new DefaultTableRuntime(store);
+    return new DefaultTableRuntime(store, () -> null);
   }
 
   private void cleanUpTableRuntimeData(List<Long> tableIds) {
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
index e75d9e16d..88910376e 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestTableRuntimeRefreshExecutor.java
@@ -93,7 +93,7 @@ public class TestTableRuntimeRefreshExecutor extends 
AMSTableTestBase {
     private OptimizingConfig testOptimizingConfig;
 
     TestTableRuntime(TableRuntimeStore store, OptimizingConfig 
optimizingConfig) {
-      super(store);
+      super(store, () -> null);
       this.testOptimizingConfig = optimizingConfig;
     }
 
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManagerQuery.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManagerQuery.java
index d5d5ff741..5f97386be 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManagerQuery.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableManagerQuery.java
@@ -318,7 +318,7 @@ public class TestTableManagerQuery extends AMSTableTestBase 
{
       DefaultTableRuntimeStore store =
           new DefaultTableRuntimeStore(
               identifier, meta, DefaultTableRuntime.REQUIRED_STATES, 
Collections.emptyList());
-      return new DefaultTableRuntime(store);
+      return new DefaultTableRuntime(store, () -> null);
     }
   }
 }
diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java 
b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
index ae2f610cc..1d96553fe 100644
--- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
+++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
@@ -59,6 +59,13 @@ public interface TableRuntime {
    */
   ServerTableIdentifier getTableIdentifier();
 
+  /**
+   * Load the current table instance for this runtime.
+   *
+   * <p>This method is mainly intended for in-AMS processes.
+   */
+  AmoroTable<?> loadTable();
+
   /**
    * Get the table configuration. @Deprecated use {@link #getTableConfig()} 
instead.
    *
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java 
b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
index 6ff5b6cfb..be95ce3c7 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/AmoroProcess.java
@@ -47,11 +47,14 @@ public interface AmoroProcess {
   SimpleFuture getCompleteFuture();
 
   /**
-   * Get {@link TableProcessStore} of the process
+   * Get the {@link Action} of the process
    *
-   * @return the state of the process
+   * @return the action of the process
    */
-  TableProcessStore store();
+  Action getAction();
+
+  /** Get execution engine name. */
+  String getExecutionEngine();
 
   /**
    * Get the string encoded process params of the process, this could be a 
simple description or a
@@ -59,44 +62,12 @@ public interface AmoroProcess {
    *
    * @return the params of the process
    */
-  default Map<String, String> getProcessParameters() {
-    return store().getProcessParameters();
-  }
+  Map<String, String> getProcessParameters();
 
   /**
-   * Get the string encoded summary of the process, this could be a simple 
description or a POJO
-   * encoded by JSON
+   * Get the string encoded summary of the process, called when the process has 
succeeded.
    *
    * @return the summary of the process
    */
-  default Map<String, String> getSummary() {
-    return store().getSummary();
-  }
-
-  /**
-   * Get {@link ProcessStatus} of the process
-   *
-   * @return the status of the process
-   */
-  default ProcessStatus getStatus() {
-    return store().getStatus();
-  }
-
-  /**
-   * Get the id of the process
-   *
-   * @return the id of the process
-   */
-  default long getId() {
-    return store().getProcessId();
-  }
-
-  /**
-   * Get the {@link Action} of the process
-   *
-   * @return the action of the process
-   */
-  default Action getAction() {
-    return store().getAction();
-  }
+  Map<String, String> getSummary();
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/EngineType.java
 b/amoro-common/src/main/java/org/apache/amoro/process/EngineType.java
similarity index 97%
rename from 
amoro-ams/src/main/java/org/apache/amoro/server/process/executor/EngineType.java
rename to amoro-common/src/main/java/org/apache/amoro/process/EngineType.java
index f6d834aae..b37dda0a2 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/EngineType.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/EngineType.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.server.process.executor;
+package org.apache.amoro.process;
 
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngine.java
 b/amoro-common/src/main/java/org/apache/amoro/process/ExecuteEngine.java
similarity index 93%
rename from 
amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngine.java
rename to amoro-common/src/main/java/org/apache/amoro/process/ExecuteEngine.java
index 1b77cf2e9..8723b70ef 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/ExecuteEngine.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/ExecuteEngine.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.amoro.server.process.executor;
+package org.apache.amoro.process;
 
 import org.apache.amoro.ActivePlugin;
-import org.apache.amoro.process.ProcessStatus;
-import org.apache.amoro.process.TableProcess;
 
 public interface ExecuteEngine extends ActivePlugin {
 
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java 
b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
index c2e5c838a..275550165 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/ProcessFactory.java
@@ -25,6 +25,7 @@ import org.apache.amoro.TableRuntime;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.table.StateKey;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -49,6 +50,13 @@ public interface ProcessFactory extends ActivePlugin {
     return ProcessTriggerStrategy.METADATA_TRIGGER;
   }
 
+  /**
+   * All registered execute engines.
+   *
+   * @param allAvailableEngines all registered execute engines.
+   */
+  default void availableExecuteEngines(Collection<ExecuteEngine> 
allAvailableEngines) {}
+
   /**
    * Try trigger a process for the action.
    *
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java 
b/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
index 4b578d7fc..f155ae102 100644
--- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java
@@ -18,6 +18,7 @@
 
 package org.apache.amoro.process;
 
+import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableRuntime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,56 +28,41 @@ public abstract class TableProcess implements AmoroProcess {
 
   public static final Logger LOG = LoggerFactory.getLogger(TableProcess.class);
 
-  protected final TableRuntime tableRuntime;
-  protected final TableProcessStore tableProcessStore;
-  protected final int maxRetryTime;
   private final SimpleFuture submitFuture = new SimpleFuture();
   private final SimpleFuture completeFuture = new SimpleFuture();
 
-  protected TableProcess(TableRuntime tableRuntime) {
-    this(tableRuntime, null, 1);
-  }
-
-  protected TableProcess(TableRuntime tableRuntime, TableProcessStore tableProcessStore) {
-    this(tableRuntime, tableProcessStore, 1);
-  }
+  protected final TableRuntime tableRuntime;
+  private final ExecuteEngine executeEngine;
 
-  protected TableProcess(
-      TableRuntime tableRuntime, TableProcessStore tableProcessStore, int maxRetryTime) {
+  protected TableProcess(TableRuntime tableRuntime, ExecuteEngine engine) {
     this.tableRuntime = tableRuntime;
-    this.tableProcessStore = tableProcessStore;
-    this.maxRetryTime = maxRetryTime;
+    this.executeEngine = engine;
   }
 
-  public TableRuntime getTableRuntime() {
-    return tableRuntime;
+  @Override
+  public SimpleFuture getSubmitFuture() {
+    return submitFuture.or(completeFuture);
   }
 
-  public String getExternalProcessIdentifier() {
-    // TODO: Add a new field to process meta to store external process identifier.(e.g. flink job id
-    // or yarn app id)
-    return tableProcessStore.getExternalProcessIdentifier();
+  @Override
+  public SimpleFuture getCompleteFuture() {
+    return completeFuture;
   }
 
-  @Override
-  public TableProcessStore store() {
-    return tableProcessStore;
+  public ServerTableIdentifier getTableIdentifier() {
+    return tableRuntime.getTableIdentifier();
   }
 
   @Override
-  public ProcessStatus getStatus() {
-    return tableProcessStore.getStatus();
+  public String getExecutionEngine() {
+    return executeEngine.name();
   }
 
-  protected abstract void closeInternal();
-
-  @Override
-  public SimpleFuture getSubmitFuture() {
-    return submitFuture.or(completeFuture);
+  public TableRuntime getTableRuntime() {
+    return tableRuntime;
   }
 
-  @Override
-  public SimpleFuture getCompleteFuture() {
-    return completeFuture;
+  public String getProcessStage() {
+    return "default";
   }
 }
diff --git a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
index 5799166c5..bb242b094 100644
--- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
+++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.amoro.table;
 
+import org.apache.amoro.AmoroTable;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.process.ActionCoordinator;
@@ -26,6 +27,7 @@ import org.apache.amoro.process.ProcessFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 
 /** Table runtime factory. */
 public interface TableRuntimeFactory {
@@ -34,6 +36,8 @@ public interface TableRuntimeFactory {
 
   void initialize(List<ProcessFactory> factories);
 
+  void withTableLoader(Function<ServerTableIdentifier, AmoroTable<?>> loader);
+
   Optional<TableRuntimeCreator> accept(
       ServerTableIdentifier tableIdentifier, Map<String, String> tableProperties);
 

Reply via email to