This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 05c2f45042 Reduced warning logs under normal conditions in compaction coordinator (#4362) 05c2f45042 is described below commit 05c2f45042ee91a5fe04702caa77ab19f78c0f9a Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Mar 13 11:52:47 2024 -0400 Reduced warning logs under normal conditions in compaction coordinator (#4362) Fixes #4219 --- .../coordinator/CompactionCoordinator.java | 32 ++++++++++++++++++++-- .../accumulo/coordinator/QueueSummaries.java | 8 ++++++ .../coordinator/CompactionCoordinatorTest.java | 6 ++++ 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index f4819ebefb..b0ec498a9e 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -23,6 +23,7 @@ import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -303,9 +304,12 @@ public class CompactionCoordinator extends AbstractServer updateSummaries(); long now = System.currentTimeMillis(); - TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> { - if ((now - v) > getMissingCompactorWarningTime()) { - LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", k, + + Map<String,List<HostAndPort>> idleCompactors = getIdleCompactors(); + TIME_COMPACTOR_LAST_CHECKED.forEach((queue, lastCheckTime) -> { + if ((now - lastCheckTime) > getMissingCompactorWarningTime() + && QUEUE_SUMMARIES.isCompactionsQueued(queue) && idleCompactors.containsKey(queue)) { + LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", queue, getMissingCompactorWarningTime()); } }); @@ -321,6 +325,28 @@ public class CompactionCoordinator extends AbstractServer LOG.info("Shutting down"); } + private Map<String,List<HostAndPort>> getIdleCompactors() { + + Map<String,List<HostAndPort>> allCompactors = + ExternalCompactionUtil.getCompactorAddrs(getContext()); + + Set<String> emptyQueues = new HashSet<>(); + + // Remove all of the compactors that are running a compaction + RUNNING_CACHE.values().forEach(rc -> { + List<HostAndPort> busyCompactors = allCompactors.get(rc.getQueueName()); + if (busyCompactors != null + && busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress()))) { + if (busyCompactors.isEmpty()) { + emptyQueues.add(rc.getQueueName()); + } + } + }); + // Remove entries with empty queues + emptyQueues.forEach(e -> allCompactors.remove(e)); + return allCompactors; + } + private void updateSummaries() { ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(10, "Compaction Summary Gatherer", false); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java index 6edb2c0f36..1d89cd0321 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java @@ -100,6 +100,14 @@ public class QueueSummaries { } } + synchronized boolean isCompactionsQueued(String queue) { + var q = QUEUES.get(queue); + if (q == null) { + return false; + } + return !q.isEmpty(); + } + synchronized PrioTserver getNextTserver(String queue) { Entry<Short,TreeSet<TServerInstance>> entry = getNextTserverEntry(queue); diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 117d50108a..87e7471bef 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -214,6 +214,7 @@ public class CompactionCoordinatorTest { var coordinator = new TestCoordinator(null, null, null, null, context, null); // Should be equal to 3 * 15_000 milliseconds assertEquals(45_000, coordinator.getMissingCompactorWarningTime()); + coordinator.close(); } @Test @@ -231,6 +232,7 @@ public class CompactionCoordinatorTest { List<RunningCompaction> runningCompactions = new ArrayList<>(); expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context)) .andReturn(runningCompactions); + expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes(); CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class); LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class); @@ -284,6 +286,7 @@ public class CompactionCoordinatorTest { List<RunningCompaction> runningCompactions = new ArrayList<>(); expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context)) .andReturn(runningCompactions); + expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes(); CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class); LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class); @@ -363,6 +366,7 @@ public class CompactionCoordinatorTest { List<RunningCompaction> runningCompactions = new ArrayList<>(); expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context)) .andReturn(runningCompactions); + expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes(); ServerAddress client = PowerMock.createNiceMock(ServerAddress.class); HostAndPort address = HostAndPort.fromString("localhost:10240"); @@ -443,6 +447,7 @@ public class CompactionCoordinatorTest { runningCompactions.add(new RunningCompaction(job, tserverAddress.toString(), "queue")); expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context)) .andReturn(runningCompactions); + expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes(); ServerAddress client = PowerMock.createNiceMock(ServerAddress.class); HostAndPort address = HostAndPort.fromString("localhost:10240"); @@ -508,6 +513,7 @@ public class CompactionCoordinatorTest { List<RunningCompaction> runningCompactions = new ArrayList<>(); expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context)) .andReturn(runningCompactions); + expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes(); CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class); LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);