This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 5775d1942 [AMORO-4086] Add OptimizerGroupKeeper to automatically 
maintain optim… (#4087)
5775d1942 is described below

commit 5775d194270bca31f4e3c20d68adf6c9edcf44d3
Author: davedwwang <[email protected]>
AuthorDate: Tue Mar 10 11:12:54 2026 +0800

    [AMORO-4086] Add OptimizerGroupKeeper to automatically maintain optim… 
(#4087)
    
    [AMORO-4086] Add OptimizerGroupKeeper to automatically maintain optimizer 
resources based on min-parallelism configuration
    
    Co-authored-by: davedwwang <[email protected]>
---
 .../apache/amoro/server/AmoroManagementConf.java   |  17 +
 .../amoro/server/DefaultOptimizingService.java     | 240 +++++++++++--
 .../apache/amoro/server/AMSServiceTestBase.java    |   3 +
 .../amoro/server/TestOptimizerGroupKeeper.java     | 377 +++++++++++++++++++++
 .../java/org/apache/amoro/OptimizerProperties.java |   1 +
 docs/admin-guides/managing-optimizers.md           |   1 +
 docs/configuration/ams-config.md                   |   2 +
 7 files changed, 610 insertions(+), 31 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 96e54b7df..e50e18a90 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -514,6 +514,23 @@ public class AmoroManagementConf {
           .defaultValue(Duration.ofSeconds(3))
           .withDescription("Optimizer polling task timeout.");
 
+  /**
+   * How often an optimizer group is checked against its {@code min-parallelism} property. The
+   * delay before the next check of a group is this interval multiplied by the group's current
+   * number of consecutive keeping attempts, so repeated scale-out attempts are spaced further apart.
+   */
+  public static final ConfigOption<Duration> 
OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL =
+      ConfigOptions.key("optimizer-group.min-parallelism-check-interval")
+          .durationType()
+          .defaultValue(Duration.ofMinutes(5))
+          .withDescription(
+              "The interval for checking and ensuring the optimizer group 
meets its minimum parallelism requirement. "
+                  + "When the current parallelism falls below the configured 
min-parallelism, "
+                  + "the system will attempt to scale out optimizers at this 
interval. "
+                  + "The actual scale-out timing is calculated as: consecutive 
keeping attempts * this interval.");
+
+  /**
+   * Limit on consecutive scale-out attempts for an optimizer group whose optimizers stay below its
+   * {@code min-parallelism}. Once a group's attempt counter exceeds this value, the optimizer group
+   * keeper lowers the group's {@code min-parallelism} to the threads its optimizers currently
+   * occupy and starts counting again from 1.
+   *
+   * <p>NOTE(review): the description string below ("keep the optimizer group at its current
+   * parallelism") does not describe this reset behaviour; consider rewording it together with the
+   * generated configuration docs.
+   */
+  public static final ConfigOption<Integer> 
OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS =
+      ConfigOptions.key("optimizer-group.max-keeping-attempts")
+          .intType()
+          .defaultValue(3)
+          .withDescription(
+              "The maximum number of consecutive attempts to keep the 
optimizer group at its current parallelism.");
+
   public static final ConfigOption<Duration> OPTIMIZING_REFRESH_GROUP_INTERVAL 
=
       ConfigOptions.key("self-optimizing.refresh-group-interval")
           .durationType()
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 28ac43d74..73ee0dda8 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -32,8 +32,13 @@ import org.apache.amoro.exception.ForbiddenException;
 import org.apache.amoro.exception.IllegalTaskStateException;
 import org.apache.amoro.exception.ObjectNotExistsException;
 import org.apache.amoro.exception.PluginRetryAuthException;
+import org.apache.amoro.resource.Resource;
+import org.apache.amoro.resource.ResourceContainer;
 import org.apache.amoro.resource.ResourceGroup;
+import org.apache.amoro.resource.ResourceType;
 import org.apache.amoro.server.catalog.CatalogManager;
+import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo;
+import org.apache.amoro.server.manager.AbstractOptimizerContainer;
 import org.apache.amoro.server.optimizing.OptimizingProcess;
 import org.apache.amoro.server.optimizing.OptimizingQueue;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
@@ -43,6 +48,7 @@ import 
org.apache.amoro.server.persistence.mapper.OptimizerMapper;
 import org.apache.amoro.server.persistence.mapper.ResourceMapper;
 import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
 import org.apache.amoro.server.process.TableProcessMeta;
+import org.apache.amoro.server.resource.Containers;
 import org.apache.amoro.server.resource.OptimizerInstance;
 import org.apache.amoro.server.resource.OptimizerManager;
 import org.apache.amoro.server.resource.OptimizerThread;
@@ -89,6 +95,8 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultOptimizingService.class);
 
+  private final long groupMinParallelismCheckInterval;
+  private final int groupMaxKeepingAttempts;
   private final long optimizerTouchTimeout;
   private final long taskAckTimeout;
   private final long taskExecuteTimeout;
@@ -99,7 +107,9 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
   private final Map<String, OptimizingQueue> optimizingQueueByGroup = new 
ConcurrentHashMap<>();
   private final Map<String, OptimizingQueue> optimizingQueueByToken = new 
ConcurrentHashMap<>();
   private final Map<String, OptimizerInstance> authOptimizers = new 
ConcurrentHashMap<>();
-  private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper();
+  private final OptimizerKeeper optimizerKeeper = new 
OptimizerKeeper("optimizer-keeper-thread");
+  private final OptimizerGroupKeeper optimizerGroupKeeper =
+      new OptimizerGroupKeeper("optimizer-group-keeper-thread");
   private final OptimizingConfigWatcher optimizingConfigWatcher = new 
OptimizingConfigWatcher();
   private final CatalogManager catalogManager;
   private final OptimizerManager optimizerManager;
@@ -126,6 +136,11 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
         
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT);
     this.breakQuotaLimit =
         
serviceConfig.getBoolean(AmoroManagementConf.OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED);
+    this.groupMinParallelismCheckInterval =
+        serviceConfig.getDurationInMillis(
+            
AmoroManagementConf.OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL);
+    this.groupMaxKeepingAttempts =
+        
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS);
     this.tableService = tableService;
     this.catalogManager = catalogManager;
     this.optimizerManager = optimizerManager;
@@ -161,6 +176,7 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
                   Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new),
                   maxPlanningParallelism);
           optimizingQueueByGroup.put(groupName, optimizingQueue);
+          optimizerGroupKeeper.keepInTouch(groupName, 1);
         });
     optimizers.forEach(optimizer -> registerOptimizer(optimizer, false));
     // Avoid keeping the tables in processing/pending status forever in below 
cases:
@@ -348,7 +364,9 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
                   planExecutor,
                   new ArrayList<>(),
                   maxPlanningParallelism);
-          optimizingQueueByGroup.put(resourceGroup.getName(), optimizingQueue);
+          String groupName = resourceGroup.getName();
+          optimizingQueueByGroup.put(groupName, optimizingQueue);
+          optimizerGroupKeeper.keepInTouch(groupName, 1);
         });
   }
 
@@ -369,6 +387,7 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
     // dispose all queues
     optimizingQueueByGroup.values().forEach(OptimizingQueue::dispose);
     optimizerKeeper.dispose();
+    optimizerGroupKeeper.dispose();
     tableHandlerChain.dispose();
     optimizingQueueByGroup.clear();
     optimizingQueueByToken.clear();
@@ -439,6 +458,7 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
               .map(t -> (DefaultTableRuntime) t)
               .collect(Collectors.toList()));
       optimizerKeeper.start();
+      optimizerGroupKeeper.start();
       optimizingConfigWatcher.start();
       LOG.info("SuspendingDetector for Optimizer has been started.");
       LOG.info("OptimizerManagementService initializing has completed");
@@ -489,21 +509,16 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
     }
   }
 
-  private class OptimizerKeeper implements Runnable {
-
-    private volatile boolean stopped = false;
-    private final Thread thread = new Thread(this, "optimizer-keeper-thread");
-    private final DelayQueue<OptimizerKeepingTask> suspendingQueue = new 
DelayQueue<>();
+  protected abstract class AbstractKeeper<T extends Delayed> implements 
Runnable {
+    protected volatile boolean stopped = false;
+    protected final Thread thread = new Thread(this);
+    protected final DelayQueue<T> suspendingQueue = new DelayQueue<>();
 
-    public OptimizerKeeper() {
+    public AbstractKeeper(String threadName) {
+      thread.setName(threadName);
       thread.setDaemon(true);
     }
 
-    public void keepInTouch(OptimizerInstance optimizerInstance) {
-      Preconditions.checkNotNull(optimizerInstance, "token can not be null");
-      suspendingQueue.add(new OptimizerKeepingTask(optimizerInstance));
-    }
-
     public void start() {
       thread.start();
     }
@@ -517,30 +532,49 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
     public void run() {
       while (!stopped) {
         try {
-          OptimizerKeepingTask keepingTask = suspendingQueue.take();
-          String token = keepingTask.getToken();
-          boolean isExpired = !keepingTask.tryKeeping();
-          if (isExpired) {
-            LOG.info("Optimizer {} has been expired, unregister it", 
keepingTask.getOptimizer());
-            unregisterOptimizer(token);
-          }
-          Optional.ofNullable(keepingTask.getQueue())
-              .ifPresent(
-                  queue ->
-                      queue
-                          
.collectTasks(buildSuspendingPredication(authOptimizers.keySet()))
-                          .forEach(task -> retryTask(task, queue)));
-          if (!isExpired) {
-            LOG.debug("Optimizer {} is being touched, keep it", 
keepingTask.getOptimizer());
-            keepInTouch(keepingTask.getOptimizer());
-          }
+          T keepingTask = suspendingQueue.take();
+          this.processTask(keepingTask);
         } catch (InterruptedException ignored) {
         } catch (Throwable t) {
-          LOG.error("OptimizerKeeper has encountered a problem.", t);
+          LOG.error("{} has encountered a problem.", 
this.getClass().getSimpleName(), t);
         }
       }
     }
 
+    protected abstract void processTask(T task) throws Exception;
+  }
+
+  private class OptimizerKeeper extends AbstractKeeper<OptimizerKeepingTask> {
+
+    public OptimizerKeeper(String threadName) {
+      super(threadName);
+    }
+
+    public void keepInTouch(OptimizerInstance optimizerInstance) {
+      Preconditions.checkNotNull(optimizerInstance, "token can not be null");
+      suspendingQueue.add(new OptimizerKeepingTask(optimizerInstance));
+    }
+
+    @Override
+    protected void processTask(OptimizerKeepingTask keepingTask) {
+      String token = keepingTask.getToken();
+      boolean isExpired = !keepingTask.tryKeeping();
+      if (isExpired) {
+        LOG.info("Optimizer {} has been expired, unregister it", 
keepingTask.getOptimizer());
+        unregisterOptimizer(token);
+      }
+      Optional.ofNullable(keepingTask.getQueue())
+          .ifPresent(
+              queue ->
+                  queue
+                      
.collectTasks(buildSuspendingPredication(authOptimizers.keySet()))
+                      .forEach(task -> retryTask(task, queue)));
+      if (!isExpired) {
+        LOG.debug("Optimizer {} is being touched, keep it", 
keepingTask.getOptimizer());
+        keepInTouch(keepingTask.getOptimizer());
+      }
+    }
+
     private void retryTask(TaskRuntime<?> task, OptimizingQueue queue) {
       if (isTaskExecTimeout(task)) {
         LOG.warn(
@@ -629,4 +663,148 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
       scheduler.shutdown();
     }
   }
+
+  private class OptimizerGroupKeepingTask implements Delayed {
+
+    private final String groupName;
+    private final long lastCheckTime;
+    private final int attempts;
+
+    public OptimizerGroupKeepingTask(String groupName, int attempts) {
+      this.groupName = groupName;
+      this.lastCheckTime = System.currentTimeMillis();
+      this.attempts = attempts;
+    }
+
+    @Override
+    public long getDelay(@NotNull TimeUnit unit) {
+      return unit.convert(
+          lastCheckTime + groupMinParallelismCheckInterval * attempts - 
System.currentTimeMillis(),
+          TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(@NotNull Delayed o) {
+      OptimizerGroupKeepingTask another = (OptimizerGroupKeepingTask) o;
+      return Long.compare(lastCheckTime, another.lastCheckTime);
+    }
+
+    public int getMinParallelism(ResourceGroup resourceGroup) {
+      if (!resourceGroup
+          .getProperties()
+          .containsKey(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM)) {
+        return 0;
+      }
+      String minParallelism =
+          
resourceGroup.getProperties().get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM);
+      try {
+        return Integer.parseInt(minParallelism);
+      } catch (Throwable t) {
+        LOG.warn("Illegal minParallelism : {}, will use default value 0", 
minParallelism, t);
+        return 0;
+      }
+    }
+
+    public int tryKeeping(ResourceGroup resourceGroup) {
+      List<OptimizerInstance> optimizers = 
optimizerManager.listOptimizers(groupName);
+      OptimizerResourceInfo optimizerResourceInfo = new 
OptimizerResourceInfo();
+      optimizers.forEach(
+          e -> {
+            optimizerResourceInfo.addOccupationCore(e.getThreadCount());
+            optimizerResourceInfo.addOccupationMemory(e.getMemoryMb());
+          });
+      return getMinParallelism(resourceGroup) - 
optimizerResourceInfo.getOccupationCore();
+    }
+
+    public ResourceGroup getResourceGroup() {
+      OptimizingQueue optimizingQueue = optimizingQueueByGroup.get(groupName);
+      if (optimizingQueue == null) {
+        return null;
+      }
+      return optimizingQueue.getOptimizerGroup();
+    }
+
+    public String getGroupName() {
+      return groupName;
+    }
+
+    public int getAttempts() {
+      return attempts;
+    }
+  }
+
+  /**
+   * Optimizer group keeper thread responsible for monitoring resource group status and
+   * automatically maintaining optimizer resources.
+   *
+   * <p>Every optimizer group is re-checked through the delay queue. When its registered optimizers
+   * occupy fewer threads than the group's {@code min-parallelism}, an optimizer covering the
+   * shortfall is requested from the group's container. Once the attempt counter exceeds {@code
+   * groupMaxKeepingAttempts}, the group's {@code min-parallelism} is lowered to the threads that
+   * are currently occupied.
+   */
+  private class OptimizerGroupKeeper extends AbstractKeeper<OptimizerGroupKeepingTask> {
+
+    public OptimizerGroupKeeper(String threadName) {
+      super(threadName);
+    }
+
+    /**
+     * Schedules the next check of {@code groupName}; does nothing once the keeper is stopped.
+     *
+     * @param groupName name of the optimizer group to check
+     * @param attempts consecutive keeping attempts so far (must be positive); it also scales the
+     *     delay before the check
+     */
+    public void keepInTouch(String groupName, int attempts) {
+      Preconditions.checkNotNull(groupName, "groupName can not be null");
+      Preconditions.checkArgument(attempts > 0, "attempts must be greater than 0");
+      if (this.stopped) {
+        return;
+      }
+      suspendingQueue.add(new OptimizerGroupKeepingTask(groupName, attempts));
+    }
+
+    @Override
+    protected void processTask(OptimizerGroupKeepingTask keepingTask) {
+      ResourceGroup resourceGroup = keepingTask.getResourceGroup();
+      if (resourceGroup == null) {
+        LOG.warn(
+            "ResourceGroup:{} may have been deleted, stop keeping it", keepingTask.getGroupName());
+        return;
+      }
+      String groupName = resourceGroup.getName();
+
+      // The group is always rescheduled in the finally block. If an unexpected exception escapes
+      // (it is still logged by the keeper loop), the same stage is retried rather than the group
+      // silently dropping out of the keeping queue.
+      int nextAttempts = keepingTask.getAttempts();
+      try {
+        int requiredCores = keepingTask.tryKeeping(resourceGroup);
+        if (requiredCores <= 0) {
+          LOG.debug("The Resource Group:{} has sufficient resources, keep it", groupName);
+          nextAttempts = 1;
+          return;
+        }
+
+        if (keepingTask.getAttempts() > groupMaxKeepingAttempts) {
+          int minParallelism = keepingTask.getMinParallelism(resourceGroup);
+          LOG.warn(
+              "Resource Group:{}, creating optimizer {} times in a row, optimizers still below min-parallel:{}, will reset min-parallel to {}",
+              groupName,
+              // Attempts start at 1 and one scale-out was issued per attempt before this one.
+              keepingTask.getAttempts() - 1,
+              minParallelism,
+              minParallelism - requiredCores);
+          resourceGroup
+              .getProperties()
+              .put(
+                  OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM,
+                  String.valueOf(minParallelism - requiredCores));
+          updateResourceGroup(resourceGroup);
+          optimizerManager.updateResourceGroup(resourceGroup);
+          nextAttempts = 1;
+          return;
+        }
+
+        // Count this scale-out attempt even if the request below fails, so that repeated failures
+        // eventually reach the reset branch above.
+        nextAttempts = keepingTask.getAttempts() + 1;
+        Resource resource =
+            new Resource.Builder(resourceGroup.getContainer(), groupName, ResourceType.OPTIMIZER)
+                .setProperties(resourceGroup.getProperties())
+                .setThreadCount(requiredCores)
+                .build();
+        ResourceContainer rc = Containers.get(resource.getContainerName());
+        ((AbstractOptimizerContainer) rc).requestResource(resource);
+        optimizerManager.createResource(resource);
+        LOG.info(
+            "Resource Group:{} has insufficient resources, created an optimizer with parallelism of {}",
+            groupName,
+            requiredCores);
+      } finally {
+        keepInTouch(groupName, nextAttempts);
+      }
+    }
+  }
 }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index f7266ba04..34f8f4685 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -45,6 +45,9 @@ public abstract class AMSServiceTestBase extends 
AMSManagerTestBase {
       configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, 
Duration.ofMillis(800L));
       configurations.set(
           AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT, 
Duration.ofMillis(30000L));
+      configurations.set(
+          AmoroManagementConf.OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL,
+          Duration.ofMillis(10L));
       TABLE_SERVICE =
           new DefaultTableService(new Configurations(), CATALOG_MANAGER, 
runtimeFactory);
       OPTIMIZING_SERVICE =
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupKeeper.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupKeeper.java
new file mode 100644
index 000000000..72ff58753
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestOptimizerGroupKeeper.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import static 
org.apache.amoro.server.AmoroManagementConf.OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS;
+
+import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.OptimizerProperties;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableTestHelper;
+import org.apache.amoro.api.OptimizerRegisterInfo;
+import org.apache.amoro.catalog.BasicCatalogTestHelper;
+import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.resource.Resource;
+import org.apache.amoro.resource.ResourceContainer;
+import org.apache.amoro.resource.ResourceGroup;
+import org.apache.amoro.server.manager.AbstractOptimizerContainer;
+import org.apache.amoro.server.resource.ContainerMetadata;
+import org.apache.amoro.server.resource.Containers;
+import org.apache.amoro.server.resource.OptimizerInstance;
+import org.apache.amoro.server.table.AMSTableTestBase;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.iceberg.common.DynFields;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+@RunWith(Parameterized.class)
+public class TestOptimizerGroupKeeper extends AMSTableTestBase {
+
+  private static final String TEST_GROUP_NAME = "test-keeper-group";
+  private static final String MOCK_CONTAINER_NAME = "mock-container";
+  // Upper bound when waiting for the keeper thread. The min-parallelism check interval is 10ms in
+  // the test service, so awaited conditions normally hold well within a few hundred milliseconds.
+  private static final long WAIT_TIMEOUT_MS = 5000L;
+
+  // Control flags for mock container behavior
+  private final AtomicBoolean resourceAvailable = new AtomicBoolean(true);
+  private final AtomicInteger scaleOutCallCount = new AtomicInteger(0);
+  // Function to register optimizer (will call authenticate)
+  private Function<OptimizerRegisterInfo, String> optimizerRegistrar;
+  // Value of Containers.isInitialized before the first test of this class changed it; null until
+  // captured. cleanup() restores it so other test classes observe the original state.
+  private static Boolean originIsInitialized = null;
+  // Track the current test's group name for cleanup
+  private String currentGroupName;
+
+  public TestOptimizerGroupKeeper(
+      CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) {
+    super(catalogTestHelper, tableTestHelper, false);
+  }
+
+  @Parameterized.Parameters(name = "{0}, {1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      {new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(true, false)}
+    };
+  }
+
+  @Before
+  public void prepare() throws Exception {
+    optimizerRegistrar = registerInfo -> optimizingService().authenticate(registerInfo);
+    setupMockContainer();
+  }
+
+  @After
+  public void clear() {
+    if (currentGroupName == null) {
+      return;
+    }
+    try {
+      // Clean up optimizers
+      optimizerManager()
+          .listOptimizers(currentGroupName)
+          .forEach(
+              optimizer ->
+                  optimizingService()
+                      .deleteOptimizer(optimizer.getGroupName(), optimizer.getResourceId()));
+      // Delete resource group from optimizing service first (this will dispose and unregister
+      // metrics)
+      try {
+        optimizingService().deleteResourceGroup(currentGroupName);
+      } catch (Exception ignored) {
+      }
+      // Then delete from optimizer manager
+      try {
+        optimizerManager().deleteResourceGroup(currentGroupName);
+      } catch (Exception ignored) {
+      }
+    } catch (Exception e) {
+      // ignore
+    } finally {
+      currentGroupName = null;
+    }
+  }
+
+  /** Removes the mock container and restores the original Containers initialization flag. */
+  @AfterClass
+  public static void cleanup() {
+    if (originIsInitialized == null) {
+      return;
+    }
+    DynFields.UnboundField<Map<String, Object>> containersField =
+        DynFields.builder().hiddenImpl(Containers.class, "globalContainers").build();
+    containersField.asStatic().get().remove(MOCK_CONTAINER_NAME);
+    DynFields.UnboundField<Boolean> initializedField =
+        DynFields.builder().hiddenImpl(Containers.class, "isInitialized").build();
+    initializedField.asStatic().set(originIsInitialized);
+    originIsInitialized = null;
+  }
+
+  /** Setup mock container and inject it into Containers using reflection. */
+  private void setupMockContainer() throws Exception {
+    MockOptimizerContainer mockContainer =
+        new MockOptimizerContainer(resourceAvailable, scaleOutCallCount, optimizerRegistrar);
+
+    // Use reflection to set isInitialized to true, remembering the value seen before the first
+    // test so cleanup() can restore it.
+    DynFields.UnboundField<Boolean> initializedField =
+        DynFields.builder().hiddenImpl(Containers.class, "isInitialized").build();
+    if (originIsInitialized == null) {
+      originIsInitialized = initializedField.asStatic().get();
+    }
+    initializedField.asStatic().set(true);
+
+    // Use reflection to inject mock container into Containers
+    DynFields.UnboundField<Map<String, Object>> containersField =
+        DynFields.builder().hiddenImpl(Containers.class, "globalContainers").build();
+    Map<String, Object> globalContainers = containersField.asStatic().get();
+
+    // Create ContainerWrapper using reflection
+    ContainerMetadata metadata =
+        new ContainerMetadata(MOCK_CONTAINER_NAME, MockOptimizerContainer.class.getName());
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(OptimizerProperties.AMS_HOME, "/tmp");
+    properties.put(OptimizerProperties.AMS_OPTIMIZER_URI, "thrift://localhost:1261");
+    properties.put("memory", "1024");
+    metadata.setProperties(properties);
+
+    // Create ContainerWrapper with pre-initialized container
+    Class<?> wrapperClass =
+        Class.forName("org.apache.amoro.server.resource.Containers$ContainerWrapper");
+    // Get the two-parameter constructor: ContainerWrapper(ContainerMetadata, ResourceContainer)
+    java.lang.reflect.Constructor<?> constructor =
+        wrapperClass.getDeclaredConstructor(ContainerMetadata.class, ResourceContainer.class);
+    constructor.setAccessible(true);
+    Object wrapper = constructor.newInstance(metadata, mockContainer);
+    globalContainers.put(MOCK_CONTAINER_NAME, wrapper);
+  }
+
+  private ResourceGroup buildTestResourceGroup(String groupName, int minParallelism) {
+    // Track the group name for cleanup
+    this.currentGroupName = groupName;
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(
+        OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM, String.valueOf(minParallelism));
+    properties.put("memory", "1024");
+    return new ResourceGroup.Builder(groupName, MOCK_CONTAINER_NAME)
+        .addProperties(properties)
+        .build();
+  }
+
+  /**
+   * Test scenario 1: When resources are available, optimizer will be auto-allocated.
+   *
+   * <p>When min-parallelism > current optimizer cores and resources are available,
+   * OptimizerGroupKeeper should automatically create new optimizer instances.
+   */
+  @Test
+  public void testOptimizerAutoAllocatedWhenResourceAvailable() throws InterruptedException {
+    resourceAvailable.set(true);
+    scaleOutCallCount.set(0);
+    ResourceGroup resourceGroup = buildTestResourceGroup(TEST_GROUP_NAME + "-1", 2);
+
+    optimizerManager().createResourceGroup(resourceGroup);
+    optimizingService().createResourceGroup(resourceGroup);
+
+    // Wait for OptimizerGroupKeeper to detect the shortfall and create an optimizer.
+    waitUntil(() -> occupiedCores(resourceGroup.getName()) >= 2);
+
+    int totalCores = occupiedCores(resourceGroup.getName());
+
+    Assertions.assertEquals(
+        1,
+        scaleOutCallCount.get(),
+        resourceGroup.getName()
+            + ":One scale-out should be triggered when min-parallelism is not satisfied");
+    Assertions.assertEquals(
+        2,
+        totalCores,
+        resourceGroup.getName()
+            + ":OptimizerGroupKeeper should attempt to create optimizer when resources are needed");
+  }
+
+  /**
+   * Test scenario 2: When min-parallelism is already satisfied, optimizer will not be allocated.
+   *
+   * <p>When current optimizer cores >= min-parallelism, OptimizerGroupKeeper should not trigger any
+   * scale-out operation.
+   */
+  @Test
+  public void testNoAllocationWhenMinParallelismSatisfied() throws InterruptedException {
+    resourceAvailable.set(true);
+    scaleOutCallCount.set(0);
+    ResourceGroup resourceGroup = buildTestResourceGroup(TEST_GROUP_NAME + "-2", 2);
+
+    optimizerManager().createResourceGroup(resourceGroup);
+    optimizingService().createResourceGroup(resourceGroup);
+
+    // Register an optimizer with 3 threads (exceeds min-parallelism of 2)
+    OptimizerRegisterInfo registerInfo = buildRegisterInfo(resourceGroup.getName(), 3);
+    String testToken = optimizingService().authenticate(registerInfo);
+    Assertions.assertNotNull(testToken, "Optimizer should be registered successfully");
+
+    // Proving the absence of a scale-out needs a fixed observation window. It spans many 10ms
+    // check intervals but stays below the 800ms optimizer heartbeat timeout configured in
+    // AMSServiceTestBase, so the registered optimizer is not expired meanwhile.
+    Thread.sleep(200);
+
+    // Verify no scale-out was triggered since min-parallelism is satisfied
+    Assertions.assertEquals(
+        0,
+        scaleOutCallCount.get(),
+        resourceGroup.getName()
+            + ":No scale-out should be triggered when min-parallelism is already satisfied");
+  }
+
+  /**
+   * Test scenario 3: When no resources available, min-parallelism will be reset to 0.
+   *
+   * <p>When OptimizerGroupKeeper fails to create optimizer multiple times (exceeds max attempts),
+   * and there are no existing optimizers, it will reset min-parallelism to 0.
+   */
+  @Test
+  public void testMinParallelismResetToZeroWhenNoResource() throws InterruptedException {
+    // Set resource not available - container will throw exception on scaleOut
+    resourceAvailable.set(false);
+    scaleOutCallCount.set(0);
+    ResourceGroup resourceGroup = buildTestResourceGroup(TEST_GROUP_NAME + "-3", 2);
+
+    optimizerManager().createResourceGroup(resourceGroup);
+    optimizingService().createResourceGroup(resourceGroup);
+
+    waitUntil(() -> "0".equals(persistedMinParallelism(resourceGroup.getName())));
+
+    Assertions.assertEquals(
+        OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS.defaultValue(),
+        scaleOutCallCount.get(),
+        resourceGroup.getName()
+            + ":max scale-out attempts should be exhausted when no resources available");
+    Assertions.assertEquals(
+        "0",
+        persistedMinParallelism(resourceGroup.getName()),
+        resourceGroup.getName()
+            + ":min-parallelism should be reset to 0 when no resources available and no optimizer exists");
+  }
+
+  /**
+   * Test scenario 4: When no resources but has optimizer, min-parallelism will be reset to
+   * optimizer's executionParallel.
+   *
+   * <p>When OptimizerGroupKeeper fails to create optimizer multiple times and there are existing
+   * optimizers but not enough to meet min-parallelism, it will reset min-parallelism to the current
+   * total cores.
+   */
+  @Test
+  public void testMinParallelismResetToOptimizerParallelWhenNoMoreResource()
+      throws InterruptedException {
+    resourceAvailable.set(false);
+    scaleOutCallCount.set(0);
+    ResourceGroup resourceGroup = buildTestResourceGroup(TEST_GROUP_NAME + "-4", 2);
+
+    optimizerManager().createResourceGroup(resourceGroup);
+    optimizingService().createResourceGroup(resourceGroup);
+
+    OptimizerRegisterInfo registerInfo = buildRegisterInfo(resourceGroup.getName(), 1);
+    String testToken = optimizingService().authenticate(registerInfo);
+    Assertions.assertNotNull(testToken, "Optimizer should be registered successfully");
+
+    waitUntil(() -> "1".equals(persistedMinParallelism(resourceGroup.getName())));
+
+    Assertions.assertEquals(
+        OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS.defaultValue(),
+        scaleOutCallCount.get(),
+        resourceGroup.getName()
+            + ":max scale-out attempts should be exhausted when no resources available");
+    Assertions.assertEquals(
+        "1",
+        persistedMinParallelism(resourceGroup.getName()),
+        resourceGroup.getName()
+            + ":min-parallelism should be reset to optimizer's current total cores (1) when no more resources available");
+  }
+
+  /** Sums the thread counts of the optimizers currently registered for {@code groupName}. */
+  private int occupiedCores(String groupName) {
+    return optimizerManager().listOptimizers(groupName).stream()
+        .mapToInt(OptimizerInstance::getThreadCount)
+        .sum();
+  }
+
+  /** Reads the min-parallelism property of the group as stored by the optimizer manager. */
+  private String persistedMinParallelism(String groupName) {
+    ResourceGroup group = optimizerManager().getResourceGroup(groupName);
+    return group == null
+        ? null
+        : group.getProperties().get(OptimizerProperties.OPTIMIZER_GROUP_MIN_PARALLELISM);
+  }
+
+  /**
+   * Polls {@code condition} every 10ms until it holds or {@link #WAIT_TIMEOUT_MS} elapses. Callers
+   * assert afterwards, so a timeout surfaces as an ordinary assertion failure.
+   */
+  private static void waitUntil(java.util.function.BooleanSupplier condition)
+      throws InterruptedException {
+    long deadline = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
+    while (!condition.getAsBoolean() && System.currentTimeMillis() < deadline) {
+      Thread.sleep(10);
+    }
+  }
+
+  private static OptimizerRegisterInfo buildRegisterInfo(String groupName, int threadCount) {
+    OptimizerRegisterInfo registerInfo = new OptimizerRegisterInfo();
+    Map<String, String> registerProperties = Maps.newHashMap();
+    registerProperties.put(OptimizerProperties.OPTIMIZER_HEART_BEAT_INTERVAL, "100");
+    registerInfo.setProperties(registerProperties);
+    registerInfo.setThreadCount(threadCount);
+    registerInfo.setMemoryMb(1024);
+    registerInfo.setGroupName(groupName);
+    registerInfo.setResourceId("test-resource-" + System.currentTimeMillis() + "-" + threadCount);
+    registerInfo.setStartTime(System.currentTimeMillis());
+    return registerInfo;
+  }
+
+  /**
+   * Mock optimizer container for testing.
+   *
+   * <p>Simulates resource availability by controlling doScaleOut behavior:
+   *
+   * <ul>
+   *   <li>When resourceAvailable=true: calls authenticate to register optimizer
+   *   <li>When resourceAvailable=false: throw RuntimeException
+   * </ul>
+   */
+  public static class MockOptimizerContainer extends AbstractOptimizerContainer {
+
+    private final AtomicBoolean resourceAvailable;
+    private final AtomicInteger scaleOutCallCount;
+    private final Function<OptimizerRegisterInfo, String> optimizerRegistrar;
+
+    public MockOptimizerContainer(
+        AtomicBoolean resourceAvailable,
+        AtomicInteger scaleOutCallCount,
+        Function<OptimizerRegisterInfo, String> optimizerRegistrar) {
+      this.resourceAvailable = resourceAvailable;
+      this.scaleOutCallCount = scaleOutCallCount;
+      this.optimizerRegistrar = optimizerRegistrar;
+    }
+
+    @Override
+    public void init(String name, Map<String, String> containerProperties) {}
+
+    @Override
+    protected Map<String, String> doScaleOut(Resource resource) {
+      scaleOutCallCount.incrementAndGet();
+      if (!resourceAvailable.get()) {
+        throw new RuntimeException("No resources available");
+      }
+      // When resources are available, register optimizer by calling authenticate
+      // This simulates the real behavior where SparkOptimizerContainer starts SparkOptimizer,
+      // which uses OptimizerToucher to call authenticate
+      if (optimizerRegistrar != null) {
+        OptimizerRegisterInfo registerInfo =
+            buildRegisterInfo(resource.getGroupName(), resource.getThreadCount());
+        registerInfo.setMemoryMb(resource.getMemoryMb());
+        registerInfo.setResourceId(resource.getResourceId());
+        optimizerRegistrar.apply(registerInfo);
+      }
+      return Maps.newHashMap();
+    }
+
+    @Override
+    public void releaseResource(Resource resource) {}
+  }
+}
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java 
b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
index bea9b90f9..a8d04a123 100644
--- a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
+++ b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
@@ -32,6 +32,7 @@ public class OptimizerProperties {
   public static final String OPTIMIZER_EXECUTION_PARALLEL = 
"execution-parallel";
   public static final String OPTIMIZER_MEMORY_SIZE = "memory-size";
   public static final String OPTIMIZER_GROUP_NAME = "group-name";
+  /** Key of the optimizer group property holding the minimum total parallelism to maintain. */
+  public static final String OPTIMIZER_GROUP_MIN_PARALLELISM = "min-parallelism";
   public static final String OPTIMIZER_HEART_BEAT_INTERVAL = 
"heart-beat-interval";
   public static final String OPTIMIZER_EXTEND_DISK_STORAGE = 
"extend-disk-storage";
   public static final boolean OPTIMIZER_EXTEND_DISK_STORAGE_DEFAULT = false;
diff --git a/docs/admin-guides/managing-optimizers.md 
b/docs/admin-guides/managing-optimizers.md
index aebea60bd..fd708e8c2 100644
--- a/docs/admin-guides/managing-optimizers.md
+++ b/docs/admin-guides/managing-optimizers.md
@@ -288,6 +288,7 @@ The optimizer group supports the following properties:
 | cache-max-total-size           | All            | No       | 128mb           
                                                                      | Max 
total size in optimier cache.                                                   
                                                                                
                                                                                
                                                                                
                  [...]
 | cache-max-entry-size           | All            | No       | 64mb            
                                                                      | Max 
entry size in optimizer cache.                                                  
                                                                                
                                                                                
                                                                                
                  [...]
 | cache-timeout                  | All            | No       | 10min           
                                                                      | Timeout 
in optimizer cache.                                                             
                                                                                
                                                                                
                                                                                
              [...]
+| min-parallelism                | All            | No       | 0               
                                                                      | The 
minimum total parallelism (CPU cores) that the optimizer group should maintain. 
When the total number of cores across running optimizers falls below this value, 
`OptimizerGroupKeeper` will automatically scale out new optimizers. Set to `0` 
to disable auto-scaling. Note: The behavior of the auto-scaling mechanism is 
controlled by the AMS-level configu [...]
 | memory                         | Local          | Yes      | N/A             
                                                                      | The max 
memory of JVM for local optimizer, in MBs.                                      
                                                                                
                                                                                
                                                                                
              [...]
 | flink-conf.\<key\>             | Flink          | No       | N/A             
                                                                      | Any 
flink config options could be overwritten, priority is optimizing-group > 
optimizing-container > flink-conf.yaml.                                         
                                                                                
                                                                                
                        [...]
 | spark-conf.\<key\>             | Spark          | No       | N/A             
                                                                      | Any 
spark config options could be overwritten, priority is optimizing-group > 
optimizing-container > spark-defaults.conf.                                     
                                                                                
                                                                                
                        [...]
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index 24b518d0c..0f5f272da 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -90,6 +90,8 @@ table td:last-child, table th:last-child { width: 40%; 
word-break: break-all; }
 | http-server.proxy-client-ip-header | X-Real-IP | The HTTP header to record 
the real client IP address. If your server is behind a load balancer or other 
proxy, the server will see this load balancer or proxy IP address as the client 
IP address, to get around this common issue, most load balancers or proxies 
offer the ability to record the real remote IP address in an HTTP header that 
will be added to the request for other devices to use. |
 | http-server.rest-auth-type | token | The authentication used by REST APIs, 
token (default), basic or jwt. |
 | http-server.session-timeout | 7 d | Timeout for http session. |
+| optimizer-group.max-keeping-attempts | 3 | The maximum number of consecutive 
attempts to keep the optimizer group at its current parallelism. |
+| optimizer-group.min-parallelism-check-interval | 5 min | The interval for 
checking and ensuring the optimizer group meets its minimum parallelism 
requirement. When the current parallelism falls below the configured 
min-parallelism, the system will attempt to scale out optimizers at this 
interval. The actual scale-out timing is the number of consecutive keeping 
attempts multiplied by this interval. |
 | optimizer.heart-beat-timeout | 1 min | Timeout duration for Optimizer 
heartbeat. |
 | optimizer.max-planning-parallelism | 1 | Max planning parallelism in one 
optimizer group. |
 | optimizer.polling-timeout | 3 s | Optimizer polling task timeout. |


Reply via email to