keith-turner commented on code in PR #5726:
URL: https://github.com/apache/accumulo/pull/5726#discussion_r2210985946
##########
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java:
##########
@@ -611,18 +620,89 @@ public void compactionCompleted(TInfo tinfo, TCredentials
credentials,
@Override
public void compactionFailed(TInfo tinfo, TCredentials credentials, String
externalCompactionId,
- TKeyExtent extent) throws ThriftSecurityException {
+ TKeyExtent extent, String exceptionClassName) throws
ThriftSecurityException {
// do not expect users to call this directly, expect other tservers to
call this method
if (!security.canPerformSystemActions(credentials)) {
throw new AccumuloSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent);
- LOG.info("Compaction failed: id: {}, extent: {}", externalCompactionId,
fromThriftExtent);
+ LOG.info("Compaction failed: id: {}, extent: {}, compactor exception:{}",
externalCompactionId,
+ fromThriftExtent, exceptionClassName);
final var ecid = ExternalCompactionId.of(externalCompactionId);
+ if (exceptionClassName != null) {
+ captureFailure(ecid, fromThriftExtent);
+ }
compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
}
+ private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) {
+ var rc = RUNNING_CACHE.get(ecid);
+ if (rc != null) {
+ final String queue = rc.getQueueName();
+ failingQueues.computeIfAbsent(queue, q -> new
AtomicLong(0)).incrementAndGet();
+ final String compactor = rc.getCompactorAddress();
+ failingCompactors.computeIfAbsent(compactor, c -> new
AtomicLong(0)).incrementAndGet();
+ }
+ failingTables.computeIfAbsent(extent.tableId(), t -> new
AtomicLong(0)).incrementAndGet();
+ }
+
+ protected void startFailureSummaryLogging() {
+ ScheduledFuture<?> future = getContext().getScheduledExecutor()
+ .scheduleWithFixedDelay(this::printFailures, 0, 5, TimeUnit.MINUTES);
+ ThreadPools.watchNonCriticalScheduledTask(future);
+ }
+
+ private void printFailures() {
+
+ // Remove down compactors from failing list
+ Map<String,List<HostAndPort>> allCompactors =
+ ExternalCompactionUtil.getCompactorAddrs(getContext());
+ Set<String> allCompactorAddrs = new HashSet<>();
+ allCompactors.values().forEach(l -> l.forEach(c ->
allCompactorAddrs.add(c.toString())));
+ failingCompactors.keySet().retainAll(allCompactorAddrs);
+
Review Comment:
I created this PR: https://github.com/dlmarion/accumulo/pull/56 -- but I have
not tested the changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]