keith-turner commented on a change in pull request #2132:
URL: https://github.com/apache/accumulo/pull/2132#discussion_r644903355
##########
File path:
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -84,21 +97,42 @@ private void detectDeadCompactions() {
running.forEach((ecid) -> {
if (tabletCompactions.remove(ecid) != null) {
- log.trace("Removed {} running on a compactor", ecid);
+ log.trace("Removed compaction {} running on a compactor", ecid);
+ }
+ if (this.deadCompactions.remove(ecid) != null) {
+ log.trace("Removed {} from the dead compaction map, it's running on a
compactor", ecid);
}
});
// Determine which compactions are currently committing and remove those
context.getAmple().getExternalCompactionFinalStates()
- .map(ecfs ->
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+ .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+ if (tabletCompactions.remove(ecid) != null) {
+ log.trace("Removed compaction {} that is committing", ecid);
+ }
+ if (this.deadCompactions.remove(ecid) != null) {
+ log.trace("Removed {} from the dead compaction map, it's
committing", ecid);
+ }
+ });
- tabletCompactions
- .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}",
ecid, extent));
+ tabletCompactions.forEach((ecid, extent) -> {
+ log.debug("Possible dead compaction detected {} {}", ecid, extent);
+ this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());
+ this.deadCompactions.merge(ecid, 1L, Long::sum);
+ });
// Everything left in tabletCompactions is no longer running anywhere and
should be failed.
// Its possible that a compaction committed while going through the steps
above, if so then
// that is ok and marking it failed will end up being a no-op.
+ Set<ExternalCompactionId> toFail =
+ this.deadCompactions.entrySet().stream().filter(e -> e.getValue() >
2).map(e -> e.getKey())
+ .collect(Collectors.toCollection(TreeSet::new));
+ tabletCompactions.keySet().retainAll(toFail);
+ tabletCompactions.forEach((eci, v) -> {
+ log.warn("Compaction {} believed to be dead, failing it.", eci);
Review comment:
Maybe make this info. Thinking that compactor processes dying will be
somewhat normal/expected so may not want to warn.
```suggestion
log.info("Compaction {} believed to be dead, failing it.", eci);
```
##########
File path:
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -84,21 +97,42 @@ private void detectDeadCompactions() {
running.forEach((ecid) -> {
if (tabletCompactions.remove(ecid) != null) {
- log.trace("Removed {} running on a compactor", ecid);
+ log.trace("Removed compaction {} running on a compactor", ecid);
+ }
+ if (this.deadCompactions.remove(ecid) != null) {
+ log.trace("Removed {} from the dead compaction map, it's running on a
compactor", ecid);
}
});
// Determine which compactions are currently committing and remove those
context.getAmple().getExternalCompactionFinalStates()
- .map(ecfs ->
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+ .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+ if (tabletCompactions.remove(ecid) != null) {
+ log.trace("Removed compaction {} that is committing", ecid);
+ }
+ if (this.deadCompactions.remove(ecid) != null) {
+ log.trace("Removed {} from the dead compaction map, it's
committing", ecid);
+ }
+ });
- tabletCompactions
- .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}",
ecid, extent));
+ tabletCompactions.forEach((ecid, extent) -> {
+ log.debug("Possible dead compaction detected {} {}", ecid, extent);
+ this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());
Review comment:
Think this line can be removed
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]