This is an automated email from the ASF dual-hosted git repository.
baiyangtx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 0721df985 [Improvement]: Refactor snapshot-expiring via ProcessFactory
plugin (#4107)
0721df985 is described below
commit 0721df985beda5aeb76d9f6efb4f6c294247e728
Author: baiyangtx <[email protected]>
AuthorDate: Tue Mar 17 21:50:53 2026 +0800
[Improvement]: Refactor snapshot-expiring via ProcessFactory plugin (#4107)
* [Improvement] Refactor SnapshotExpiring inline executor with
ProcessFactory
Co-authored-by: zhangyongxiang.alpha <[email protected]>
Co-authored-by: Aime <[email protected]>
Co-authored-by: ZhouJinsong <[email protected]>
---
.../apache/amoro/server/AmoroServiceContainer.java | 9 +-
.../amoro/server/process/ProcessService.java | 1 -
.../process/iceberg/IcebergProcessFactory.java | 140 +++++++++++++
.../process/iceberg/SnapshotsExpiringProcess.java | 79 +++++++
.../scheduler/inline/InlineTableExecutors.java | 12 --
.../amoro/server/table/AbstractTableRuntime.java | 12 ++
.../amoro/server/table/DefaultTableRuntime.java | 2 +-
.../table/cleanup/TableRuntimeCleanupState.java | 3 +-
...ory => org.apache.amoro.process.ProcessFactory} | 2 +-
.../process/iceberg/TestIcebergProcessFactory.java | 146 +++++++++++++
.../inline/TestConfigurableIntervalExecutors.java | 32 ---
.../main/java/org/apache/amoro/IcebergActions.java | 1 +
.../main/java/org/apache/amoro/TableRuntime.java | 20 ++
.../org/apache/amoro/config/Configurations.java | 13 ++
.../apache/amoro/process/LocalExecutionEngine.java | 230 +++++++++++++++++++++
.../LocalProcess.java} | 27 ++-
.../org.apache.amoro.process.ExecuteEngine | 2 +-
.../amoro/process/TestLocalExecutionEngine.java | 207 +++++++++++++++++++
...runtime-factories.yaml => execute-engines.yaml} | 11 +-
...ntime-factories.yaml => process-factories.yaml} | 11 +-
20 files changed, 886 insertions(+), 74 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 0b268f7bf..4b38c9842 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -34,6 +34,7 @@ import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.shade.utils.ConfigShadeUtils;
import org.apache.amoro.exception.AmoroRuntimeException;
import org.apache.amoro.process.ActionCoordinator;
+import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.DefaultCatalogManager;
@@ -242,6 +243,10 @@ public class AmoroServiceContainer {
TableProcessFactoryManager tableProcessFactoryManager = new
TableProcessFactoryManager();
tableProcessFactoryManager.initialize();
List<ProcessFactory> processFactories =
tableProcessFactoryManager.installedPlugins();
+ ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
+ executeEngineManager.initialize();
+ List<ExecuteEngine> executeEngines =
executeEngineManager.installedPlugins();
+ processFactories.forEach(c -> c.availableExecuteEngines(executeEngines));
DefaultTableRuntimeFactory defaultRuntimeFactory = new
DefaultTableRuntimeFactory();
defaultRuntimeFactory.initialize(processFactories);
@@ -261,9 +266,6 @@ public class AmoroServiceContainer {
}
List<ActionCoordinator> actionCoordinators =
defaultRuntimeFactory.supportedCoordinators();
- ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
- processFactories.forEach(
- c ->
c.availableExecuteEngines(executeEngineManager.installedPlugins()));
tableService = new DefaultTableService(serviceConfig, catalogManager,
defaultRuntimeFactory);
processService = new ProcessService(tableService, actionCoordinators,
executeEngineManager);
@@ -275,7 +277,6 @@ public class AmoroServiceContainer {
addHandlerChain(optimizingService.getTableRuntimeHandler());
addHandlerChain(processService.getTableHandlerChain());
addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor());
-
addHandlerChain(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
index 01944e8a2..c3cac4efe 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java
@@ -128,7 +128,6 @@ public class ProcessService extends PersistentBase {
actionCoordinator.action().getName(),
new ActionCoordinatorScheduler(actionCoordinator, tableService,
ProcessService.this));
}
- executeEngineManager.initialize();
executeEngineManager
.installedPlugins()
.forEach(
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
new file mode 100755
index 000000000..18de241db
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.process.iceberg;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.IcebergActions;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.config.ConfigOption;
+import org.apache.amoro.config.ConfigOptions;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.process.ExecuteEngine;
+import org.apache.amoro.process.LocalExecutionEngine;
+import org.apache.amoro.process.ProcessFactory;
+import org.apache.amoro.process.ProcessTriggerStrategy;
+import org.apache.amoro.process.RecoverProcessFailedException;
+import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.process.TableProcessStore;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Default process factory for Iceberg-related maintenance actions in AMS. */
+public class IcebergProcessFactory implements ProcessFactory {
+
+ public static final String PLUGIN_NAME = "iceberg";
+ public static final ConfigOption<Boolean> SNAPSHOT_EXPIRE_ENABLED =
+
ConfigOptions.key("expire-snapshots.enabled").booleanType().defaultValue(true);
+
+ public static final ConfigOption<Duration> SNAPSHOT_EXPIRE_INTERVAL =
+ ConfigOptions.key("expire-snapshots.interval")
+ .durationType()
+ .defaultValue(Duration.ofHours(1));
+
+ private ExecuteEngine localEngine;
+ private final Map<Action, ProcessTriggerStrategy> actions =
Maps.newHashMap();
+ private final List<TableFormat> formats =
+ Lists.newArrayList(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG,
TableFormat.MIXED_HIVE);
+
+ @Override
+ public void availableExecuteEngines(Collection<ExecuteEngine>
allAvailableEngines) {
+ for (ExecuteEngine engine : allAvailableEngines) {
+ if (engine instanceof LocalExecutionEngine) {
+ this.localEngine = engine;
+ }
+ }
+ }
+
+ @Override
+ public Map<TableFormat, Set<Action>> supportedActions() {
+ return formats.stream()
+ .map(f -> Pair.of(f, actions.keySet()))
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ }
+
+ @Override
+ public ProcessTriggerStrategy triggerStrategy(TableFormat format, Action
action) {
+ return actions.getOrDefault(action,
ProcessTriggerStrategy.METADATA_TRIGGER);
+ }
+
+ @Override
+ public Optional<TableProcess> trigger(TableRuntime tableRuntime, Action
action) {
+ if (!actions.containsKey(action)) {
+ return Optional.empty();
+ }
+
+ if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) {
+ return triggerExpireSnapshot(tableRuntime);
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public TableProcess recover(TableRuntime tableRuntime, TableProcessStore
store)
+ throws RecoverProcessFailedException {
+ throw new RecoverProcessFailedException(
+ "Unsupported action for IcebergProcessFactory: " + store.getAction());
+ }
+
+ @Override
+ public void open(Map<String, String> properties) {
+ if (properties == null || properties.isEmpty()) {
+ return;
+ }
+ Configurations configs = Configurations.fromMap(properties);
+ if (configs.getBoolean(SNAPSHOT_EXPIRE_ENABLED)) {
+ Duration interval = configs.getDuration(SNAPSHOT_EXPIRE_INTERVAL);
+ this.actions.put(
+ IcebergActions.EXPIRE_SNAPSHOTS,
ProcessTriggerStrategy.triggerAtFixRate(interval));
+ }
+ }
+
+ private Optional<TableProcess> triggerExpireSnapshot(TableRuntime
tableRuntime) {
+ if (localEngine == null ||
!tableRuntime.getTableConfiguration().isExpireSnapshotEnabled()) {
+ return Optional.empty();
+ }
+
+ long lastExecuteTime =
+
tableRuntime.getState(DefaultTableRuntime.CLEANUP_STATE_KEY).getLastSnapshotsExpiringTime();
+ ProcessTriggerStrategy strategy =
actions.get(IcebergActions.EXPIRE_SNAPSHOTS);
+ if (System.currentTimeMillis() - lastExecuteTime <
strategy.getTriggerInterval().toMillis()) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new SnapshotsExpiringProcess(tableRuntime,
localEngine));
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public String name() {
+ return PLUGIN_NAME;
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
new file mode 100755
index 000000000..3aea51d9f
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.process.iceberg;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.AmoroTable;
+import org.apache.amoro.IcebergActions;
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.maintainer.TableMaintainer;
+import org.apache.amoro.process.ExecuteEngine;
+import org.apache.amoro.process.LocalProcess;
+import org.apache.amoro.process.TableProcess;
+import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** Local table process for expiring Iceberg snapshots. */
+public class SnapshotsExpiringProcess extends TableProcess implements
LocalProcess {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SnapshotsExpiringProcess.class);
+
+ public SnapshotsExpiringProcess(TableRuntime tableRuntime, ExecuteEngine
engine) {
+ super(tableRuntime, engine);
+ }
+
+ @Override
+ public String tag() {
+ return getAction().getName().toLowerCase();
+ }
+
+ @Override
+ public void run() {
+ try {
+ AmoroTable<?> amoroTable = tableRuntime.loadTable();
+ TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable,
tableRuntime);
+ tableMaintainer.expireSnapshots();
+ tableRuntime.updateState(
+ DefaultTableRuntime.CLEANUP_STATE_KEY,
+ cleanUp ->
cleanUp.setLastSnapshotsExpiringTime(System.currentTimeMillis()));
+ } catch (Throwable t) {
+ LOG.error("unexpected expire error of table {} ",
tableRuntime.getTableIdentifier(), t);
+ }
+ }
+
+ @Override
+ public Action getAction() {
+ return IcebergActions.EXPIRE_SNAPSHOTS;
+ }
+
+ @Override
+ public Map<String, String> getProcessParameters() {
+ return Maps.newHashMap();
+ }
+
+ @Override
+ public Map<String, String> getSummary() {
+ return Maps.newHashMap();
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
index 17205bfbf..17eb72e39 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java
@@ -25,7 +25,6 @@ import org.apache.amoro.server.table.TableService;
public class InlineTableExecutors {
private static final InlineTableExecutors instance = new
InlineTableExecutors();
- private SnapshotsExpiringExecutor snapshotsExpiringExecutor;
private TableRuntimeRefreshExecutor tableRefreshingExecutor;
private OrphanFilesCleaningExecutor orphanFilesCleaningExecutor;
private DanglingDeleteFilesCleaningExecutor
danglingDeleteFilesCleaningExecutor;
@@ -41,13 +40,6 @@ public class InlineTableExecutors {
}
public void setup(TableService tableService, Configurations conf) {
- if (conf.getBoolean(AmoroManagementConf.EXPIRE_SNAPSHOTS_ENABLED)) {
- this.snapshotsExpiringExecutor =
- new SnapshotsExpiringExecutor(
- tableService,
-
conf.getInteger(AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT),
- conf.get(AmoroManagementConf.EXPIRE_SNAPSHOTS_INTERVAL));
- }
if (conf.getBoolean(AmoroManagementConf.CLEAN_ORPHAN_FILES_ENABLED)) {
this.orphanFilesCleaningExecutor =
new OrphanFilesCleaningExecutor(
@@ -98,10 +90,6 @@ public class InlineTableExecutors {
}
}
- public SnapshotsExpiringExecutor getSnapshotsExpiringExecutor() {
- return snapshotsExpiringExecutor;
- }
-
public TableRuntimeRefreshExecutor getTableRefreshingExecutor() {
return tableRefreshingExecutor;
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
index 05e17adc0..11f5f2f56 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java
@@ -25,12 +25,14 @@ import org.apache.amoro.process.TableProcessStore;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import
org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps;
+import org.apache.amoro.table.StateKey;
import org.apache.amoro.table.TableRuntimeStore;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
import java.util.stream.Collectors;
public abstract class AbstractTableRuntime extends PersistentBase
@@ -62,6 +64,16 @@ public abstract class AbstractTableRuntime extends
PersistentBase
return store().getTableConfig();
}
+ @Override
+ public <T> T getState(StateKey<T> key) {
+ return store().getState(key);
+ }
+
+ @Override
+ public <T> void updateState(StateKey<T> key, Function<T, T> updater) {
+ store().begin().updateState(key, updater).commit();
+ }
+
@Override
public List<TableProcessStore> getProcessStates() {
return processContainerMap.values().stream()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
index ee23ab4d3..9b4b56a95 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java
@@ -74,7 +74,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime
{
.jsonType(AbstractOptimizingEvaluator.PendingInput.class)
.defaultValue(new AbstractOptimizingEvaluator.PendingInput());
- private static final StateKey<TableRuntimeCleanupState> CLEANUP_STATE_KEY =
+ public static final StateKey<TableRuntimeCleanupState> CLEANUP_STATE_KEY =
StateKey.stateKey("cleanup_state")
.jsonType(TableRuntimeCleanupState.class)
.defaultValue(new TableRuntimeCleanupState());
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
index 9dfb98f6e..f65ae387c 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java
@@ -52,7 +52,8 @@ public class TableRuntimeCleanupState {
return lastSnapshotsExpiringTime;
}
- public void setLastSnapshotsExpiringTime(long lastSnapshotsExpiringTime) {
+ public TableRuntimeCleanupState setLastSnapshotsExpiringTime(long
lastSnapshotsExpiringTime) {
this.lastSnapshotsExpiringTime = lastSnapshotsExpiringTime;
+ return this;
}
}
diff --git
a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.table.TableRuntimeFactory
b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory
old mode 100644
new mode 100755
similarity index 92%
copy from
amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.table.TableRuntimeFactory
copy to
amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory
index a3b50433a..96baa8ebf
---
a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.table.TableRuntimeFactory
+++
b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory
@@ -16,4 +16,4 @@
# limitations under the License.
#
-org.apache.amoro.server.table.DefaultTableRuntimeFactory
+org.apache.amoro.server.process.iceberg.IcebergProcessFactory
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
new file mode 100644
index 000000000..35151ce3d
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.process.iceberg;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import org.apache.amoro.IcebergActions;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
+import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.process.LocalExecutionEngine;
+import org.apache.amoro.process.ProcessTriggerStrategy;
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class TestIcebergProcessFactory {
+
+ @Test
+ public void testOpenAndSupportedActions() {
+ IcebergProcessFactory factory = new IcebergProcessFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("expire-snapshots.enabled", "true");
+ properties.put("expire-snapshots.interval", "1h");
+
+ factory.open(properties);
+
+ Map<TableFormat, Set<org.apache.amoro.Action>> supported =
factory.supportedActions();
+
Assert.assertTrue(supported.get(TableFormat.ICEBERG).contains(IcebergActions.EXPIRE_SNAPSHOTS));
+ Assert.assertTrue(
+
supported.get(TableFormat.MIXED_ICEBERG).contains(IcebergActions.EXPIRE_SNAPSHOTS));
+ Assert.assertTrue(
+
supported.get(TableFormat.MIXED_HIVE).contains(IcebergActions.EXPIRE_SNAPSHOTS));
+
+ ProcessTriggerStrategy strategy =
+ factory.triggerStrategy(TableFormat.ICEBERG,
IcebergActions.EXPIRE_SNAPSHOTS);
+ Assert.assertEquals(Duration.ofHours(1), strategy.getTriggerInterval());
+ }
+
+ @Test
+ public void testTriggerExpireSnapshotWhenDue() {
+ IcebergProcessFactory factory = new IcebergProcessFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("expire-snapshots.enabled", "true");
+ properties.put("expire-snapshots.interval", "1h");
+ factory.open(properties);
+
+ LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class);
+ doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name();
+ factory.availableExecuteEngines(Arrays.asList(localEngine));
+
+ TableConfiguration tableConfiguration = new
TableConfiguration().setExpireSnapshotEnabled(true);
+ TableRuntimeCleanupState cleanupState =
+ new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(0);
+
+ TableRuntime runtime = mock(TableRuntime.class);
+ doReturn(tableConfiguration).when(runtime).getTableConfiguration();
+
doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY);
+
+ Optional<org.apache.amoro.process.TableProcess> process =
+ factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS);
+
+ Assert.assertTrue(process.isPresent());
+ Assert.assertTrue(process.get() instanceof SnapshotsExpiringProcess);
+ Assert.assertEquals(LocalExecutionEngine.ENGINE_NAME,
process.get().getExecutionEngine());
+ }
+
+ @Test
+ public void testTriggerExpireSnapshotNotDue() {
+ IcebergProcessFactory factory = new IcebergProcessFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("expire-snapshots.enabled", "true");
+ properties.put("expire-snapshots.interval", "1h");
+ factory.open(properties);
+
+
factory.availableExecuteEngines(Arrays.asList(mock(LocalExecutionEngine.class)));
+
+ TableConfiguration tableConfiguration = new
TableConfiguration().setExpireSnapshotEnabled(true);
+ long now = System.currentTimeMillis();
+ TableRuntimeCleanupState cleanupState =
+ new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(now);
+
+ TableRuntime runtime = mock(TableRuntime.class);
+ doReturn(tableConfiguration).when(runtime).getTableConfiguration();
+
doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY);
+
+ Optional<org.apache.amoro.process.TableProcess> process =
+ factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS);
+
+ Assert.assertFalse(process.isPresent());
+ }
+
+ @Test
+ public void testTriggerExpireSnapshotDisabled() {
+ IcebergProcessFactory factory = new IcebergProcessFactory();
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put("expire-snapshots.enabled", "true");
+ properties.put("expire-snapshots.interval", "1h");
+ factory.open(properties);
+
+
factory.availableExecuteEngines(Arrays.asList(mock(LocalExecutionEngine.class)));
+
+ TableConfiguration tableConfiguration =
+ new TableConfiguration().setExpireSnapshotEnabled(false);
+ TableRuntimeCleanupState cleanupState =
+ new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(0);
+
+ TableRuntime runtime = mock(TableRuntime.class);
+ doReturn(tableConfiguration).when(runtime).getTableConfiguration();
+
doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY);
+
+ Optional<org.apache.amoro.process.TableProcess> process =
+ factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS);
+
+ Assert.assertFalse(process.isPresent());
+ }
+}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
index 3a20c6101..a3d7b2515 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java
@@ -64,36 +64,4 @@ public class TestConfigurableIntervalExecutors {
// 5 hours ago - should not execute
Assert.assertFalse(executor.shouldExecute(now -
Duration.ofHours(5).toMillis()));
}
-
- @Test
- public void testSnapshotsExpiringDefaultInterval() {
- Duration interval = Duration.ofHours(1);
- SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null,
1, interval);
-
- TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
- Assert.assertEquals(
- Duration.ofHours(1).toMillis(),
executor.getNextExecutingTime(tableRuntime));
- }
-
- @Test
- public void testSnapshotsExpiringCustomInterval() {
- Duration interval = Duration.ofMinutes(30);
- SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null,
1, interval);
-
- TableRuntime tableRuntime = Mockito.mock(TableRuntime.class);
- Assert.assertEquals(
- Duration.ofMinutes(30).toMillis(),
executor.getNextExecutingTime(tableRuntime));
- }
-
- @Test
- public void testSnapshotsExpiringShouldExecuteAfterInterval() {
- Duration interval = Duration.ofHours(2);
- SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null,
1, interval);
-
- long now = System.currentTimeMillis();
- // 3 hours ago - should execute
- Assert.assertTrue(executor.shouldExecute(now -
Duration.ofHours(3).toMillis()));
- // 1 hour ago - should not execute
- Assert.assertFalse(executor.shouldExecute(now -
Duration.ofHours(1).toMillis()));
- }
}
diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
index c75c5ac8d..7b8319260 100644
--- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
+++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
@@ -28,4 +28,5 @@ public class IcebergActions {
public static final Action DELETE_ORPHANS =
Action.register("delete-orphans");
public static final Action SYNC_HIVE = Action.register("sync-hive");
public static final Action EXPIRE_DATA = Action.register("expire-data");
+ public static final Action EXPIRE_SNAPSHOTS =
Action.register("expire-snapshots");
}
diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
index 1d96553fe..0059593e8 100644
--- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
+++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java
@@ -22,9 +22,11 @@ import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.metrics.MetricRegistry;
import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.process.TableProcessStore;
+import org.apache.amoro.table.StateKey;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
/**
* TableRuntime is the key interface for the AMS framework to interact with
the table. Typically, it
@@ -81,6 +83,24 @@ public interface TableRuntime {
*/
Map<String, String> getTableConfig();
+ /**
+ * Get the value of table-runtime state
+ *
+ * @param key the state key
+ * @return value of the state
+ * @param <T> state value type
+ */
+ <T> T getState(StateKey<T> key);
+
+ /**
+ * Update the state
+ *
+ * @param key key of state
+ * @param updater value updater of state
+ * @param <T> value type of state.
+ */
+ <T> void updateState(StateKey<T> key, Function<T, T> updater);
+
/**
* Register the metric of the table runtime.
*
diff --git
a/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java
b/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java
index 74ecf61a1..baaed361c 100644
--- a/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java
+++ b/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java
@@ -548,6 +548,19 @@ public class Configurations implements
java.io.Serializable, Cloneable {
return result;
}
+ public Duration getDuration(ConfigOption<Duration> option) {
+ try {
+ return getOptional(option).orElseGet(option::defaultValue);
+ } catch (Exception e) { // may be throw java.lang.ArithmeticException:
long overflow
+ throw new ConfigurationException(
+ option.key(),
+ String.format(
+ "Exception when converting duration for config option '%s': %s",
+ option.key(), e.getMessage()),
+ e);
+ }
+ }
+
public <T> Optional<T> getOptional(ConfigOption<T> option) {
Optional<Object> rawValue = getRawValueFromOption(option);
Class<?> clazz = option.getClazz();
diff --git
a/amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java
b/amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java
new file mode 100755
index 000000000..5999de9f0
--- /dev/null
+++
b/amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.process;
+
+import org.apache.amoro.config.ConfigOption;
+import org.apache.amoro.config.ConfigOptions;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Local execution engine that runs {@link
org.apache.amoro.process.LocalProcess} instances in AMS
+ * thread pools.
+ *
+ * <p>The engine maintains multiple thread pools keyed by {@link
+ * org.apache.amoro.process.LocalProcess#tag()}.
+ */
+public class LocalExecutionEngine implements ExecuteEngine {
+ private static final Logger LOG =
LoggerFactory.getLogger(LocalExecutionEngine.class);
+
+ public static final String ENGINE_NAME = "local";
+ public static final String DEFAULT_POOL = "default";
+ public static final String POOL_CONFIG_PREFIX = "pool.";
+ public static final String POOL_SIZE_SUFFIX = ".thread-count";
+ public static final ConfigOption<Integer> DEFAULT_POOL_SIZE =
+ ConfigOptions.key(POOL_CONFIG_PREFIX + DEFAULT_POOL + POOL_SIZE_SUFFIX)
+ .intType()
+ .defaultValue(10);
+ public static final ConfigOption<Duration> PROCESS_STATUS_TTL =
+
ConfigOptions.key("process.status.ttl").durationType().defaultValue(Duration.ofHours(4));
+
+ private final Map<String, ThreadPoolExecutor> pools = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, ProcessHolder> processes = new
ConcurrentHashMap<>();
+
+ private long statusTtl = PROCESS_STATUS_TTL.defaultValue().toMillis();
+
+ @Override
+ public EngineType engineType() {
+ return EngineType.of(ENGINE_NAME);
+ }
+
+ @Override
+ public ProcessStatus getStatus(String processIdentifier) {
+ if (processIdentifier == null || processIdentifier.isEmpty()) {
+ return ProcessStatus.UNKNOWN;
+ }
+ expire();
+
+ ProcessHolder process = processes.get(processIdentifier);
+ if (process == null) {
+ return ProcessStatus.UNKNOWN;
+ }
+ return process.getStatus();
+ }
+
+ @Override
+ public String submitTableProcess(TableProcess tableProcess) {
+ if (!(tableProcess instanceof LocalProcess)) {
+ throw new IllegalArgumentException(
+ "LocalExecutionEngine only supports LocalProcess, but got: " +
tableProcess.getClass());
+ }
+
+ LocalProcess localProcess = (LocalProcess) tableProcess;
+ String identifier = UUID.randomUUID().toString();
+
+ ThreadPoolExecutor executor = getPool(localProcess.tag());
+ CompletableFuture<?> future =
CompletableFuture.runAsync(localProcess::run, executor);
+ processes.put(identifier, new ProcessHolder(future));
+ expire();
+ return identifier;
+ }
+
+ @Override
+ public ProcessStatus tryCancelTableProcess(TableProcess tableProcess, String
processIdentifier) {
+ ProcessHolder p = this.processes.get(processIdentifier);
+ if (p == null) {
+ return ProcessStatus.UNKNOWN;
+ }
+ if (p.finishTime() > 0) {
+ return p.getStatus();
+ }
+ p.cancel();
+ return p.getStatus();
+ }
+
+ @Override
+ public void open(Map<String, String> properties) {
+ Configurations configs = Configurations.fromMap(properties);
+ int defaultSize = configs.getInteger(DEFAULT_POOL_SIZE);
+ pools.put(DEFAULT_POOL, newFixedPool(DEFAULT_POOL, defaultSize));
+
+ Set<String> customPools =
+ properties.keySet().stream()
+ .filter(key -> key.startsWith(POOL_CONFIG_PREFIX))
+ .map(key -> key.substring(POOL_CONFIG_PREFIX.length()))
+ .map(key -> key.substring(0, key.indexOf(".")))
+ .map(String::toLowerCase)
+ .filter(name -> !DEFAULT_POOL.equalsIgnoreCase(name))
+ .collect(Collectors.toSet());
+
+ customPools.forEach(
+ name -> {
+ ConfigOption<Integer> poolSizeOpt =
+ ConfigOptions.key(POOL_CONFIG_PREFIX + name + POOL_SIZE_SUFFIX)
+ .intType()
+ .defaultValue(-1);
+ int size = configs.getInteger(poolSizeOpt);
+ Preconditions.checkArgument(size > 0, "Pool thread-count is not
configured for %s", name);
+ pools.put(name, newFixedPool(name, size));
+ LOG.info("Initialize local execute pool:{} with size:{}", name,
size);
+ });
+ this.statusTtl = configs.getDurationInMillis(PROCESS_STATUS_TTL);
+ }
+
+ @Override
+ public void close() {
+ pools.values().forEach(ThreadPoolExecutor::shutdown);
+ pools.clear();
+ }
+
+ @Override
+ public String name() {
+ return ENGINE_NAME;
+ }
+
+ private ThreadPoolExecutor getPool(String tag) {
+ if (pools.containsKey(tag)) {
+ return pools.get(tag);
+ }
+ return pools.get(DEFAULT_POOL);
+ }
+
+ private ThreadPoolExecutor newFixedPool(String tag, int size) {
+ return new ThreadPoolExecutor(
+ size,
+ size,
+ 60,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder().setDaemon(false).setNameFormat("local-" +
tag + "-%d").build());
+ }
+
+ private void expire() {
+ long threshold = System.currentTimeMillis() - statusTtl;
+ Set<String> expireIdentifiers =
+ processes.entrySet().stream()
+ .filter(e -> e.getValue().finishTime() > 0 &&
e.getValue().finishTime() < threshold)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ expireIdentifiers.forEach(processes::remove);
+ }
+
+ private static class ProcessHolder {
+ private final CompletableFuture<?> future;
+ private final AtomicLong finishTime = new AtomicLong(-1);
+ private final AtomicReference<ProcessStatus> status =
+ new AtomicReference<>(ProcessStatus.RUNNING);
+ private final AtomicReference<String> failedInfo = new
AtomicReference<>("");
+
+ public ProcessHolder(CompletableFuture<?> future) {
+ this.future = future;
+ this.future.whenComplete((v, t) -> onComplete(t));
+ }
+
+ private void onComplete(Throwable e) {
+ finishTime.compareAndSet(-1, System.currentTimeMillis());
+ if (e != null) {
+ status.compareAndSet(ProcessStatus.RUNNING, ProcessStatus.FAILED);
+ failedInfo.compareAndSet("", exceptionToString(e));
+ } else {
+ status.compareAndSet(ProcessStatus.RUNNING, ProcessStatus.SUCCESS);
+ }
+ }
+
+ private static String exceptionToString(Throwable throwable) {
+ StringWriter sw = new StringWriter();
+ throwable.printStackTrace(new PrintWriter(sw));
+ return sw.toString();
+ }
+
+ public ProcessStatus getStatus() {
+ return status.get();
+ }
+
+ public void cancel() {
+ if (finishTime() > 0) {
+ return;
+ }
+ status.compareAndSet(ProcessStatus.RUNNING, ProcessStatus.CANCELED);
+ future.cancel(true);
+ }
+
+ public long finishTime() {
+ return this.finishTime.get();
+ }
+ }
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
b/amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java
old mode 100644
new mode 100755
similarity index 58%
copy from amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
copy to amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java
index c75c5ac8d..7dc358ad8
--- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java
+++ b/amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java
@@ -16,16 +16,25 @@
* limitations under the License.
*/
-package org.apache.amoro;
+package org.apache.amoro.process;
-public class IcebergActions {
+/**
+ * A {@link TableProcess} that can be executed locally by a thread pool.
+ *
+ * <p>Local processes are executed by {@code ExecuteEngine} implementations
which run the logic
+ * inside the AMS JVM.
+ */
+public interface LocalProcess extends AmoroProcess {
- private static final TableFormat[] DEFAULT_FORMATS =
- new TableFormat[] {TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG,
TableFormat.MIXED_HIVE};
+ /** Execute process logic locally. */
+ void run();
- public static final Action SYSTEM = Action.register("system");
- public static final Action REWRITE = Action.register("rewrite");
- public static final Action DELETE_ORPHANS =
Action.register("delete-orphans");
- public static final Action SYNC_HIVE = Action.register("sync-hive");
- public static final Action EXPIRE_DATA = Action.register("expire-data");
+ /**
+ * Tag used by local execution engines to select a thread pool.
+ *
+ * @return pool tag
+ */
+ default String tag() {
+ return "default";
+ }
}
diff --git
a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.table.TableRuntimeFactory
b/amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine
old mode 100644
new mode 100755
similarity index 93%
rename from
amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.table.TableRuntimeFactory
rename to
amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine
index a3b50433a..8859bb6d3
---
a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.table.TableRuntimeFactory
+++
b/amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine
@@ -16,4 +16,4 @@
# limitations under the License.
#
-org.apache.amoro.server.table.DefaultTableRuntimeFactory
+org.apache.amoro.process.LocalExecutionEngine
diff --git
a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
new file mode 100644
index 000000000..d6930622d
--- /dev/null
+++
b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.process;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.TableRuntime;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TestLocalExecutionEngine {
+
+ private LocalExecutionEngine engine;
+
+ @After
+ public void tearDown() {
+ if (engine != null) {
+ engine.close();
+ }
+ }
+
+ @Test
+ public void testSubmitUsesCustomPoolByTag() throws Exception {
+ engine = createEngineWithTtl("1h");
+
+ CountDownLatch started = new CountDownLatch(1);
+ AtomicReference<String> threadName = new AtomicReference<>();
+
+ LocalProcessTableProcess process =
+ new LocalProcessTableProcess(
+ mock(TableRuntime.class),
+ engine,
+ "snapshots-expiring",
+ () -> {
+ threadName.set(Thread.currentThread().getName());
+ started.countDown();
+ });
+
+ String identifier = engine.submitTableProcess(process);
+
+ Assert.assertTrue("process should start", started.await(5,
TimeUnit.SECONDS));
+ Assert.assertTrue(
+ "should run in custom pool",
+ threadName.get() != null &&
threadName.get().startsWith("local-snapshots-expiring-"));
+
+ waitForStatus(identifier, ProcessStatus.SUCCESS, 5000);
+ }
+
+ @Test
+ public void testCancelRunningProcess() throws Exception {
+ engine = createEngineWithTtl("1h");
+
+ CountDownLatch blockLatch = new CountDownLatch(1);
+ LocalProcessTableProcess process =
+ new LocalProcessTableProcess(
+ mock(TableRuntime.class),
+ engine,
+ "default",
+ () -> {
+ try {
+ blockLatch.await();
+ } catch (InterruptedException ignored) {
+ // ignore
+ }
+ });
+
+ String identifier = engine.submitTableProcess(process);
+
+ // Process may not be scheduled yet, but holder is already created.
+ Assert.assertEquals(ProcessStatus.RUNNING, engine.getStatus(identifier));
+
+ engine.tryCancelTableProcess(process, identifier);
+
+ // Eventually the process should be marked as canceled.
+ waitForStatus(identifier, ProcessStatus.CANCELED, 5000);
+ }
+
+ @Test
+ public void testFinishedStatusExpired() throws Exception {
+ engine = createEngineWithTtl("100ms");
+
+ LocalProcessTableProcess process =
+ new LocalProcessTableProcess(mock(TableRuntime.class), engine,
"default", () -> {});
+
+ String identifier = engine.submitTableProcess(process);
+
+ waitForStatus(identifier, ProcessStatus.SUCCESS, 5000);
+
+ // Wait until process info should be expired.
+ Thread.sleep(200);
+
+ Assert.assertEquals(ProcessStatus.UNKNOWN, engine.getStatus(identifier));
+ }
+
+ @Test
+ public void testFailedProcessStatus() throws Exception {
+ engine = createEngineWithTtl("1h");
+
+ LocalProcessTableProcess process =
+ new LocalProcessTableProcess(
+ mock(TableRuntime.class),
+ engine,
+ "default",
+ () -> {
+ throw new RuntimeException("boom");
+ });
+
+ String identifier = engine.submitTableProcess(process);
+
+ waitForStatus(identifier, ProcessStatus.FAILED, 5000);
+ }
+
+ @Test
+ public void testGetStatusForInvalidIdentifier() {
+ engine = createEngineWithTtl("1h");
+
+ Assert.assertEquals(ProcessStatus.UNKNOWN, engine.getStatus(null));
+ Assert.assertEquals(ProcessStatus.UNKNOWN, engine.getStatus(""));
+ Assert.assertEquals(ProcessStatus.UNKNOWN, engine.getStatus("not-exist"));
+ }
+
+ private LocalExecutionEngine createEngineWithTtl(String ttl) {
+ LocalExecutionEngine localEngine = new LocalExecutionEngine();
+ Map<String, String> properties = new HashMap<>();
+ properties.put("pool.default.thread-count", "1");
+ properties.put("pool.snapshots-expiring.thread-count", "1");
+ properties.put("process.status.ttl", ttl);
+ localEngine.open(properties);
+ return localEngine;
+ }
+
+ private void waitForStatus(String identifier, ProcessStatus expected, long
timeoutMillis)
+ throws InterruptedException {
+ long deadline = System.currentTimeMillis() + timeoutMillis;
+ while (System.currentTimeMillis() < deadline) {
+ ProcessStatus status = engine.getStatus(identifier);
+ if (status == expected) {
+ return;
+ }
+ Thread.sleep(10);
+ }
+ Assert.fail(
+ "Timeout waiting for status " + expected + ", current=" +
engine.getStatus(identifier));
+ }
+
+ private static class LocalProcessTableProcess extends TableProcess
implements LocalProcess {
+
+ private final String tag;
+ private final Runnable runnable;
+
+ LocalProcessTableProcess(
+ TableRuntime tableRuntime, ExecuteEngine engine, String tag, Runnable
runnable) {
+ super(tableRuntime, engine);
+ this.tag = tag;
+ this.runnable = runnable;
+ }
+
+ @Override
+ public void run() {
+ runnable.run();
+ }
+
+ @Override
+ public String tag() {
+ return tag;
+ }
+
+ @Override
+ public Action getAction() {
+ return Action.register("TEST");
+ }
+
+ @Override
+ public Map<String, String> getProcessParameters() {
+ return new HashMap<>();
+ }
+
+ @Override
+ public Map<String, String> getSummary() {
+ return new HashMap<>();
+ }
+ }
+}
diff --git a/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml
b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
old mode 100644
new mode 100755
similarity index 81%
copy from dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml
copy to dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
index 34cf7c94d..5c3199f32
--- a/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml
@@ -15,11 +15,10 @@
# limitations under the License.
#
-# This file is config file for plugin metric reporter
-# configurations of metric reporters
-
-table-runtime-factories:
- - name: default # configs for table runtime factory
+execute-engines:
+ - name: local
enabled: true
priority: 100
-
+ properties:
+ pool.default.thread-count: 10
+ pool.snapshots-expiring.thread-count: 10
diff --git a/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml
b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
old mode 100644
new mode 100755
similarity index 81%
rename from dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml
rename to dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
index 34cf7c94d..e1455c2cf
--- a/dist/src/main/amoro-bin/conf/plugins/table-runtime-factories.yaml
+++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml
@@ -15,11 +15,10 @@
# limitations under the License.
#
-# This file is config file for plugin metric reporter
-# configurations of metric reporters
-
-table-runtime-factories:
- - name: default # configs for table runtime factory
+process-factories:
+ - name: iceberg
enabled: true
priority: 100
-
+ properties:
+ expire-snapshots.enabled: "true"
+ expire-snapshots.interval: "1h"