This is an automated email from the ASF dual-hosted git repository.

baiyangtx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 0721df985 [Improvement]: Refactor snapshot-expiring via ProcessFactory 
plugin (#4107)
0721df985 is described below

commit 0721df985beda5aeb76d9f6efb4f6c294247e728
Author: baiyangtx <[email protected]>
AuthorDate: Tue Mar 17 21:50:53 2026 +0800

    [Improvement]: Refactor snapshot-expiring via ProcessFactory plugin (#4107)
    
    * [Improvement] Refactor SnapshotExpiring inline executor with 
ProcessFactory
    
    
    
    Co-authored-by: zhangyongxiang.alpha <[email protected]>
    Co-authored-by: Aime <[email protected]>
    Co-authored-by: ZhouJinsong <[email protected]>
---
 .../apache/amoro/server/AmoroServiceContainer.java |   9 +-
 .../amoro/server/process/ProcessService.java       |   1 -
 .../process/iceberg/IcebergProcessFactory.java     | 140 +++++++++++++
 .../process/iceberg/SnapshotsExpiringProcess.java  |  79 +++++++
 .../scheduler/inline/InlineTableExecutors.java     |  12 --
 .../amoro/server/table/AbstractTableRuntime.java   |  12 ++
 .../amoro/server/table/DefaultTableRuntime.java    |   2 +-
 .../table/cleanup/TableRuntimeCleanupState.java    |   3 +-
 ...ory => org.apache.amoro.process.ProcessFactory} |   2 +-
 .../process/iceberg/TestIcebergProcessFactory.java | 146 +++++++++++++
 .../inline/TestConfigurableIntervalExecutors.java  |  32 ---
 .../main/java/org/apache/amoro/IcebergActions.java |   1 +
 .../main/java/org/apache/amoro/TableRuntime.java   |  20 ++
 .../org/apache/amoro/config/Configurations.java    |  13 ++
 .../apache/amoro/process/LocalExecutionEngine.java | 230 +++++++++++++++++++++
 .../LocalProcess.java}                             |  27 ++-
 .../org.apache.amoro.process.ExecuteEngine         |   2 +-
 .../amoro/process/TestLocalExecutionEngine.java    | 207 +++++++++++++++++++
 ...runtime-factories.yaml => execute-engines.yaml} |  11 +-
 ...ntime-factories.yaml => process-factories.yaml} |  11 +-
 20 files changed, 886 insertions(+), 74 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 0b268f7bf..4b38c9842 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -34,6 +34,7 @@ import org.apache.amoro.config.Configurations;
 import org.apache.amoro.config.shade.utils.ConfigShadeUtils;
 import org.apache.amoro.exception.AmoroRuntimeException;
 import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ExecuteEngine;
 import org.apache.amoro.process.ProcessFactory;
 import org.apache.amoro.server.catalog.CatalogManager;
 import org.apache.amoro.server.catalog.DefaultCatalogManager;
@@ -242,6 +243,10 @@ public class AmoroServiceContainer {
     TableProcessFactoryManager tableProcessFactoryManager = new 
TableProcessFactoryManager();
     tableProcessFactoryManager.initialize();
     List<ProcessFactory> processFactories = 
tableProcessFactoryManager.installedPlugins();
+    ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
+    executeEngineManager.initialize();
+    List<ExecuteEngine> executeEngines = 
executeEngineManager.installedPlugins();
+    processFactories.forEach(c -> c.availableExecuteEngines(executeEngines));
 
     DefaultTableRuntimeFactory defaultRuntimeFactory = new 
DefaultTableRuntimeFactory();
     defaultRuntimeFactory.initialize(processFactories);
@@ -261,9 +266,6 @@ public class AmoroServiceContainer {
     }
 
     List<ActionCoordinator> actionCoordinators = 
defaultRuntimeFactory.supportedCoordinators();
-    ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
-    processFactories.forEach(
-        c -> 
c.availableExecuteEngines(executeEngineManager.installedPlugins()));
 
     tableService = new DefaultTableService(serviceConfig, catalogManager, 
defaultRuntimeFactory);
     processService = new ProcessService(tableService, actionCoordinators, 
executeEngineManager);
@@ -275,7 +277,6 @@ public class AmoroServiceContainer {
     addHandlerChain(optimizingService.getTableRuntimeHandler());
     addHandlerChain(processService.getTableHandlerChain());
     
addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor());
-    
addHandlerChain(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor());
     
addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
     
addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
     
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
index 01944e8a2..c3cac4efe 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
@@ -128,7 +128,6 @@ public class ProcessService extends PersistentBase {
           actionCoordinator.action().getName(),
           new ActionCoordinatorScheduler(actionCoordinator, tableService, 
ProcessService.this));
     }
-    executeEngineManager.initialize();
     executeEngineManager
         .installedPlugins()
         .forEach(
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
new file mode 100755
index 000000000..18de241db
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.process.iceberg;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.IcebergActions;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.config.ConfigOption;
+import org.apache.amoro.config.ConfigOptions;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.process.ExecuteEngine;
+import org.apache.amoro.process.LocalExecutionEngine;
+import org.apache.amoro.process.ProcessFactory;
+import org.apache.amoro.process.ProcessTriggerStrategy;
+import org.apache.amoro.process.RecoverProcessFailedException;
+import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Default process factory for Iceberg-related maintenance actions in AMS. */
+public class IcebergProcessFactory implements ProcessFactory {
+
+  public static final String PLUGIN_NAME = "iceberg";
+  public static final ConfigOption<Boolean> SNAPSHOT_EXPIRE_ENABLED =
+      
ConfigOptions.key("expire-snapshots.enabled").booleanType().defaultValue(true);
+
+  public static final ConfigOption<Duration> SNAPSHOT_EXPIRE_INTERVAL =
+      ConfigOptions.key("expire-snapshots.interval")
+          .durationType()
+          .defaultValue(Duration.ofHours(1));
+
+  private ExecuteEngine localEngine;
+  private final Map<Action, ProcessTriggerStrategy> actions = 
Maps.newHashMap();
+  private final List<TableFormat> formats =
+      Lists.newArrayList(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, 
TableFormat.MIXED_HIVE);
+
+  @Override
+  public void availableExecuteEngines(Collection<ExecuteEngine> 
allAvailableEngines) {
+    for (ExecuteEngine engine : allAvailableEngines) {
+      if (engine instanceof LocalExecutionEngine) {
+        this.localEngine = engine;
+      }
+    }
+  }
+
+  @Override
+  public Map<TableFormat, Set<Action>> supportedActions() {
+    return formats.stream()
+        .map(f -> Pair.of(f, actions.keySet()))
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+  }
+
+  @Override
+  public ProcessTriggerStrategy triggerStrategy(TableFormat format, Action 
action) {
+    return actions.getOrDefault(action, 
ProcessTriggerStrategy.METADATA_TRIGGER);
+  }
+
+  @Override
+  public Optional<TableProcess> trigger(TableRuntime tableRuntime, Action 
action) {
+    if (!actions.containsKey(action)) {
+      return Optional.empty();
+    }
+
+    if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) {
+      return triggerExpireSnapshot(tableRuntime);
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public TableProcess recover(TableRuntime tableRuntime, TableProcessStore 
store)
+      throws RecoverProcessFailedException {
+    throw new RecoverProcessFailedException(
+        "Unsupported action for IcebergProcessFactory: " + store.getAction());
+  }
+
+  @Override
+  public void open(Map<String, String> properties) {
+    if (properties == null || properties.isEmpty()) {
+      return;
+    }
+    Configurations configs = Configurations.fromMap(properties);
+    if (configs.getBoolean(SNAPSHOT_EXPIRE_ENABLED)) {
+      Duration interval = configs.getDuration(SNAPSHOT_EXPIRE_INTERVAL);
+      this.actions.put(
+          IcebergActions.EXPIRE_SNAPSHOTS, 
ProcessTriggerStrategy.triggerAtFixRate(interval));
+    }
+  }
+
+  private Optional<TableProcess> triggerExpireSnapshot(TableRuntime 
tableRuntime) {
+    if (localEngine == null || 
!tableRuntime.getTableConfiguration().isExpireSnapshotEnabled()) {
+      return Optional.empty();
+    }
+
+    long lastExecuteTime =
+        
tableRuntime.getState(DefaultTableRuntime.CLEANUP_STATE_KEY).getLastSnapshotsExpiringTime();
+    ProcessTriggerStrategy strategy = 
actions.get(IcebergActions.EXPIRE_SNAPSHOTS);
+    if (System.currentTimeMillis() - lastExecuteTime < 
strategy.getTriggerInterval().toMillis()) {
+      return Optional.empty();
+    }
+
+    return Optional.of(new SnapshotsExpiringProcess(tableRuntime, 
localEngine));
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public String name() {
+    return PLUGIN_NAME;
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
new file mode 100755
index 000000000..3aea51d9f
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.process.iceberg;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.AmoroTable;
+import org.apache.amoro.IcebergActions;
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.maintainer.TableMaintainer;
+import org.apache.amoro.process.ExecuteEngine;
+import org.apache.amoro.process.LocalProcess;
+import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** Local table process for expiring Iceberg snapshots. */
+public class SnapshotsExpiringProcess extends TableProcess implements 
LocalProcess {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SnapshotsExpiringProcess.class);
+
+  public SnapshotsExpiringProcess(TableRuntime tableRuntime, ExecuteEngine 
engine) {
+    super(tableRuntime, engine);
+  }
+
+  @Override
+  public String tag() {
+    return getAction().getName().toLowerCase();
+  }
+
+  @Override
+  public void run() {
+    try {
+      AmoroTable<?> amoroTable = tableRuntime.loadTable();
+      TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, 
tableRuntime);
+      tableMaintainer.expireSnapshots();
+      tableRuntime.updateState(
+          DefaultTableRuntime.CLEANUP_STATE_KEY,
+          cleanUp -> 
cleanUp.setLastSnapshotsExpiringTime(System.currentTimeMillis()));
+    } catch (Throwable t) {
+      LOG.error("unexpected expire error of table {} ", 
tableRuntime.getTableIdentifier(), t);
+    }
+  }
+
+  @Override
+  public Action getAction() {
+    return IcebergActions.EXPIRE_SNAPSHOTS;
+  }
+
+  @Override
+  public Map<String, String> getProcessParameters() {
+    return Maps.newHashMap();
+  }
+
+  @Override
+  public Map<String, String> getSummary() {
+    return Maps.newHashMap();
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
index 17205bfbf..17eb72e39 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
@@ -25,7 +25,6 @@ import org.apache.amoro.server.table.TableService;
 public class InlineTableExecutors {
 
   private static final InlineTableExecutors instance = new 
InlineTableExecutors();
-  private SnapshotsExpiringExecutor snapshotsExpiringExecutor;
   private TableRuntimeRefreshExecutor tableRefreshingExecutor;
   private OrphanFilesCleaningExecutor orphanFilesCleaningExecutor;
   private DanglingDeleteFilesCleaningExecutor 
danglingDeleteFilesCleaningExecutor;
@@ -41,13 +40,6 @@ public class InlineTableExecutors {
   }
 
   public void setup(TableService tableService, Configurations conf) {
-    if (conf.getBoolean(AmoroManagementConf.EXPIRE_SNAPSHOTS_ENABLED)) {
-      this.snapshotsExpiringExecutor =
-          new SnapshotsExpiringExecutor(
-              tableService,
-              
conf.getInteger(AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT),
-              conf.get(AmoroManagementConf.EXPIRE_SNAPSHOTS_INTERVAL));
-    }
     if (conf.getBoolean(AmoroManagementConf.CLEAN_ORPHAN_FILES_ENABLED)) {
       this.orphanFilesCleaningExecutor =
           new OrphanFilesCleaningExecutor(
@@ -98,10 +90,6 @@ public class InlineTableExecutors {
     }
   }
 
-  public SnapshotsExpiringExecutor getSnapshotsExpiringExecutor() {
-    return snapshotsExpiringExecutor;
-  }
-
   public TableRuntimeRefreshExecutor getTableRefreshingExecutor() {
     return tableRefreshingExecutor;
   }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
index 05e17adc0..11f5f2f56 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
@@ -25,12 +25,14 @@ import org.apache.amoro.process.TableProcessStore;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.amoro.table.StateKey;
 import org.apache.amoro.table.TableRuntimeStore;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public abstract class AbstractTableRuntime extends PersistentBase
@@ -62,6 +64,16 @@ public abstract class AbstractTableRuntime extends 
PersistentBase
     return store().getTableConfig();
   }
 
+  @Override
+  public <T> T getState(StateKey<T> key) {
+    return store().getState(key);
+  }
+
+  @Override
+  public <T> void updateState(StateKey<T> key, Function<T, T> updater) {
+    store().begin().updateState(key, updater).commit();
+  }
+
   @Override
   public List<TableProcessStore> getProcessStates() {
     return processContainerMap.values().stream()
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index ee23ab4d3..9b4b56a95 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -74,7 +74,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime 
{
           .jsonType(AbstractOptimizingEvaluator.PendingInput.class)
           .defaultValue(new AbstractOptimizingEvaluator.PendingInput());
 
-  private static final StateKey<TableRuntimeCleanupState> CLEANUP_STATE_KEY =
+  public static final StateKey<TableRuntimeCleanupState> CLEANUP_STATE_KEY =
       StateKey.stateKey("cleanup_state")
           .jsonType(TableRuntimeCleanupState.class)
           .defaultValue(new TableRuntimeCleanupState());
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
index 9dfb98f6e..f65ae387c 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
@@ -52,7 +52,8 @@ public class TableRuntimeCleanupState {
     return lastSnapshotsExpiringTime;
   }
 
-  public void setLastSnapshotsExpiringTime(long lastSnapshotsExpiringTime) {
+  public TableRuntimeCleanupState setLastSnapshotsExpiringTime(long 
lastSnapshotsExpiringTime) {
     this.lastSnapshotsExpiringTime = lastSnapshotsExpiringTime;
+    return this;
   }
 }
diff --git 
a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.table.TableRuntimeFactory
 
b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory
old mode 100644
new mode 100755
similarity index 92%
copy from 
amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.table.TableRuntimeFactory
copy to 
amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory
index a3b50433a..96baa8ebf
--- 
a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.table.TableRuntimeFactory
+++ 
b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory
@@ -16,4 +16,4 @@
 # limitations under the License.
 #
 
-org.apache.amoro.server.table.DefaultTableRuntimeFactory
+org.apache.amoro.server.process.iceberg.IcebergProcessFactory
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
new file mode 100644
index 000000000..35151ce3d
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.process.iceberg;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import org.apache.amoro.IcebergActions;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.process.LocalExecutionEngine;
+import org.apache.amoro.process.ProcessTriggerStrategy;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class TestIcebergProcessFactory {
+
+  @Test
+  public void testOpenAndSupportedActions() {
+    IcebergProcessFactory factory = new IcebergProcessFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("expire-snapshots.enabled", "true");
+    properties.put("expire-snapshots.interval", "1h");
+
+    factory.open(properties);
+
+    Map<TableFormat, Set<org.apache.amoro.Action>> supported = 
factory.supportedActions();
+    
Assert.assertTrue(supported.get(TableFormat.ICEBERG).contains(IcebergActions.EXPIRE_SNAPSHOTS));
+    Assert.assertTrue(
+        
supported.get(TableFormat.MIXED_ICEBERG).contains(IcebergActions.EXPIRE_SNAPSHOTS));
+    Assert.assertTrue(
+        
supported.get(TableFormat.MIXED_HIVE).contains(IcebergActions.EXPIRE_SNAPSHOTS));
+
+    ProcessTriggerStrategy strategy =
+        factory.triggerStrategy(TableFormat.ICEBERG, 
IcebergActions.EXPIRE_SNAPSHOTS);
+    Assert.assertEquals(Duration.ofHours(1), strategy.getTriggerInterval());
+  }
+
+  @Test
+  public void testTriggerExpireSnapshotWhenDue() {
+    IcebergProcessFactory factory = new IcebergProcessFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("expire-snapshots.enabled", "true");
+    properties.put("expire-snapshots.interval", "1h");
+    factory.open(properties);
+
+    LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class);
+    doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name();
+    factory.availableExecuteEngines(Arrays.asList(localEngine));
+
+    TableConfiguration tableConfiguration = new 
TableConfiguration().setExpireSnapshotEnabled(true);
+    TableRuntimeCleanupState cleanupState =
+        new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(0);
+
+    TableRuntime runtime = mock(TableRuntime.class);
+    doReturn(tableConfiguration).when(runtime).getTableConfiguration();
+    
doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY);
+
+    Optional<org.apache.amoro.process.TableProcess> process =
+        factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS);
+
+    Assert.assertTrue(process.isPresent());
+    Assert.assertTrue(process.get() instanceof SnapshotsExpiringProcess);
+    Assert.assertEquals(LocalExecutionEngine.ENGINE_NAME, 
process.get().getExecutionEngine());
+  }
+
+  @Test
+  public void testTriggerExpireSnapshotNotDue() {
+    IcebergProcessFactory factory = new IcebergProcessFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("expire-snapshots.enabled", "true");
+    properties.put("expire-snapshots.interval", "1h");
+    factory.open(properties);
+
+    
factory.availableExecuteEngines(Arrays.asList(mock(LocalExecutionEngine.class)));
+
+    TableConfiguration tableConfiguration = new 
TableConfiguration().setExpireSnapshotEnabled(true);
+    long now = System.currentTimeMillis();
+    TableRuntimeCleanupState cleanupState =
+        new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(now);
+
+    TableRuntime runtime = mock(TableRuntime.class);
+    doReturn(tableConfiguration).when(runtime).getTableConfiguration();
+    
doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY);
+
+    Optional<org.apache.amoro.process.TableProcess> process =
+        factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS);
+
+    Assert.assertFalse(process.isPresent());
+  }
+
+  @Test
+  public void testTriggerExpireSnapshotDisabled() {
+    IcebergProcessFactory factory = new IcebergProcessFactory();
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("expire-snapshots.enabled", "true");
+    properties.put("expire-snapshots.interval", "1h");
+    factory.open(properties);
+
+    
factory.availableExecuteEngines(Arrays.asList(mock(LocalExecutionEngine.class)));
+
+    TableConfiguration tableConfiguration =
+        new TableConfiguration().setExpireSnapshotEnabled(false);
+    TableRuntimeCleanupState cleanupState =
+        new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(0);
+
+    TableRuntime runtime = mock(TableRuntime.class);
+    doReturn(tableConfiguration).when(runtime).getTableConfiguration();
+    
doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY);
+
+    Optional<org.apache.amoro.process.TableProcess> process =
+        factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS);
+
+    Assert.assertFalse(process.isPresent());
+  }
+}
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
index 3a20c6101..a3d7b2515 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
@@ -64,36 +64,4 @@ public class TestConfigurableIntervalExecutors {
     // 5 hours ago - should not execute
     Assert.assertFalse(executor.shouldExecute(now - 
Duration.ofHours(5).toMillis()));
   }
-
-  @Test
-  public void testSnapshotsExpiringDefaultInterval() {
-    Duration interval = Duration.ofHours(1);
-    SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null, 
1, interval);
-
-    TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
-    Assert.assertEquals(
-        Duration.ofHours(1).toMillis(), 
executor.getNextExecutingTime(tableRuntime));
-  }
-
-  @Test
-  public void testSnapshotsExpiringCustomInterval() {
-    Duration interval = Duration.ofMinutes(30);
-    SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null, 
1, interval);
-
-    TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
-    Assert.assertEquals(
-        Duration.ofMinutes(30).toMillis(), 
executor.getNextExecutingTime(tableRuntime));
-  }
-
-  @Test
-  public void testSnapshotsExpiringShouldExecuteAfterInterval() {
-    Duration interval = Duration.ofHours(2);
-    SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null, 
1, interval);
-
-    long now = System.currentTimeMillis();
-    // 3 hours ago - should execute
-    Assert.assertTrue(executor.shouldExecute(now - 
Duration.ofHours(3).toMillis()));
-    // 1 hour ago - should not execute
-    Assert.assertFalse(executor.shouldExecute(now - 
Duration.ofHours(1).toMillis()));
-  }
 }
diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java 
b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
index c75c5ac8d..7b8319260 100644
--- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
+++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
@@ -28,4 +28,5 @@ public class IcebergActions {
   public static final Action DELETE_ORPHANS = 
Action.register("delete-orphans");
   public static final Action SYNC_HIVE = Action.register("sync-hive");
   public static final Action EXPIRE_DATA = Action.register("expire-data");
+  public static final Action EXPIRE_SNAPSHOTS = 
Action.register("expire-snapshots");
 }
diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java 
b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
index 1d96553fe..0059593e8 100644
--- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
+++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
@@ -22,9 +22,11 @@ import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.metrics.MetricRegistry;
 import org.apache.amoro.process.ProcessFactory;
 import org.apache.amoro.process.TableProcessStore;
+import org.apache.amoro.table.StateKey;
 
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 /**
  * TableRuntime is the key interface for the AMS framework to interact with 
the table. Typically, it
@@ -81,6 +83,24 @@ public interface TableRuntime {
    */
   Map<String, String> getTableConfig();
 
+  /**
+   * Get the value of table-runtime state
+   *
+   * @param key the state key
+   * @return value of the state
+   * @param <T> state value type
+   */
+  <T> T getState(StateKey<T> key);
+
+  /**
+   * Update the state
+   *
+   * @param key key of state
+   * @param updater value updater of state
+   * @param <T> value type of state.
+   */
+  <T> void updateState(StateKey<T> key, Function<T, T> updater);
+
   /**
    * Register the metric of the table runtime.
    *
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java 
b/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java
index 74ecf61a1..baaed361c 100644
--- a/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java
+++ b/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java
@@ -548,6 +548,19 @@ public class Configurations implements 
java.io.Serializable, Cloneable {
     return result;
   }
 
+  /**
+   * Returns the {@link Duration} configured for the given option, or the option's default value
+   * when nothing is configured.
+   *
+   * @param option the duration option to read
+   * @return the configured duration, or the default value of the option
+   * @throws ConfigurationException if reading or converting the value fails; the original
+   *     exception is kept as the cause
+   */
+  public Duration getDuration(ConfigOption<Duration> option) {
+    try {
+      Optional<Duration> configured = getOptional(option);
+      if (configured.isPresent()) {
+        return configured.get();
+      }
+      return option.defaultValue();
+    } catch (Exception cause) {
+      // Conversion may fail, for example with java.lang.ArithmeticException: long overflow.
+      String message =
+          String.format(
+              "Exception when converting duration for config option '%s': %s",
+              option.key(), cause.getMessage());
+      throw new ConfigurationException(option.key(), message, cause);
+    }
+  }
+
   public <T> Optional<T> getOptional(ConfigOption<T> option) {
     Optional<Object> rawValue = getRawValueFromOption(option);
     Class<?> clazz = option.getClazz();
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java 
b/amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java
new file mode 100755
index 000000000..5999de9f0
--- /dev/null
+++ 
b/amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.process;
+
+import org.apache.amoro.config.ConfigOption;
+import org.apache.amoro.config.ConfigOptions;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import 
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Local execution engine that runs {@link 
org.apache.amoro.process.LocalProcess} instances in AMS
+ * thread pools.
+ *
+ * <p>The engine maintains multiple thread pools keyed by {@link
+ * org.apache.amoro.process.LocalProcess#tag()}.
+ */
+public class LocalExecutionEngine implements ExecuteEngine {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LocalExecutionEngine.class);
+
+  public static final String ENGINE_NAME = "local";
+  public static final String DEFAULT_POOL = "default";
+  public static final String POOL_CONFIG_PREFIX = "pool.";
+  public static final String POOL_SIZE_SUFFIX = ".thread-count";
+  public static final ConfigOption<Integer> DEFAULT_POOL_SIZE =
+      ConfigOptions.key(POOL_CONFIG_PREFIX + DEFAULT_POOL + POOL_SIZE_SUFFIX)
+          .intType()
+          .defaultValue(10);
+  public static final ConfigOption<Duration> PROCESS_STATUS_TTL =
+      
ConfigOptions.key("process.status.ttl").durationType().defaultValue(Duration.ofHours(4));
+
+  private final Map<String, ThreadPoolExecutor> pools = new 
ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ProcessHolder> processes = new 
ConcurrentHashMap<>();
+
+  private long statusTtl = PROCESS_STATUS_TTL.defaultValue().toMillis();
+
+  @Override
+  public EngineType engineType() {
+    return EngineType.of(ENGINE_NAME);
+  }
+
+  @Override
+  public ProcessStatus getStatus(String processIdentifier) {
+    if (processIdentifier == null || processIdentifier.isEmpty()) {
+      return ProcessStatus.UNKNOWN;
+    }
+    expire();
+
+    ProcessHolder process = processes.get(processIdentifier);
+    if (process == null) {
+      return ProcessStatus.UNKNOWN;
+    }
+    return process.getStatus();
+  }
+
+  @Override
+  public String submitTableProcess(TableProcess tableProcess) {
+    if (!(tableProcess instanceof LocalProcess)) {
+      throw new IllegalArgumentException(
+          "LocalExecutionEngine only supports LocalProcess, but got: " + 
tableProcess.getClass());
+    }
+
+    LocalProcess localProcess = (LocalProcess) tableProcess;
+    String identifier = UUID.randomUUID().toString();
+
+    ThreadPoolExecutor executor = getPool(localProcess.tag());
+    CompletableFuture<?> future = 
CompletableFuture.runAsync(localProcess::run, executor);
+    processes.put(identifier, new ProcessHolder(future));
+    expire();
+    return identifier;
+  }
+
+  @Override
+  public ProcessStatus tryCancelTableProcess(TableProcess tableProcess, String 
processIdentifier) {
+    ProcessHolder p = this.processes.get(processIdentifier);
+    if (p == null) {
+      return ProcessStatus.UNKNOWN;
+    }
+    if (p.finishTime() > 0) {
+      return p.getStatus();
+    }
+    p.cancel();
+    return p.getStatus();
+  }
+
+  @Override
+  public void open(Map<String, String> properties) {
+    Configurations configs = Configurations.fromMap(properties);
+    int defaultSize = configs.getInteger(DEFAULT_POOL_SIZE);
+    pools.put(DEFAULT_POOL, newFixedPool(DEFAULT_POOL, defaultSize));
+
+    Set<String> customPools =
+        properties.keySet().stream()
+            .filter(key -> key.startsWith(POOL_CONFIG_PREFIX))
+            .map(key -> key.substring(POOL_CONFIG_PREFIX.length()))
+            .map(key -> key.substring(0, key.indexOf(".")))
+            .map(String::toLowerCase)
+            .filter(name -> !DEFAULT_POOL.equalsIgnoreCase(name))
+            .collect(Collectors.toSet());
+
+    customPools.forEach(
+        name -> {
+          ConfigOption<Integer> poolSizeOpt =
+              ConfigOptions.key(POOL_CONFIG_PREFIX + name + POOL_SIZE_SUFFIX)
+                  .intType()
+                  .defaultValue(-1);
+          int size = configs.getInteger(poolSizeOpt);
+          Preconditions.checkArgument(size > 0, "Pool thread-count is not 
configured for %s", name);
+          pools.put(name, newFixedPool(name, size));
+          LOG.info("Initialize local execute pool:{} with size:{}", name, 
size);
+        });
+    this.statusTtl = configs.getDurationInMillis(PROCESS_STATUS_TTL);
+  }
+
+  @Override
+  public void close() {
+    pools.values().forEach(ThreadPoolExecutor::shutdown);
+    pools.clear();
+  }
+
+  @Override
+  public String name() {
+    return ENGINE_NAME;
+  }
+
+  private ThreadPoolExecutor getPool(String tag) {
+    if (pools.containsKey(tag)) {
+      return pools.get(tag);
+    }
+    return pools.get(DEFAULT_POOL);
+  }
+
+  private ThreadPoolExecutor newFixedPool(String tag, int size) {
+    return new ThreadPoolExecutor(
+        size,
+        size,
+        60,
+        TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(),
+        new ThreadFactoryBuilder().setDaemon(false).setNameFormat("local-" + 
tag + "-%d").build());
+  }
+
+  private void expire() {
+    long threshold = System.currentTimeMillis() - statusTtl;
+    Set<String> expireIdentifiers =
+        processes.entrySet().stream()
+            .filter(e -> e.getValue().finishTime() > 0 && 
e.getValue().finishTime() < threshold)
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toSet());
+    expireIdentifiers.forEach(processes::remove);
+  }
+
+  private static class ProcessHolder {
+    private final CompletableFuture<?> future;
+    private final AtomicLong finishTime = new AtomicLong(-1);
+    private final AtomicReference<ProcessStatus> status =
+        new AtomicReference<>(ProcessStatus.RUNNING);
+    private final AtomicReference<String> failedInfo = new 
AtomicReference<>("");
+
+    public ProcessHolder(CompletableFuture<?> future) {
+      this.future = future;
+      this.future.whenComplete((v, t) -> onComplete(t));
+    }
+
+    private void onComplete(Throwable e) {
+      finishTime.compareAndSet(-1, System.currentTimeMillis());
+      if (e != null) {
+        status.compareAndSet(ProcessStatus.RUNNING, ProcessStatus.FAILED);
+        failedInfo.compareAndSet("", exceptionToString(e));
+      } else {
+        status.compareAndSet(ProcessStatus.RUNNING, ProcessStatus.SUCCESS);
+      }
+    }
+
+    private static String exceptionToString(Throwable throwable) {
+      StringWriter sw = new StringWriter();
+      throwable.printStackTrace(new PrintWriter(sw));
+      return sw.toString();
+    }
+
+    public ProcessStatus getStatus() {
+      return status.get();
+    }
+
+    public void cancel() {
+      if (finishTime() > 0) {
+        return;
+      }
+      status.compareAndSet(ProcessStatus.RUNNING, ProcessStatus.CANCELED);
+      future.cancel(true);
+    }
+
+    public long finishTime() {
+      return this.finishTime.get();
+    }
+  }
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java 
b/amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java
old mode 100644
new mode 100755
similarity index 58%
copy from amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
copy to amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java
index c75c5ac8d..7dc358ad8
--- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java
@@ -16,16 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.amoro;
+package org.apache.amoro.process;
 
-public class IcebergActions {
+/**
+ * A {@link TableProcess} that can be executed locally by a thread pool.
+ *
+ * <p>Local processes are executed by {@code ExecuteEngine} implementations which run the logic
+ * inside the AMS JVM.
+ */
+public interface LocalProcess extends AmoroProcess {
 
-  private static final TableFormat[] DEFAULT_FORMATS =
-      new TableFormat[] {TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, 
TableFormat.MIXED_HIVE};
+  /** Execute process logic locally. */
+  void run();
 
-  public static final Action SYSTEM = Action.register("system");
-  public static final Action REWRITE = Action.register("rewrite");
-  public static final Action DELETE_ORPHANS = 
Action.register("delete-orphans");
-  public static final Action SYNC_HIVE = Action.register("sync-hive");
-  public static final Action EXPIRE_DATA = Action.register("expire-data");
+  /**
+   * Tag used by local execution engines to select a thread pool.
+   *
+   * @return pool tag
+   */
+  default String tag() {
+    return "default";
+  }
 }
diff --git 
a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.table.TableRuntimeFactory
 
b/amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine
old mode 100644
new mode 100755
similarity index 93%
rename from 
amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.table.TableRuntimeFactory
rename to 
amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine
index a3b50433a..8859bb6d3
--- 
a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.table.TableRuntimeFactory
+++ 
b/amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine
@@ -16,4 +16,4 @@
 # limitations under the License.
 #
 
-org.apache.amoro.server.table.DefaultTableRuntimeFactory
+org.apache.amoro.process.LocalExecutionEngine
diff --git 
a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
 
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
new file mode 100644
index 000000000..d6930622d
--- /dev/null
+++ 
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.process;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.TableRuntime;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Tests for {@link LocalExecutionEngine}: pool selection, cancellation, failure and expiry. */
+public class TestLocalExecutionEngine {
+
+  private LocalExecutionEngine engine;
+
+  @After
+  public void tearDown() {
+    if (engine != null) {
+      engine.close();
+    }
+  }
+
+  /** A process tagged with a configured pool name must run on a thread of that pool. */
+  @Test
+  public void testSubmitUsesCustomPoolByTag() throws Exception {
+    engine = createEngineWithTtl("1h");
+
+    CountDownLatch started = new CountDownLatch(1);
+    AtomicReference<String> threadName = new AtomicReference<>();
+
+    LocalProcessTableProcess process =
+        new LocalProcessTableProcess(
+            mock(TableRuntime.class),
+            engine,
+            "snapshots-expiring",
+            () -> {
+              threadName.set(Thread.currentThread().getName());
+              started.countDown();
+            });
+
+    String identifier = engine.submitTableProcess(process);
+
+    Assert.assertTrue("process should start", started.await(5, TimeUnit.SECONDS));
+    Assert.assertTrue(
+        "should run in custom pool",
+        threadName.get() != null && threadName.get().startsWith("local-snapshots-expiring-"));
+
+    waitForStatus(identifier, ProcessStatus.SUCCESS, 5000);
+  }
+
+  /** Cancelling a process that has not finished must end with status CANCELED. */
+  @Test
+  public void testCancelRunningProcess() throws Exception {
+    engine = createEngineWithTtl("1h");
+
+    CountDownLatch blockLatch = new CountDownLatch(1);
+    LocalProcessTableProcess process =
+        new LocalProcessTableProcess(
+            mock(TableRuntime.class),
+            engine,
+            "default",
+            () -> {
+              try {
+                blockLatch.await();
+              } catch (InterruptedException e) {
+                // Cancellation may interrupt the worker; keep the flag and finish.
+                Thread.currentThread().interrupt();
+              }
+            });
+
+    try {
+      String identifier = engine.submitTableProcess(process);
+
+      // Process may not be scheduled yet, but holder is already created.
+      Assert.assertEquals(ProcessStatus.RUNNING, engine.getStatus(identifier));
+
+      engine.tryCancelTableProcess(process, identifier);
+
+      // Eventually the process should be marked as canceled.
+      waitForStatus(identifier, ProcessStatus.CANCELED, 5000);
+    } finally {
+      // Always release the worker so the non-daemon pool thread is not left parked forever.
+      blockLatch.countDown();
+    }
+  }
+
+  /** The status of a finished process must disappear once the TTL has elapsed. */
+  @Test
+  public void testFinishedStatusExpired() throws Exception {
+    engine = createEngineWithTtl("100ms");
+
+    LocalProcessTableProcess process =
+        new LocalProcessTableProcess(mock(TableRuntime.class), engine, "default", () -> {});
+
+    String identifier = engine.submitTableProcess(process);
+
+    waitForStatus(identifier, ProcessStatus.SUCCESS, 5000);
+
+    // Poll instead of sleeping a fixed time: the status becomes UNKNOWN once the TTL has passed.
+    waitForStatus(identifier, ProcessStatus.UNKNOWN, 5000);
+  }
+
+  /** A process that throws must end with status FAILED. */
+  @Test
+  public void testFailedProcessStatus() throws Exception {
+    engine = createEngineWithTtl("1h");
+
+    LocalProcessTableProcess process =
+        new LocalProcessTableProcess(
+            mock(TableRuntime.class),
+            engine,
+            "default",
+            () -> {
+              throw new RuntimeException("boom");
+            });
+
+    String identifier = engine.submitTableProcess(process);
+
+    waitForStatus(identifier, ProcessStatus.FAILED, 5000);
+  }
+
+  /** Null, empty and never-issued identifiers must all report UNKNOWN. */
+  @Test
+  public void testGetStatusForInvalidIdentifier() {
+    engine = createEngineWithTtl("1h");
+
+    Assert.assertEquals(ProcessStatus.UNKNOWN, engine.getStatus(null));
+    Assert.assertEquals(ProcessStatus.UNKNOWN, engine.getStatus(""));
+    Assert.assertEquals(ProcessStatus.UNKNOWN, engine.getStatus("not-exist"));
+  }
+
+  /** Opens an engine with single-threaded default and snapshots-expiring pools. */
+  private LocalExecutionEngine createEngineWithTtl(String ttl) {
+    LocalExecutionEngine localEngine = new LocalExecutionEngine();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("pool.default.thread-count", "1");
+    properties.put("pool.snapshots-expiring.thread-count", "1");
+    properties.put("process.status.ttl", ttl);
+    localEngine.open(properties);
+    return localEngine;
+  }
+
+  /** Polls the engine every 10ms until the expected status is seen, failing on timeout. */
+  private void waitForStatus(String identifier, ProcessStatus expected, long timeoutMillis)
+      throws InterruptedException {
+    long deadline = System.currentTimeMillis() + timeoutMillis;
+    while (System.currentTimeMillis() < deadline) {
+      ProcessStatus status = engine.getStatus(identifier);
+      if (status == expected) {
+        return;
+      }
+      Thread.sleep(10);
+    }
+    Assert.fail(
+        "Timeout waiting for status " + expected + ", current=" + engine.getStatus(identifier));
+  }
+
+  /** Minimal {@link LocalProcess} that runs the given runnable on the pool named by the tag. */
+  private static class LocalProcessTableProcess extends TableProcess implements LocalProcess {
+
+    private final String tag;
+    private final Runnable runnable;
+
+    LocalProcessTableProcess(
+        TableRuntime tableRuntime, ExecuteEngine engine, String tag, Runnable runnable) {
+      super(tableRuntime, engine);
+      this.tag = tag;
+      this.runnable = runnable;
+    }
+
+    @Override
+    public void run() {
+      runnable.run();
+    }
+
+    @Override
+    public String tag() {
+      return tag;
+    }
+
+    @Override
+    public Action getAction() {
+      return Action.register("TEST");
+    }
+
+    @Override
+    public Map<String, String> getProcessParameters() {
+      return new HashMap<>();
+    }
+
+    @Override
+    public Map<String, String> getSummary() {
+      return new HashMap<>();
+    }
+  }
+}
diff --git a/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml 
b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
old mode 100644
new mode 100755
similarity index 81%
copy from dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml
copy to dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
index 34cf7c94d..5c3199f32
--- a/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
@@ -15,11 +15,10 @@
 # limitations under the License.
 #
 
-# This file is config file for plugin metric reporter
-# configurations of metric reporters
-
-table-runtime-factories:
-  - name: default            # configs for table runtime factory
+execute-engines:
+  - name: local
     enabled: true
     priority: 100
-
+    properties:
+      pool.default.thread-count: 10
+      pool.snapshots-expiring.thread-count: 10
diff --git a/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml 
b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
old mode 100644
new mode 100755
similarity index 81%
rename from dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml
rename to dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
index 34cf7c94d..e1455c2cf
--- a/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
@@ -15,11 +15,10 @@
 # limitations under the License.
 #
 
-# This file is config file for plugin metric reporter
-# configurations of metric reporters
-
-table-runtime-factories:
-  - name: default            # configs for table runtime factory
+process-factories:
+  - name: iceberg
     enabled: true
     priority: 100
-
+    properties:
+      expire-snapshots.enabled: "true"
+      expire-snapshots.interval: "1h"

Reply via email to