keith-turner commented on code in PR #5726:
URL: https://github.com/apache/accumulo/pull/5726#discussion_r2210985946


##########
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java:
##########
@@ -611,18 +620,89 @@ public void compactionCompleted(TInfo tinfo, TCredentials 
credentials,
 
   @Override
   public void compactionFailed(TInfo tinfo, TCredentials credentials, String 
externalCompactionId,
-      TKeyExtent extent) throws ThriftSecurityException {
+      TKeyExtent extent, String exceptionClassName) throws 
ThriftSecurityException {
     // do not expect users to call this directly, expect other tservers to 
call this method
     if (!security.canPerformSystemActions(credentials)) {
       throw new AccumuloSecurityException(credentials.getPrincipal(),
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
     KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent);
-    LOG.info("Compaction failed: id: {}, extent: {}", externalCompactionId, 
fromThriftExtent);
+    LOG.info("Compaction failed: id: {}, extent: {}, compactor exception:{}", 
externalCompactionId,
+        fromThriftExtent, exceptionClassName);
     final var ecid = ExternalCompactionId.of(externalCompactionId);
+    if (exceptionClassName != null) {
+      captureFailure(ecid, fromThriftExtent);
+    }
     compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
   }
 
+  private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) {
+    var rc = RUNNING_CACHE.get(ecid);
+    if (rc != null) {
+      final String queue = rc.getQueueName();
+      failingQueues.computeIfAbsent(queue, q -> new 
AtomicLong(0)).incrementAndGet();
+      final String compactor = rc.getCompactorAddress();
+      failingCompactors.computeIfAbsent(compactor, c -> new 
AtomicLong(0)).incrementAndGet();
+    }
+    failingTables.computeIfAbsent(extent.tableId(), t -> new 
AtomicLong(0)).incrementAndGet();
+  }
+
+  protected void startFailureSummaryLogging() {
+    ScheduledFuture<?> future = getContext().getScheduledExecutor()
+        .scheduleWithFixedDelay(this::printFailures, 0, 5, TimeUnit.MINUTES);
+    ThreadPools.watchNonCriticalScheduledTask(future);
+  }
+
+  private void printFailures() {
+
+    // Remove down compactors from failing list
+    Map<String,List<HostAndPort>> allCompactors =
+        ExternalCompactionUtil.getCompactorAddrs(getContext());
+    Set<String> allCompactorAddrs = new HashSet<>();
+    allCompactors.values().forEach(l -> l.forEach(c -> 
allCompactorAddrs.add(c.toString())));
+    failingCompactors.keySet().retainAll(allCompactorAddrs);
+

Review Comment:
   I created this PR: https://github.com/dlmarion/accumulo/pull/56, but I have 
not tested the changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to