This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new cbcf0a0568 Modifications to make compaction queue size dynamic (#4767) cbcf0a0568 is described below commit cbcf0a0568194e39ba0e798b6317ddd916af0f24 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Jul 29 17:11:38 2024 -0400 Modifications to make compaction queue size dynamic (#4767) Modified the logic in CompactionCoordinator to clear the priority queue for a compactor group when there are no compactors remaining or to set the max size based on the number of remaining compactors multiplied by some factor set by the user. Closes #3635 --- .../org/apache/accumulo/core/conf/Property.java | 14 ++++-- .../miniclusterImpl/MiniAccumuloConfigImpl.java | 4 +- .../java/org/apache/accumulo/manager/Manager.java | 3 +- .../coordinator/CompactionCoordinator.java | 54 +++++++++++++++------- .../queue/CompactionJobPriorityQueue.java | 20 ++++++-- .../compaction/CompactionCoordinatorTest.java | 2 +- .../queue/CompactionJobPriorityQueueTest.java | 9 ++++ .../CompactionPriorityQueueMetricsIT.java | 2 +- 8 files changed, 77 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index ce425a8940..cad729af32 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -445,9 +445,14 @@ public enum Property { MANAGER_SPLIT_WORKER_THREADS("manager.split.inspection.threadpool.size", "8", PropertyType.COUNT, "The number of threads used to inspect tablets files to find split points.", "4.0.0"), - MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size", - "10000", PropertyType.COUNT, - "The max size of each resource groups compaction job priority queue.", "4.0"), + MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE( + "manager.compaction.major.service.queue.initial.size", "10000", PropertyType.COUNT, + "The initial size of each resource groups compaction job priority queue.", "4.0.0"), + MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR( + "manager.compaction.major.service.queue.size.factor", "3.0", PropertyType.FRACTION, + "The dynamic resizing of the compaction job priority queue is based on" + + " the number of compactors for the group multiplied by this factor.", + "4.0.0"), SPLIT_PREFIX("split.", null, PropertyType.PREFIX, "System wide properties related to splitting tablets.", "3.1.0"), SPLIT_MAXOPEN("split.files.max", "300", PropertyType.COUNT, @@ -1433,6 +1438,9 @@ public enum Property { // max message options RPC_MAX_MESSAGE_SIZE, + // compaction coordiantor properties + MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE, + // block cache options TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE, TSERV_SUMMARYCACHE_SIZE, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, SSERV_SUMMARYCACHE_SIZE, diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java index b5c6667519..5b81fac468 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java @@ -188,8 +188,8 @@ public class MiniAccumuloConfigImpl { mergeProp(Property.COMPACTOR_PORTSEARCH.getKey(), "true"); - mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getKey(), - Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getDefaultValue()); + mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getKey(), + Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getDefaultValue()); mergeProp(Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getKey(), Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getDefaultValue()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 0690e93afa..72c79121e5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1108,8 +1108,7 @@ public class Manager extends AbstractServer // Start the Manager's Fate Service fateServiceHandler = new FateServiceHandler(this); managerClientHandler = new ManagerClientServiceHandler(this); - compactionCoordinator = - new CompactionCoordinator(context, security, fateRefs, getResourceGroup(), this); + compactionCoordinator = new CompactionCoordinator(context, security, fateRefs, this); // Start the Manager's Client service // Ensure that calls before the manager gets the lock fail diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 34e9d9191b..92bc94f4ab 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -117,6 +117,7 @@ import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction; import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData; import org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile; +import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue; import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionConfigStorage; @@ -167,7 +168,6 @@ public class CompactionCoordinator private final ServerContext ctx; private final SecurityOperation security; - private final String resourceGroupName; private final CompactionJobQueues jobQueues; private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances; // Exposed for tests @@ -184,18 +184,19 @@ public class CompactionCoordinator private final Manager manager; private final LoadingCache<String,Integer> compactorCounts; + private final int jobQueueInitialSize; public CompactionCoordinator(ServerContext ctx, SecurityOperation security, - AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances, - final String resourceGroupName, Manager manager) { + AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances, Manager manager) { this.ctx = ctx; this.schedExecutor = this.ctx.getScheduledExecutor(); this.security = security; - this.resourceGroupName = resourceGroupName; this.manager = Objects.requireNonNull(manager); - this.jobQueues = new CompactionJobQueues( - ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE)); + this.jobQueueInitialSize = ctx.getConfiguration() + .getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE); + + this.jobQueues = new CompactionJobQueues(jobQueueInitialSize); this.queueMetrics = new QueueMetrics(jobQueues); @@ -1067,27 +1068,46 @@ public class CompactionCoordinator private void cleanUpCompactors() { final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS; - var zoorw = this.ctx.getZooReaderWriter(); + final var zoorw = this.ctx.getZooReaderWriter(); + final double queueSizeFactor = ctx.getConfiguration() + .getFraction(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR); try { var groups = zoorw.getChildren(compactorQueuesPath); for (String group : groups) { - String qpath = compactorQueuesPath + "/" + group; - - var compactors = zoorw.getChildren(qpath); + final String qpath = compactorQueuesPath + "/" + group; + final CompactorGroupId cgid = CompactorGroupId.of(group); + final var compactors = zoorw.getChildren(qpath); if (compactors.isEmpty()) { deleteEmpty(zoorw, qpath); - } - - for (String compactor : compactors) { - String cpath = compactorQueuesPath + "/" + group + "/" + compactor; - var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + group + "/" + compactor); - if (lockNodes.isEmpty()) { - deleteEmpty(zoorw, cpath); + // Group has no compactors, we can clear its + // associated priority queue of jobs + CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid); + if (queue != null) { + queue.clear(); + queue.setMaxSize(this.jobQueueInitialSize); } + } else { + int aliveCompactorsForGroup = 0; + for (String compactor : compactors) { + String cpath = compactorQueuesPath + "/" + group + "/" + compactor; + var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + group + "/" + compactor); + if (lockNodes.isEmpty()) { + deleteEmpty(zoorw, cpath); + } else { + aliveCompactorsForGroup++; + } + } + CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid); + if (queue != null) { + queue.setMaxSize( + Math.min((int) (aliveCompactorsForGroup * queueSizeFactor), Integer.MAX_VALUE)); + } + } + } } catch (KeeperException | RuntimeException e) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index c91b8becf1..9a4c726463 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -31,6 +31,7 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -107,7 +108,7 @@ public class CompactionJobPriorityQueue { // efficiently removing entries from anywhere in the queue. Efficient removal is needed for the // case where tablets decided to issues different compaction jobs than what is currently queued. private final TreeMap<CjpqKey,CompactionJobQueues.MetaJob> jobQueue; - private final int maxSize; + private AtomicInteger maxSize; private final AtomicLong rejectedJobs; private final AtomicLong dequeuedJobs; private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>> futures; @@ -131,7 +132,7 @@ public class CompactionJobPriorityQueue { public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) { this.jobQueue = new TreeMap<>(); - this.maxSize = maxSize; + this.maxSize = new AtomicInteger(maxSize); this.tabletJobs = new HashMap<>(); this.groupId = groupId; this.rejectedJobs = new AtomicLong(0); @@ -201,8 +202,12 @@ public class CompactionJobPriorityQueue { return jobsAdded; } - public long getMaxSize() { - return maxSize; + public synchronized int getMaxSize() { + return maxSize.get(); + } + + public synchronized void setMaxSize(int maxSize) { + this.maxSize.set(maxSize); } public long getRejectedJobs() { @@ -284,7 +289,7 @@ public class CompactionJobPriorityQueue { } private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job) { - if (jobQueue.size() >= maxSize) { + if (jobQueue.size() >= maxSize.get()) { var lastEntry = jobQueue.lastKey(); if (job.getPriority() <= lastEntry.job.getPriority()) { // the queue is full and this job has a lower or same priority than the lowest job in the @@ -304,4 +309,9 @@ public class CompactionJobPriorityQueue { jobQueue.put(key, new CompactionJobQueues.MetaJob(job, tabletMetadata)); return key; } + + public synchronized void clear() { + jobQueue.clear(); + tabletJobs.clear(); + } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 5c7a913d91..58a592f036 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -130,7 +130,7 @@ public class CompactionCoordinatorTest { public TestCoordinator(ServerContext ctx, SecurityOperation security, List<RunningCompaction> runningCompactions, Manager manager) { - super(ctx, security, fateInstances, "TEST_GROUP", manager); + super(ctx, security, fateInstances, manager); this.runningCompactions = runningCompactions; } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index 5464b90a33..05e0b35c55 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -300,4 +300,13 @@ public class CompactionJobPriorityQueueTest { < 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD + CANCEL_THRESHOLD)); assertTrue(maxFuturesSize > 2 * CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD); } + + @Test + public void testChangeMaxSize() { + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100); + assertEquals(100, queue.getMaxSize()); + queue.setMaxSize(50); + assertEquals(50, queue.getMaxSize()); + } + } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index 40f6ddc107..c2a86fa6bc 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -196,7 +196,7 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase { Property.COMPACTION_SERVICE_PREFIX.getKey() + QUEUE1_SERVICE + ".planner.opts.groups", "[{'group':'" + QUEUE1 + "'}]"); - cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, "6"); + cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE, "6"); cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 0); // This test waits for dead compactors to be absent in zookeeper. The following setting will