keith-turner commented on code in PR #4403:
URL: https://github.com/apache/accumulo/pull/4403#discussion_r1534854172


##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -1050,4 +1006,91 @@ private void cleanUpCompactors() {
     }
   }
 
+  public void cleanUpInternalState() {
+
+    // This method does the following:
+    //
+    // 1. Removes entries from RUNNING_CACHE that are not really running
+    // 2. Cancels running compactions for groups that are not in the current 
configuration
+    // 3. Remove groups not in configuration from TIME_COMPACTOR_LAST_CHECKED
+    // 4. Log groups with no compactors
+    // 5. Log compactors with no groups
+    // 6. Log groups with compactors that have not checked in
+
+    // grab a snapshot of the ids in the set before reading the metadata 
table. This is done to
+    // avoid removing things that are added while reading the metadata.
+    Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());
+
+    // grab the ids that are listed as running in the metadata table. It 
important that this is done
+    // after getting the snapshot.
+    Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();
+
+    var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);
+
+    // remove ids that are in the running set but not in the metadata table
+    idsToRemove.forEach(this::recordCompletion);
+
+    if (idsToRemove.size() > 0) {
+      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
+    }
+
+    // Get the set of groups being referenced in the current configuration
+    // Needs Dan's changes for this
+    Set<CompactorGroupId> groupsInConfiguration = new HashSet<>();
+
+    // Compaction jobs are created in the TabletGroupWatcher and added to the 
Coordinator
+    // via the addJobs method which adds the job to the CompactionJobQueues 
object.
+    Set<CompactorGroupId> groupsWithJobs = jobQueues.getQueueIds();
+
+    Set<CompactorGroupId> jobGroupsNotInConfiguration =
+        Sets.difference(groupsWithJobs, groupsInConfiguration);
+
+    if (jobGroupsNotInConfiguration != null && 
!jobGroupsNotInConfiguration.isEmpty()) {
+      RUNNING_CACHE.values().forEach(rc -> {
+        if 
(jobGroupsNotInConfiguration.contains(CompactorGroupId.of(rc.getGroupName()))) {
+          LOG.warn(
+              "External compaction {} running in group {} on compactor {},"
+                  + " but group not found in current configuration. Failing 
compaction...",
+              rc.getJob().getExternalCompactionId(), rc.getGroupName(), 
rc.getCompactorAddress());
+          cancelCompactionOnCompactor(rc.getCompactorAddress(),
+              rc.getJob().getExternalCompactionId());
+        }
+      });
+
+      // Remove groups not in configuration from TIME_COMPACTOR_LAST_CHECKED
+      LOG.debug("No longer tracking compactor check-in times for groups: {}",
+          jobGroupsNotInConfiguration);
+      jobGroupsNotInConfiguration.forEach(TIME_COMPACTOR_LAST_CHECKED::remove);
+    }
+
+    final Set<CompactorGroupId> runningCompactorGroups = new HashSet<>();
+    getRunningCompactors().keySet()
+        .forEach(group -> 
runningCompactorGroups.add(CompactorGroupId.of(group)));
+
+    Set<CompactorGroupId> groupsWithNoCompactors =
+        Sets.difference(groupsInConfiguration, runningCompactorGroups);
+    if (groupsWithNoCompactors != null && !groupsWithNoCompactors.isEmpty()) {
+      LOG.warn("The following groups have no running compactors: {}", 
groupsWithNoCompactors);
+    }
+
+    Set<CompactorGroupId> compactorsWithNoGroups =
+        Sets.difference(runningCompactorGroups, groupsInConfiguration);
+    if (compactorsWithNoGroups != null && !compactorsWithNoGroups.isEmpty()) {
+      LOG.warn(
+          "The following groups have running compactors, but are not in the 
current configuration: {}",
+          compactorsWithNoGroups);
+    }
+
+    long now = System.currentTimeMillis();
+    Map<String,Set<HostAndPort>> idleCompactors = getIdleCompactors();
+    TIME_COMPACTOR_LAST_CHECKED.forEach((groupName, lastCheckTime) -> {

Review Comment:
   Actually, we may want to loop over groupsInConfiguration at the top level.  
This would find any configured group that has queued work and running idle 
compactors that are not asking for work.  It would not include anything that 
happens to be in idle compactors but is not a configured group; I think that 
case is covered by an earlier log message.
   
   ```
   for(var group : groupsInConfiguration){
     long lastCheckTime = TIME_COMPACTOR_LAST_CHECKED.getOrDefault(group, 0);
     if ((now - lastCheckTime) > getMissingCompactorWarningTime()
             && jobQueues.getQueuedJobs(group) > 0
             && idleCompactors.contains(group)){
        log.warn(...);
     }
   }
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to