This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new cbcf0a0568 Modifications to make compaction queue size dynamic (#4767)
cbcf0a0568 is described below

commit cbcf0a0568194e39ba0e798b6317ddd916af0f24
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Jul 29 17:11:38 2024 -0400

    Modifications to make compaction queue size dynamic (#4767)
    
    Modified the logic in CompactionCoordinator to clear the
    priority queue for a compactor group when there are no
    compactors remaining, or otherwise to set its max size to
    the number of remaining compactors multiplied by a
    user-configurable factor.
    
    Closes #3635
---
 .../org/apache/accumulo/core/conf/Property.java    | 14 ++++--
 .../miniclusterImpl/MiniAccumuloConfigImpl.java    |  4 +-
 .../java/org/apache/accumulo/manager/Manager.java  |  3 +-
 .../coordinator/CompactionCoordinator.java         | 54 +++++++++++++++-------
 .../queue/CompactionJobPriorityQueue.java          | 20 ++++++--
 .../compaction/CompactionCoordinatorTest.java      |  2 +-
 .../queue/CompactionJobPriorityQueueTest.java      |  9 ++++
 .../CompactionPriorityQueueMetricsIT.java          |  2 +-
 8 files changed, 77 insertions(+), 31 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index ce425a8940..cad729af32 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -445,9 +445,14 @@ public enum Property {
   MANAGER_SPLIT_WORKER_THREADS("manager.split.inspection.threadpool.size", 
"8", PropertyType.COUNT,
       "The number of threads used to inspect tablets files to find split 
points.", "4.0.0"),
 
-  
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
-      "10000", PropertyType.COUNT,
-      "The max size of each resource groups compaction job priority queue.", 
"4.0"),
+  MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE(
+      "manager.compaction.major.service.queue.initial.size", "10000", 
PropertyType.COUNT,
+      "The initial size of each resource groups compaction job priority 
queue.", "4.0.0"),
+  MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR(
+      "manager.compaction.major.service.queue.size.factor", "3.0", 
PropertyType.FRACTION,
+      "The dynamic resizing of the compaction job priority queue is based on"
+          + " the number of compactors for the group multiplied by this 
factor.",
+      "4.0.0"),
   SPLIT_PREFIX("split.", null, PropertyType.PREFIX,
       "System wide properties related to splitting tablets.", "3.1.0"),
   SPLIT_MAXOPEN("split.files.max", "300", PropertyType.COUNT,
@@ -1433,6 +1438,9 @@ public enum Property {
       // max message options
       RPC_MAX_MESSAGE_SIZE,
 
+      // compaction coordinator properties
+      MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE,
+
       // block cache options
       TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE,
       TSERV_SUMMARYCACHE_SIZE, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, 
SSERV_SUMMARYCACHE_SIZE,
diff --git 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index b5c6667519..5b81fac468 100644
--- 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -188,8 +188,8 @@ public class MiniAccumuloConfigImpl {
 
       mergeProp(Property.COMPACTOR_PORTSEARCH.getKey(), "true");
 
-      
mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getKey(),
-          
Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getDefaultValue());
+      
mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getKey(),
+          
Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getDefaultValue());
       mergeProp(Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getKey(),
           Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getDefaultValue());
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 0690e93afa..72c79121e5 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1108,8 +1108,7 @@ public class Manager extends AbstractServer
     // Start the Manager's Fate Service
     fateServiceHandler = new FateServiceHandler(this);
     managerClientHandler = new ManagerClientServiceHandler(this);
-    compactionCoordinator =
-        new CompactionCoordinator(context, security, fateRefs, 
getResourceGroup(), this);
+    compactionCoordinator = new CompactionCoordinator(context, security, 
fateRefs, this);
 
     // Start the Manager's Client service
     // Ensure that calls before the manager gets the lock fail
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 34e9d9191b..92bc94f4ab 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -117,6 +117,7 @@ import org.apache.accumulo.manager.Manager;
 import 
org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction;
 import 
org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData;
 import 
org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
 import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.CompactionConfigStorage;
@@ -167,7 +168,6 @@ public class CompactionCoordinator
 
   private final ServerContext ctx;
   private final SecurityOperation security;
-  private final String resourceGroupName;
   private final CompactionJobQueues jobQueues;
   private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> 
fateInstances;
   // Exposed for tests
@@ -184,18 +184,19 @@ public class CompactionCoordinator
   private final Manager manager;
 
   private final LoadingCache<String,Integer> compactorCounts;
+  private final int jobQueueInitialSize;
 
   public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
-      AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances,
-      final String resourceGroupName, Manager manager) {
+      AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances, 
Manager manager) {
     this.ctx = ctx;
     this.schedExecutor = this.ctx.getScheduledExecutor();
     this.security = security;
-    this.resourceGroupName = resourceGroupName;
     this.manager = Objects.requireNonNull(manager);
 
-    this.jobQueues = new CompactionJobQueues(
-        
ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
+    this.jobQueueInitialSize = ctx.getConfiguration()
+        
.getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE);
+
+    this.jobQueues = new CompactionJobQueues(jobQueueInitialSize);
 
     this.queueMetrics = new QueueMetrics(jobQueues);
 
@@ -1067,27 +1068,46 @@ public class CompactionCoordinator
   private void cleanUpCompactors() {
     final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + 
Constants.ZCOMPACTORS;
 
-    var zoorw = this.ctx.getZooReaderWriter();
+    final var zoorw = this.ctx.getZooReaderWriter();
+    final double queueSizeFactor = ctx.getConfiguration()
+        
.getFraction(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR);
 
     try {
       var groups = zoorw.getChildren(compactorQueuesPath);
 
       for (String group : groups) {
-        String qpath = compactorQueuesPath + "/" + group;
-
-        var compactors = zoorw.getChildren(qpath);
+        final String qpath = compactorQueuesPath + "/" + group;
+        final CompactorGroupId cgid = CompactorGroupId.of(group);
+        final var compactors = zoorw.getChildren(qpath);
 
         if (compactors.isEmpty()) {
           deleteEmpty(zoorw, qpath);
-        }
-
-        for (String compactor : compactors) {
-          String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
-          var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + group 
+ "/" + compactor);
-          if (lockNodes.isEmpty()) {
-            deleteEmpty(zoorw, cpath);
+          // Group has no compactors, we can clear its
+          // associated priority queue of jobs
+          CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
+          if (queue != null) {
+            queue.clear();
+            queue.setMaxSize(this.jobQueueInitialSize);
           }
+        } else {
+          int aliveCompactorsForGroup = 0;
+          for (String compactor : compactors) {
+            String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
+            var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + 
group + "/" + compactor);
+            if (lockNodes.isEmpty()) {
+              deleteEmpty(zoorw, cpath);
+            } else {
+              aliveCompactorsForGroup++;
+            }
+          }
+          CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
+          if (queue != null) {
+            queue.setMaxSize(
+                Math.min((int) (aliveCompactorsForGroup * queueSizeFactor), 
Integer.MAX_VALUE));
+          }
+
         }
+
       }
 
     } catch (KeeperException | RuntimeException e) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index c91b8becf1..9a4c726463 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -31,6 +31,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -107,7 +108,7 @@ public class CompactionJobPriorityQueue {
   // efficiently removing entries from anywhere in the queue. Efficient 
removal is needed for the
   // case where tablets decided to issues different compaction jobs than what 
is currently queued.
   private final TreeMap<CjpqKey,CompactionJobQueues.MetaJob> jobQueue;
-  private final int maxSize;
+  private AtomicInteger maxSize;
   private final AtomicLong rejectedJobs;
   private final AtomicLong dequeuedJobs;
   private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>> 
futures;
@@ -131,7 +132,7 @@ public class CompactionJobPriorityQueue {
 
   public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) {
     this.jobQueue = new TreeMap<>();
-    this.maxSize = maxSize;
+    this.maxSize = new AtomicInteger(maxSize);
     this.tabletJobs = new HashMap<>();
     this.groupId = groupId;
     this.rejectedJobs = new AtomicLong(0);
@@ -201,8 +202,12 @@ public class CompactionJobPriorityQueue {
     return jobsAdded;
   }
 
-  public long getMaxSize() {
-    return maxSize;
+  public synchronized int getMaxSize() {
+    return maxSize.get();
+  }
+
+  public synchronized void setMaxSize(int maxSize) {
+    this.maxSize.set(maxSize);
   }
 
   public long getRejectedJobs() {
@@ -284,7 +289,7 @@ public class CompactionJobPriorityQueue {
   }
 
   private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob 
job) {
-    if (jobQueue.size() >= maxSize) {
+    if (jobQueue.size() >= maxSize.get()) {
       var lastEntry = jobQueue.lastKey();
       if (job.getPriority() <= lastEntry.job.getPriority()) {
         // the queue is full and this job has a lower or same priority than 
the lowest job in the
@@ -304,4 +309,9 @@ public class CompactionJobPriorityQueue {
     jobQueue.put(key, new CompactionJobQueues.MetaJob(job, tabletMetadata));
     return key;
   }
+
+  public synchronized void clear() {
+    jobQueue.clear();
+    tabletJobs.clear();
+  }
 }
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 5c7a913d91..58a592f036 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -130,7 +130,7 @@ public class CompactionCoordinatorTest {
 
     public TestCoordinator(ServerContext ctx, SecurityOperation security,
         List<RunningCompaction> runningCompactions, Manager manager) {
-      super(ctx, security, fateInstances, "TEST_GROUP", manager);
+      super(ctx, security, fateInstances, manager);
       this.runningCompactions = runningCompactions;
     }
 
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 5464b90a33..05e0b35c55 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -300,4 +300,13 @@ public class CompactionJobPriorityQueueTest {
         < 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD + 
CANCEL_THRESHOLD));
     assertTrue(maxFuturesSize > 2 * 
CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD);
   }
+
+  @Test
+  public void testChangeMaxSize() {
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
100);
+    assertEquals(100, queue.getMaxSize());
+    queue.setMaxSize(50);
+    assertEquals(50, queue.getMaxSize());
+  }
+
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
index 40f6ddc107..c2a86fa6bc 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
@@ -196,7 +196,7 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
           Property.COMPACTION_SERVICE_PREFIX.getKey() + QUEUE1_SERVICE + 
".planner.opts.groups",
           "[{'group':'" + QUEUE1 + "'}]");
 
-      cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, 
"6");
+      
cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE,
 "6");
       cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 0);
 
       // This test waits for dead compactors to be absent in zookeeper. The 
following setting will

Reply via email to