This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 05c2f45042 Reduced warning logs under normal conditions in compaction 
coordinator (#4362)
05c2f45042 is described below

commit 05c2f45042ee91a5fe04702caa77ab19f78c0f9a
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Mar 13 11:52:47 2024 -0400

    Reduced warning logs under normal conditions in compaction coordinator 
(#4362)
    
    
    Fixes #4219
---
 .../coordinator/CompactionCoordinator.java         | 32 ++++++++++++++++++++--
 .../accumulo/coordinator/QueueSummaries.java       |  8 ++++++
 .../coordinator/CompactionCoordinatorTest.java     |  6 ++++
 3 files changed, 43 insertions(+), 3 deletions(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index f4819ebefb..b0ec498a9e 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -23,6 +23,7 @@ import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.lang.reflect.InvocationTargetException;
 import java.net.UnknownHostException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -303,9 +304,12 @@ public class CompactionCoordinator extends AbstractServer
       updateSummaries();
 
       long now = System.currentTimeMillis();
-      TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> {
-        if ((now - v) > getMissingCompactorWarningTime()) {
-          LOG.warn("No compactors have checked in with coordinator for queue 
{} in {}ms", k,
+
+      Map<String,List<HostAndPort>> idleCompactors = getIdleCompactors();
+      TIME_COMPACTOR_LAST_CHECKED.forEach((queue, lastCheckTime) -> {
+        if ((now - lastCheckTime) > getMissingCompactorWarningTime()
+            && QUEUE_SUMMARIES.isCompactionsQueued(queue) && 
idleCompactors.containsKey(queue)) {
+          LOG.warn("No compactors have checked in with coordinator for queue 
{} in {}ms", queue,
               getMissingCompactorWarningTime());
         }
       });
@@ -321,6 +325,28 @@ public class CompactionCoordinator extends AbstractServer
     LOG.info("Shutting down");
   }
 
+  private Map<String,List<HostAndPort>> getIdleCompactors() {
+
+    Map<String,List<HostAndPort>> allCompactors =
+        ExternalCompactionUtil.getCompactorAddrs(getContext());
+
+    Set<String> emptyQueues = new HashSet<>();
+
+    // Remove all of the compactors that are running a compaction
+    RUNNING_CACHE.values().forEach(rc -> {
+      List<HostAndPort> busyCompactors = allCompactors.get(rc.getQueueName());
+      if (busyCompactors != null
+          && 
busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress()))) {
+        if (busyCompactors.isEmpty()) {
+          emptyQueues.add(rc.getQueueName());
+        }
+      }
+    });
+    // Remove entries with empty queues
+    emptyQueues.forEach(e -> allCompactors.remove(e));
+    return allCompactors;
+  }
+
   private void updateSummaries() {
     ExecutorService executor = 
ThreadPools.getServerThreadPools().createFixedThreadPool(10,
         "Compaction Summary Gatherer", false);
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
index 6edb2c0f36..1d89cd0321 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
@@ -100,6 +100,14 @@ public class QueueSummaries {
     }
   }
 
+  synchronized boolean isCompactionsQueued(String queue) {
+    var q = QUEUES.get(queue);
+    if (q == null) {
+      return false;
+    }
+    return !q.isEmpty();
+  }
+
   synchronized PrioTserver getNextTserver(String queue) {
 
     Entry<Short,TreeSet<TServerInstance>> entry = getNextTserverEntry(queue);
diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 117d50108a..87e7471bef 100644
--- 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -214,6 +214,7 @@ public class CompactionCoordinatorTest {
     var coordinator = new TestCoordinator(null, null, null, null, context, 
null);
     // Should be equal to 3 * 15_000 milliseconds
     assertEquals(45_000, coordinator.getMissingCompactorWarningTime());
+    coordinator.close();
   }
 
   @Test
@@ -231,6 +232,7 @@ public class CompactionCoordinatorTest {
     List<RunningCompaction> runningCompactions = new ArrayList<>();
     expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
         .andReturn(runningCompactions);
+    
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
 
     CompactionFinalizer finalizer = 
PowerMock.createNiceMock(CompactionFinalizer.class);
     LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);
@@ -284,6 +286,7 @@ public class CompactionCoordinatorTest {
     List<RunningCompaction> runningCompactions = new ArrayList<>();
     expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
         .andReturn(runningCompactions);
+    
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
 
     CompactionFinalizer finalizer = 
PowerMock.createNiceMock(CompactionFinalizer.class);
     LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);
@@ -363,6 +366,7 @@ public class CompactionCoordinatorTest {
     List<RunningCompaction> runningCompactions = new ArrayList<>();
     expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
         .andReturn(runningCompactions);
+    
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
 
     ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
     HostAndPort address = HostAndPort.fromString("localhost:10240");
@@ -443,6 +447,7 @@ public class CompactionCoordinatorTest {
     runningCompactions.add(new RunningCompaction(job, 
tserverAddress.toString(), "queue"));
     expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
         .andReturn(runningCompactions);
+    
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
 
     ServerAddress client = PowerMock.createNiceMock(ServerAddress.class);
     HostAndPort address = HostAndPort.fromString("localhost:10240");
@@ -508,6 +513,7 @@ public class CompactionCoordinatorTest {
     List<RunningCompaction> runningCompactions = new ArrayList<>();
     expect(ExternalCompactionUtil.getCompactionsRunningOnCompactors(context))
         .andReturn(runningCompactions);
+    
expect(ExternalCompactionUtil.getCompactorAddrs(context)).andReturn(Map.of()).anyTimes();
 
     CompactionFinalizer finalizer = 
PowerMock.createNiceMock(CompactionFinalizer.class);
     LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class);

Reply via email to