dlmarion commented on a change in pull request #2132:
URL: https://github.com/apache/accumulo/pull/2132#discussion_r644867159



##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -75,6 +82,17 @@ private void detectDeadCompactions() {
       tabletCompactions.forEach((ecid, extent) -> log.trace("Saw {} for {}", 
ecid, extent));
     }
 
+    // Remove from the dead map any compactions that the Tablet's
+    // do not think are running any more.
+    this.deadCompactions.keySet().forEach(eci -> {
+      if (!tabletCompactions.containsKey(eci)) {
+        if (this.deadCompactions.remove(eci) != null)
+          log.trace(
+              "Removed {} from the dead compaction map, no tablet thinks this 
compaction is running",
+              eci);
+      }
+    });

Review comment:
       Thanks. That's much simpler.

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -85,31 +103,55 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());

Review comment:
       It looks like it will set it to 1 if it doesn't exist; I'm not sure whether 
it will increment or not based on the javadoc.

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -85,31 +103,55 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());

Review comment:
       It works.

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -75,6 +82,17 @@ private void detectDeadCompactions() {
       tabletCompactions.forEach((ecid, extent) -> log.trace("Saw {} for {}", 
ecid, extent));
     }
 
+    // Remove from the dead map any compactions that the Tablet's
+    // do not think are running any more.
+    this.deadCompactions.keySet().forEach(eci -> {
+      if (!tabletCompactions.containsKey(eci)) {
+        if (this.deadCompactions.remove(eci) != null)
+          log.trace(
+              "Removed {} from the dead compaction map, no tablet thinks this 
compaction is running",
+              eci);
+      }
+    });

Review comment:
       I applied this change in 95e8fec149722ac3f205734103223e3f42b33438

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -85,31 +103,55 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());
+    });
 
     // Everything left in tabletCompactions is no longer running anywhere and 
should be failed.
     // Its possible that a compaction committed while going through the steps 
above, if so then
     // that is ok and marking it failed will end up being a no-op.
-    try {
-      coordinator.compactionFailed(tabletCompactions);
-    } catch (UnknownCompactionIdException e) {
-      // One or more Ids was not in the Running compaction list. This is ok to 
ignore.
-    }
+    long now = System.currentTimeMillis();
+    this.deadCompactions.forEach((eci, startTime) -> {
+      if ((now - startTime) > threshold) {
+        // Compaction believed to be dead for two cycles. Fail it.
+        try {
+          log.warn(
+              "Failing compaction {} which is believed to be dead. Last seen 
at {} and not seen since.",
+              eci, startTime);
+          coordinator.compactionFailed(tabletCompactions);
+          this.deadCompactions.remove(eci);
+        } catch (UnknownCompactionIdException e) {
+          // One or more Ids was not in the Running compaction list. This is 
ok to ignore.
+        }
+      }
+    });

Review comment:
       I applied this change in 95e8fec149722ac3f205734103223e3f42b33438

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -85,31 +103,55 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());

Review comment:
       I applied this change in 95e8fec149722ac3f205734103223e3f42b33438

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -84,21 +97,42 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());
+      this.deadCompactions.merge(ecid, 1L, Long::sum);
+    });
 
     // Everything left in tabletCompactions is no longer running anywhere and 
should be failed.
     // Its possible that a compaction committed while going through the steps 
above, if so then
     // that is ok and marking it failed will end up being a no-op.
+    Set<ExternalCompactionId> toFail =
+        this.deadCompactions.entrySet().stream().filter(e -> e.getValue() > 
2).map(e -> e.getKey())
+            .collect(Collectors.toCollection(TreeSet::new));
+    tabletCompactions.keySet().retainAll(toFail);
+    tabletCompactions.forEach((eci, v) -> {
+      log.warn("Compaction {} believed to be dead, failing it.", eci);

Review comment:
       I think compactor processes dying might only be normal in the situation 
where you are dynamically scaling and the scaling mechanism has no way to 
gracefully stop the compactor process. In a static deployment, compactor 
processes should not be dying. I think `warn` might be appropriate as it's 
highlighting a failure. Curious what others have to say here.

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -84,21 +97,42 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());

Review comment:
       I thought I had removed that. Doh! Removed in 
e95b6c46063bb5bfb9e1a252c8502a47818b2a49




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to