This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new a2d4b2185 [AMORO-4103] Release table runtime when the group name is 
changed to a non-existent resource group (#4104)
a2d4b2185 is described below

commit a2d4b2185c8a40bc66172e9d32d5a6efa6a5dcdc
Author: Xu Bai <[email protected]>
AuthorDate: Mon Mar 9 20:18:27 2026 +0800

    [AMORO-4103] Release table runtime when the group name is changed to a 
non-existent resource group (#4104)
    
    * Release table runtime when the group name is changed to a non-existent 
resource group.
    
    * spotless
    
    * Update optimizing status assertion in TestOptimizingQueue
    
    * Fix handling of optimizing process when resource group changes to 
non-existent group
---
 .../amoro/server/DefaultOptimizingService.java     | 40 +++++++++--
 .../amoro/server/TestDefaultOptimizingService.java | 82 ++++++++++++++++++++++
 .../server/optimizing/TestOptimizingQueue.java     | 78 ++++++++++++++++++++
 3 files changed, 195 insertions(+), 5 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 9be63dbaf..28ac43d74 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -163,9 +163,28 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
           optimizingQueueByGroup.put(groupName, optimizingQueue);
         });
     optimizers.forEach(optimizer -> registerOptimizer(optimizer, false));
-    groupToTableRuntimes
-        .keySet()
-        .forEach(groupName -> LOG.warn("Unloaded task runtime in group {}", 
groupName));
+    // Avoid keeping the tables in PLANNING/PENDING status forever in the 
cases below:
+    // 1) Resource group does not exist
+    // 2) The AMS restarts after the tables disable self-optimizing but before 
the optimizing
+    // process is closed, which may cause the optimizing status of the tables 
to be still
+    // PLANNING/PENDING after AMS is restarted.
+    groupToTableRuntimes.forEach(
+        (groupName, trs) -> {
+          trs.stream()
+              .filter(
+                  tr ->
+                      tr.getOptimizingStatus() == OptimizingStatus.PLANNING
+                          || tr.getOptimizingStatus() == 
OptimizingStatus.PENDING)
+              .forEach(
+                  tr -> {
+                    LOG.warn(
+                        "Release {} optimizing process for table {}, since its 
resource group {} does not exist",
+                        tr.getOptimizingStatus().name(),
+                        tr.getTableIdentifier(),
+                        groupName);
+                    tr.completeEmptyProcess();
+                  });
+        });
   }
 
   private void registerOptimizer(OptimizerInstance optimizer, boolean 
needPersistent) {
@@ -379,11 +398,22 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
     public void handleConfigChanged(TableRuntime runtime, TableConfiguration 
originalConfig) {
       DefaultTableRuntime tableRuntime = (DefaultTableRuntime) runtime;
       String originalGroup = 
originalConfig.getOptimizingConfig().getOptimizerGroup();
+      Optional<OptimizingQueue> newQueue = 
getOptionalQueueByGroup(tableRuntime.getGroupName());
       if (!tableRuntime.getGroupName().equals(originalGroup)) {
         getOptionalQueueByGroup(originalGroup).ifPresent(q -> 
q.releaseTable(tableRuntime));
+        // If the new group doesn't exist, close the process to avoid leaving the 
table in a limbo (PENDING)
+        // status.
+        if (newQueue.isEmpty()) {
+          LOG.warn(
+              "Cannot find the resource group: {}, try to release optimizing 
process of table {} directly",
+              tableRuntime.getGroupName(),
+              tableRuntime.getTableIdentifier());
+          tableRuntime.completeEmptyProcess();
+        }
       }
-      getOptionalQueueByGroup(tableRuntime.getGroupName())
-          .ifPresent(q -> q.refreshTable(tableRuntime));
+
+      // Binding new queue if the new group exists
+      newQueue.ifPresent(q -> q.refreshTable(tableRuntime));
     }
 
     @Override
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 2116631de..417150b39 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -18,9 +18,14 @@
 
 package org.apache.amoro.server;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
 import org.apache.amoro.BasicTableTestHelper;
 import org.apache.amoro.OptimizerProperties;
 import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableRuntime;
 import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.api.OptimizerRegisterInfo;
 import org.apache.amoro.api.OptimizingTask;
@@ -28,12 +33,15 @@ import org.apache.amoro.api.OptimizingTaskId;
 import org.apache.amoro.api.OptimizingTaskResult;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
 import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.config.OptimizingConfig;
+import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.exception.IllegalTaskStateException;
 import org.apache.amoro.exception.PluginRetryAuthException;
 import org.apache.amoro.io.MixedDataTestHelpers;
 import org.apache.amoro.optimizing.RewriteFilesOutput;
 import org.apache.amoro.optimizing.TableOptimizing;
 import org.apache.amoro.process.ProcessStatus;
+import org.apache.amoro.resource.ResourceGroup;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
 import org.apache.amoro.server.optimizing.TaskRuntime;
 import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
@@ -45,6 +53,7 @@ import org.apache.amoro.server.resource.OptimizerInstance;
 import org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor;
 import org.apache.amoro.server.table.AMSTableTestBase;
 import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.RuntimeHandlerChain;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.table.MixedTable;
@@ -595,6 +604,79 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
     assertTaskStatus(TaskRuntime.Status.PLANNED);
   }
 
+  /**
+   * Test handleConfigChanged when the optimizer group changes to a different 
existing group. The
+   * table should be released from the old group's queue and refreshed in the new 
group's queue.
+   */
+  @Test
+  public void testHandleConfigChangedGroupChanged() {
+    // Create a new resource group
+    ResourceGroup newGroup = new ResourceGroup.Builder("test-new-group", 
"local").build();
+    try {
+      optimizerManager().createResourceGroup(newGroup);
+    } catch (Throwable ignored) {
+    }
+    optimizingService().createResourceGroup(newGroup);
+
+    try {
+      TableRuntime tableRuntime = 
tableService().getRuntime(serverTableIdentifier().getId());
+      String originalGroup = tableRuntime.getGroupName();
+
+      // Build original config with the old group name
+      OptimizingConfig originalOptConfig = new OptimizingConfig();
+      originalOptConfig.setOptimizerGroup(originalGroup);
+      TableConfiguration originalConfig = new TableConfiguration();
+      originalConfig.setOptimizingConfig(originalOptConfig);
+
+      // Simulate that the table now belongs to the new group
+      TableRuntime spyRuntime = spy(tableRuntime);
+      doReturn("test-new-group").when(spyRuntime).getGroupName();
+      doReturn(TableFormat.ICEBERG).when(spyRuntime).getFormat();
+
+      // Fire config changed (group changed from "default" to "test-new-group")
+      RuntimeHandlerChain handler = 
optimizingService().getTableRuntimeHandler();
+      handler.fireConfigChanged(spyRuntime, originalConfig);
+
+      // No exception should be thrown; table should be released from the old 
queue and refreshed in the new queue
+    } finally {
+      optimizingService().deleteResourceGroup("test-new-group");
+      try {
+        optimizerManager().deleteResourceGroup("test-new-group");
+      } catch (Throwable ignored) {
+      }
+    }
+  }
+
+  /**
+   * Test handleConfigChanged when the new optimizer group does not exist. The 
table runtime's
+   * completeEmptyProcess() should be called.
+   */
+  @Test
+  public void testHandleConfigChangedGroupNotExist() {
+    DefaultTableRuntime tableRuntime =
+        (DefaultTableRuntime) 
tableService().getRuntime(serverTableIdentifier().getId());
+    String originalGroup = tableRuntime.getGroupName();
+
+    // Build original config with the original group
+    OptimizingConfig originalOptConfig = new OptimizingConfig();
+    originalOptConfig.setOptimizerGroup(originalGroup);
+    TableConfiguration originalConfig = new TableConfiguration();
+    originalConfig.setOptimizingConfig(originalOptConfig);
+
+    // Simulate that the table now belongs to a non-existing group
+    DefaultTableRuntime spyRuntime = spy(tableRuntime);
+    doReturn("non-existing-group").when(spyRuntime).getGroupName();
+    doReturn(TableFormat.ICEBERG).when(spyRuntime).getFormat();
+    doReturn(serverTableIdentifier()).when(spyRuntime).getTableIdentifier();
+
+    // Fire config changed (group changed from "default" to 
"non-existing-group")
+    RuntimeHandlerChain handler = optimizingService().getTableRuntimeHandler();
+    handler.fireConfigChanged(spyRuntime, originalConfig);
+
+    // Verify that completeEmptyProcess was called on the spy
+    verify(spyRuntime).completeEmptyProcess();
+  }
+
   private OptimizerRegisterInfo buildRegisterInfo() {
     OptimizerRegisterInfo registerInfo = new OptimizerRegisterInfo();
     Map<String, String> registerProperties = Maps.newHashMap();
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
index bf26651a3..85316660b 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
@@ -74,6 +74,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 
 @RunWith(Parameterized.class)
 public class TestOptimizingQueue extends AMSTableTestBase {
@@ -610,6 +611,48 @@ public class TestOptimizingQueue extends AMSTableTestBase {
     queue2.dispose();
   }
 
+  @Test
+  public void testReleaseOrphanedPlanningTableOnRestart() {
+    // Scenario: Table config was changed (optimizer group: "old_group" -> 
"default"),
+    // the optimizing process was closed but table runtime is persisted with 
"default" and
+    // PLANNING status
+    ResourceGroup oldGroup = new ResourceGroup.Builder("old_group", 
"local").build();
+    // reset optimizer group to "default" to simulate the scenario where the 
self-optimizing configs
+    // have been cleared
+    DefaultTableRuntime tableRuntime =
+        buildTableRuntimeMeta(OptimizingStatus.PLANNING, 
defaultResourceGroup());
+    Assert.assertEquals(OptimizingStatus.PLANNING, 
tableRuntime.getOptimizingStatus());
+    Assert.assertEquals("default", tableRuntime.getGroupName());
+
+    List<DefaultTableRuntime> released =
+        simulateLoadOptimizingQueuesForNonExistentGroup(
+            Collections.singletonList(tableRuntime), oldGroup);
+
+    Assert.assertEquals(1, released.size());
+    Assert.assertEquals(OptimizingStatus.IDLE, 
tableRuntime.getOptimizingStatus());
+  }
+
+  @Test
+  public void testReleaseOrphanedPendingTableOnRestart() {
+    // Scenario: Table config was changed (optimizer group: "old_group" -> 
"default"),
+    // the optimizing process was closed but table runtime is persisted with 
"default" and PENDING
+    // status
+    ResourceGroup oldGroup = new ResourceGroup.Builder("old_group", 
"local").build();
+    // reset optimizer group to "default" to simulate the scenario where the 
self-optimizing configs
+    // have been cleared
+    DefaultTableRuntime tableRuntime =
+        buildTableRuntimeMeta(OptimizingStatus.PENDING, 
defaultResourceGroup());
+    Assert.assertEquals(OptimizingStatus.PENDING, 
tableRuntime.getOptimizingStatus());
+    Assert.assertEquals("default", tableRuntime.getGroupName());
+
+    List<DefaultTableRuntime> released =
+        simulateLoadOptimizingQueuesForNonExistentGroup(
+            Collections.singletonList(tableRuntime), oldGroup);
+
+    Assert.assertEquals(1, released.size());
+    Assert.assertEquals(OptimizingStatus.IDLE, 
tableRuntime.getOptimizingStatus());
+  }
+
   protected DefaultTableRuntime initTableWithFiles() {
     MixedTable mixedTable =
         (MixedTable) 
tableService().loadTable(serverTableIdentifier()).originalTable();
@@ -719,6 +762,41 @@ public class TestOptimizingQueue extends AMSTableTestBase {
     return optimizingTaskResult;
   }
 
+  /**
+   * Simulate the loadOptimizingQueues logic: tables whose persisted optimizer 
group no longer
+   * exists (e.g., table config changed from an old deleted group to 
"default", but AMS restarted
+   * before the optimizing process was closed) remain in the leftover 
groupToTableRuntimes map.
+   * These PLANNING/PENDING tables should be released to IDLE via 
completeEmptyProcess().
+   */
+  private List<DefaultTableRuntime> 
simulateLoadOptimizingQueuesForNonExistentGroup(
+      List<DefaultTableRuntime> tableRuntimes, ResourceGroup resourceGroup) {
+    // Only the created resource group is returned
+    List<ResourceGroup> existingGroups = 
Collections.singletonList(resourceGroup);
+
+    // Group tables by their persisted optimizer group
+    Map<String, List<DefaultTableRuntime>> groupToTableRuntimes =
+        
tableRuntimes.stream().collect(Collectors.groupingBy(DefaultTableRuntime::getGroupName));
+
+    // Remove groups that exist — same logic as loadOptimizingQueues
+    existingGroups.forEach(group -> 
groupToTableRuntimes.remove(group.getName()));
+
+    // Release PLANNING/PENDING tables in non-existent groups — same logic as 
loadOptimizingQueues
+    List<DefaultTableRuntime> released = new ArrayList<>();
+    groupToTableRuntimes.forEach(
+        (groupName, trs) ->
+            trs.stream()
+                .filter(
+                    tr ->
+                        tr.getOptimizingStatus() == OptimizingStatus.PLANNING
+                            || tr.getOptimizingStatus() == 
OptimizingStatus.PENDING)
+                .forEach(
+                    tr -> {
+                      tr.completeEmptyProcess();
+                      released.add(tr);
+                    }));
+    return released;
+  }
+
   private OptimizingTaskResult buildOptimizingTaskFailed(OptimizingTaskId 
taskId, int threadId) {
     OptimizingTaskResult optimizingTaskResult = new 
OptimizingTaskResult(taskId, threadId);
     optimizingTaskResult.setErrorMessage("error");

Reply via email to