This is an automated email from the ASF dual-hosted git repository.
xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new a2d4b2185 [AMORO-4103] Release table runtime when the group name is
changed to a non-existent resource group (#4104)
a2d4b2185 is described below
commit a2d4b2185c8a40bc66172e9d32d5a6efa6a5dcdc
Author: Xu Bai <[email protected]>
AuthorDate: Mon Mar 9 20:18:27 2026 +0800
[AMORO-4103] Release table runtime when the group name is changed to a
non-existent resource group (#4104)
* Release table runtime when the group name is changed to a non-existent
resource group.
* spotless
* Update optimizing status assertion in TestOptimizingQueue
* Fix handling of optimizing process when resource group changes to
non-existent group
---
.../amoro/server/DefaultOptimizingService.java | 40 +++++++++--
.../amoro/server/TestDefaultOptimizingService.java | 82 ++++++++++++++++++++++
.../server/optimizing/TestOptimizingQueue.java | 78 ++++++++++++++++++++
3 files changed, 195 insertions(+), 5 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 9be63dbaf..28ac43d74 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -163,9 +163,28 @@ public class DefaultOptimizingService extends
StatedPersistentBase
optimizingQueueByGroup.put(groupName, optimizingQueue);
});
optimizers.forEach(optimizer -> registerOptimizer(optimizer, false));
- groupToTableRuntimes
- .keySet()
- .forEach(groupName -> LOG.warn("Unloaded task runtime in group {}",
groupName));
+ // Avoid keeping the tables in PLANNING/PENDING status forever in the cases
below:
+ // 1) Resource group does not exist
+ // 2) The AMS restarts after the tables disable self-optimizing but before
the optimizing
+ // process is closed, which may cause the optimizing status of the tables
to be still
+ // PLANNING/PENDING after AMS is restarted.
+ groupToTableRuntimes.forEach(
+ (groupName, trs) -> {
+ trs.stream()
+ .filter(
+ tr ->
+ tr.getOptimizingStatus() == OptimizingStatus.PLANNING
+ || tr.getOptimizingStatus() ==
OptimizingStatus.PENDING)
+ .forEach(
+ tr -> {
+ LOG.warn(
+ "Release {} optimizing process for table {}, since its
resource group {} does not exist",
+ tr.getOptimizingStatus().name(),
+ tr.getTableIdentifier(),
+ groupName);
+ tr.completeEmptyProcess();
+ });
+ });
}
private void registerOptimizer(OptimizerInstance optimizer, boolean
needPersistent) {
@@ -379,11 +398,22 @@ public class DefaultOptimizingService extends
StatedPersistentBase
public void handleConfigChanged(TableRuntime runtime, TableConfiguration
originalConfig) {
DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime;
String originalGroup =
originalConfig.getOptimizingConfig().getOptimizerGroup();
+ Optional<OptimizingQueue> newQueue =
getOptionalQueueByGroup(tableRuntime.getGroupName());
if (!tableRuntime.getGroupName().equals(originalGroup)) {
getOptionalQueueByGroup(originalGroup).ifPresent(q ->
q.releaseTable(tableRuntime));
+ // If the new group doesn't exist, close the process to avoid leaving the
table in a limbo (PENDING)
+ // status.
+ if (newQueue.isEmpty()) {
+ LOG.warn(
+ "Cannot find the resource group: {}, try to release optimizing
process of table {} directly",
+ tableRuntime.getGroupName(),
+ tableRuntime.getTableIdentifier());
+ tableRuntime.completeEmptyProcess();
+ }
}
- getOptionalQueueByGroup(tableRuntime.getGroupName())
- .ifPresent(q -> q.refreshTable(tableRuntime));
+
+ // Binding new queue if the new group exists
+ newQueue.ifPresent(q -> q.refreshTable(tableRuntime));
}
@Override
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 2116631de..417150b39 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -18,9 +18,14 @@
package org.apache.amoro.server;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.OptimizerProperties;
import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.api.OptimizerRegisterInfo;
import org.apache.amoro.api.OptimizingTask;
@@ -28,12 +33,15 @@ import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.api.OptimizingTaskResult;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.exception.IllegalTaskStateException;
import org.apache.amoro.exception.PluginRetryAuthException;
import org.apache.amoro.io.MixedDataTestHelpers;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.TableOptimizing;
import org.apache.amoro.process.ProcessStatus;
+import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
@@ -45,6 +53,7 @@ import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor;
import org.apache.amoro.server.table.AMSTableTestBase;
import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.table.MixedTable;
@@ -595,6 +604,79 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
assertTaskStatus(TaskRuntime.Status.PLANNED);
}
+ /**
+ * Test handleConfigChanged when the optimizer group changes to a different
existing group. The
+ * table should be released from the old group's queue and refreshed in the new
group's queue.
+ */
+ @Test
+ public void testHandleConfigChangedGroupChanged() {
+ // Create a new resource group
+ ResourceGroup newGroup = new ResourceGroup.Builder("test-new-group",
"local").build();
+ try {
+ optimizerManager().createResourceGroup(newGroup);
+ } catch (Throwable ignored) {
+ }
+ optimizingService().createResourceGroup(newGroup);
+
+ try {
+ TableRuntime tableRuntime =
tableService().getRuntime(serverTableIdentifier().getId());
+ String originalGroup = tableRuntime.getGroupName();
+
+ // Build original config with the old group name
+ OptimizingConfig originalOptConfig = new OptimizingConfig();
+ originalOptConfig.setOptimizerGroup(originalGroup);
+ TableConfiguration originalConfig = new TableConfiguration();
+ originalConfig.setOptimizingConfig(originalOptConfig);
+
+ // Simulate that the table now belongs to the new group
+ TableRuntime spyRuntime = spy(tableRuntime);
+ doReturn("test-new-group").when(spyRuntime).getGroupName();
+ doReturn(TableFormat.ICEBERG).when(spyRuntime).getFormat();
+
+ // Fire config changed (group changed from "default" to "test-new-group")
+ RuntimeHandlerChain handler =
optimizingService().getTableRuntimeHandler();
+ handler.fireConfigChanged(spyRuntime, originalConfig);
+
+ // No exception should be thrown; table should be released from the old queue
and refreshed in the new queue
+ } finally {
+ optimizingService().deleteResourceGroup("test-new-group");
+ try {
+ optimizerManager().deleteResourceGroup("test-new-group");
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+
+ /**
+ * Test handleConfigChanged when the new optimizer group does not exist. The
table runtime's
+ * completeEmptyProcess() should be called.
+ */
+ @Test
+ public void testHandleConfigChangedGroupNotExist() {
+ DefaultTableRuntime tableRuntime =
+ (DefaultTableRuntime)
tableService().getRuntime(serverTableIdentifier().getId());
+ String originalGroup = tableRuntime.getGroupName();
+
+ // Build original config with the original group
+ OptimizingConfig originalOptConfig = new OptimizingConfig();
+ originalOptConfig.setOptimizerGroup(originalGroup);
+ TableConfiguration originalConfig = new TableConfiguration();
+ originalConfig.setOptimizingConfig(originalOptConfig);
+
+ // Simulate that the table now belongs to a non-existing group
+ DefaultTableRuntime spyRuntime = spy(tableRuntime);
+ doReturn("non-existing-group").when(spyRuntime).getGroupName();
+ doReturn(TableFormat.ICEBERG).when(spyRuntime).getFormat();
+ doReturn(serverTableIdentifier()).when(spyRuntime).getTableIdentifier();
+
+ // Fire config changed (group changed from "default" to
"non-existing-group")
+ RuntimeHandlerChain handler = optimizingService().getTableRuntimeHandler();
+ handler.fireConfigChanged(spyRuntime, originalConfig);
+
+ // Verify that completeEmptyProcess was called on the spy
+ verify(spyRuntime).completeEmptyProcess();
+ }
+
private OptimizerRegisterInfo buildRegisterInfo() {
OptimizerRegisterInfo registerInfo = new OptimizerRegisterInfo();
Map<String, String> registerProperties = Maps.newHashMap();
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
index bf26651a3..85316660b 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
@@ -74,6 +74,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class TestOptimizingQueue extends AMSTableTestBase {
@@ -610,6 +611,48 @@ public class TestOptimizingQueue extends AMSTableTestBase {
queue2.dispose();
}
+ @Test
+ public void testReleaseOrphanedPlanningTableOnRestart() {
+ // Scenario: Table config was changed (optimizer group: "old_group" ->
"default"),
+ // the optimizing process was closed but table runtime is persisted with
"default" and
+ // PLANNING status
+ ResourceGroup oldGroup = new ResourceGroup.Builder("old_group",
"local").build();
+ // reset optimizer group to "default" to simulate the scenario where the
self-optimizing configs
+ // have been cleared
+ DefaultTableRuntime tableRuntime =
+ buildTableRuntimeMeta(OptimizingStatus.PLANNING,
defaultResourceGroup());
+ Assert.assertEquals(OptimizingStatus.PLANNING,
tableRuntime.getOptimizingStatus());
+ Assert.assertEquals("default", tableRuntime.getGroupName());
+
+ List<DefaultTableRuntime> released =
+ simulateLoadOptimizingQueuesForNonExistentGroup(
+ Collections.singletonList(tableRuntime), oldGroup);
+
+ Assert.assertEquals(1, released.size());
+ Assert.assertEquals(OptimizingStatus.IDLE,
tableRuntime.getOptimizingStatus());
+ }
+
+ @Test
+ public void testReleaseOrphanedPendingTableOnRestart() {
+ // Scenario: Table config was changed (optimizer group: "old_group" ->
"default"),
+ // the optimizing process was closed but table runtime is persisted with
"default" and PENDING
+ // status
+ ResourceGroup oldGroup = new ResourceGroup.Builder("old_group",
"local").build();
+ // reset optimizer group to "default" to simulate the scenario where the
self-optimizing configs
+ // have been cleared
+ DefaultTableRuntime tableRuntime =
+ buildTableRuntimeMeta(OptimizingStatus.PENDING,
defaultResourceGroup());
+ Assert.assertEquals(OptimizingStatus.PENDING,
tableRuntime.getOptimizingStatus());
+ Assert.assertEquals("default", tableRuntime.getGroupName());
+
+ List<DefaultTableRuntime> released =
+ simulateLoadOptimizingQueuesForNonExistentGroup(
+ Collections.singletonList(tableRuntime), oldGroup);
+
+ Assert.assertEquals(1, released.size());
+ Assert.assertEquals(OptimizingStatus.IDLE,
tableRuntime.getOptimizingStatus());
+ }
+
protected DefaultTableRuntime initTableWithFiles() {
MixedTable mixedTable =
(MixedTable)
tableService().loadTable(serverTableIdentifier()).originalTable();
@@ -719,6 +762,41 @@ public class TestOptimizingQueue extends AMSTableTestBase {
return optimizingTaskResult;
}
+ /**
+ * Simulate the loadOptimizingQueues logic: tables whose persisted optimizer
group no longer
+ * exists (e.g., table config changed from an old deleted group to
"default", but AMS restarted
+ * before the optimizing process was closed) remain in the leftover
groupToTableRuntimes map.
+ * These PLANNING/PENDING tables should be released to IDLE via
completeEmptyProcess().
+ */
+ private List<DefaultTableRuntime>
simulateLoadOptimizingQueuesForNonExistentGroup(
+ List<DefaultTableRuntime> tableRuntimes, ResourceGroup resourceGroup) {
+ // Only the created resource group is returned
+ List<ResourceGroup> existingGroups =
Collections.singletonList(resourceGroup);
+
+ // Group tables by their persisted optimizer group
+ Map<String, List<DefaultTableRuntime>> groupToTableRuntimes =
+
tableRuntimes.stream().collect(Collectors.groupingBy(DefaultTableRuntime::getGroupName));
+
+ // Remove groups that exist — same logic as loadOptimizingQueues
+ existingGroups.forEach(group ->
groupToTableRuntimes.remove(group.getName()));
+
+ // Release PLANNING/PENDING tables in non-existent groups — same logic as
loadOptimizingQueues
+ List<DefaultTableRuntime> released = new ArrayList<>();
+ groupToTableRuntimes.forEach(
+ (groupName, trs) ->
+ trs.stream()
+ .filter(
+ tr ->
+ tr.getOptimizingStatus() == OptimizingStatus.PLANNING
+ || tr.getOptimizingStatus() ==
OptimizingStatus.PENDING)
+ .forEach(
+ tr -> {
+ tr.completeEmptyProcess();
+ released.add(tr);
+ }));
+ return released;
+ }
+
private OptimizingTaskResult buildOptimizingTaskFailed(OptimizingTaskId
taskId, int threadId) {
OptimizingTaskResult optimizingTaskResult = new
OptimizingTaskResult(taskId, threadId);
optimizingTaskResult.setErrorMessage("error");