This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 55d162046c Remove GC candidates found to still be referenced (#3738)
55d162046c is described below

commit 55d162046c30d0316002410e4bf0fe9adba02a21
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Fri Sep 22 12:29:15 2023 -0400

    Remove GC candidates found to still be referenced (#3738)
    
    * Switches to using the GcCandidate class for GC file reference deletions
    * Enables deletion mutations to delete only unique
      deletion candidate entries.
    
    * Adds tests to GarbageCollectorIT and GarbageCollectionTest to
       ensure that new candidates persist when old candidates are deleted.
    
    * Modifies the Garbage Collector to remove GcCandidates that map to
      file references currently used by tablets.
    
    * Stops adding the INUSE dir candidates, as they will not be
      recreated and candidate removal could cause empty subdirs to persist.
---
 .../org/apache/accumulo/core/conf/Property.java    |   5 +
 .../org/apache/accumulo/core/gc/GcCandidate.java   |  72 +++
 .../accumulo/core/metadata/schema/Ample.java       |  27 +-
 .../accumulo/server/metadata/RootGcCandidates.java |  23 +-
 .../accumulo/server/metadata/ServerAmpleImpl.java  |  23 +-
 .../accumulo/server/util/ListVolumesUsed.java      |   5 +-
 .../main/java/org/apache/accumulo/gc/GCRun.java    |  84 +++-
 .../accumulo/gc/GarbageCollectionAlgorithm.java    |  63 ++-
 .../accumulo/gc/GarbageCollectionEnvironment.java  |  24 +-
 .../apache/accumulo/gc/GarbageCollectionTest.java  | 502 ++++++++++++++-------
 .../accumulo/gc/SimpleGarbageCollectorTest.java    |  58 ++-
 .../test/functional/GarbageCollectorIT.java        | 185 ++++++++
 12 files changed, 808 insertions(+), 263 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b2338b4d27..e175a60173 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -891,6 +891,11 @@ public enum Property {
       "The listening port for the garbage collector's monitor service", 
"1.3.5"),
   GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT,
       "The number of threads used to delete RFiles and write-ahead logs", 
"1.3.5"),
+  @Experimental
+  GC_REMOVE_IN_USE_CANDIDATES("gc.remove.in.use.candidates", "false", 
PropertyType.BOOLEAN,
+      "GC will remove deletion candidates that are in-use from the metadata 
location. "
+          + "This is expected to increase the speed of subsequent GC runs",
+      "2.1.3"),
   @Deprecated(since = "2.1.1", forRemoval = true)
   GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN,
       "Do not use the Trash, even if it is configured.", "1.5.0"),
diff --git a/core/src/main/java/org/apache/accumulo/core/gc/GcCandidate.java 
b/core/src/main/java/org/apache/accumulo/core/gc/GcCandidate.java
new file mode 100644
index 0000000000..a483205b94
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/gc/GcCandidate.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.gc;
+
+import java.lang.Object;
+import java.util.Objects;
+
+public class GcCandidate implements Comparable<GcCandidate> {
+  private final long uid;
+  private final String path;
+
+  public GcCandidate(String path, long uid) {
+    this.path = path;
+    this.uid = uid;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public long getUid() {
+    return uid;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(path, uid);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj instanceof GcCandidate) {
+      GcCandidate candidate = (GcCandidate) obj;
+      return this.uid == candidate.getUid() && 
this.path.equals(candidate.getPath());
+    }
+    return false;
+  }
+
+  @Override
+  public int compareTo(GcCandidate candidate) {
+    var cmp = this.path.compareTo(candidate.getPath());
+    if (cmp == 0) {
+      return Long.compare(this.uid, candidate.getUid());
+    } else {
+      return cmp;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return path + ", UUID: " + uid;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 4ce1efe2c3..24327210b1 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.core.gc.GcCandidate;
 import org.apache.accumulo.core.gc.ReferenceFile;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
@@ -133,6 +134,24 @@ public interface Ample {
     EVENTUAL
   }
 
+  /**
+   * Enables status based processing of GcCandidates.
+   */
+  public enum GcCandidateType {
+    /**
+     * Candidates which have corresponding file references still present in 
tablet metadata.
+     */
+    INUSE,
+    /**
+     * Candidates that have no matching file references and can be removed 
from the system.
+     */
+    VALID,
+    /**
+     * Candidates that are malformed.
+     */
+    INVALID
+  }
+
   /**
    * Read a single tablets metadata. No checking is done for prev row, so it 
could differ. The
    * method will read the data using {@link ReadConsistency#IMMEDIATE}.
@@ -193,11 +212,15 @@ public interface Ample {
     throw new UnsupportedOperationException();
   }
 
-  default void deleteGcCandidates(DataLevel level, Collection<String> paths) {
+  /**
+   * Enum added to support unique candidate deletions in 2.1
+   */
+  default void deleteGcCandidates(DataLevel level, Collection<GcCandidate> 
candidates,
+      GcCandidateType type) {
     throw new UnsupportedOperationException();
   }
 
-  default Iterator<String> getGcCandidates(DataLevel level) {
+  default Iterator<GcCandidate> getGcCandidates(DataLevel level) {
     throw new UnsupportedOperationException();
   }
 
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java
index 95874fc2be..13d024686b 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/RootGcCandidates.java
@@ -20,12 +20,14 @@ package org.apache.accumulo.server.metadata;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.security.SecureRandom;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.stream.Stream;
 
+import org.apache.accumulo.core.gc.GcCandidate;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.hadoop.fs.Path;
 
@@ -78,20 +80,23 @@ public class RootGcCandidates {
         .add(ref.getFileName()));
   }
 
-  public void remove(Stream<String> refs) {
-    refs.map(Path::new).forEach(
-        path -> data.candidates.computeIfPresent(path.getParent().toString(), 
(key, values) -> {
-          values.remove(path.getName());
-          return values.isEmpty() ? null : values;
-        }));
+  public void remove(Stream<GcCandidate> refs) {
+    refs.map(GcCandidate::getPath).map(Path::new).forEach(path -> {
+      data.candidates.computeIfPresent(path.getParent().toString(), (key, 
values) -> {
+        values.remove(path.getName());
+        return values.isEmpty() ? null : values;
+      });
+    });
   }
 
-  public Stream<String> sortedStream() {
+  public Stream<GcCandidate> sortedStream() {
+    var uidGen = new SecureRandom();
     return data.candidates.entrySet().stream().flatMap(entry -> {
       String parent = entry.getKey();
       SortedSet<String> names = entry.getValue();
-      return names.stream().map(name -> new Path(parent, name));
-    }).map(Path::toString).sorted();
+      return names.stream()
+          .map(name -> new GcCandidate(parent + Path.SEPARATOR + name, 
uidGen.nextLong()));
+    }).sorted();
   }
 
   public String toJson() {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 03cdb4666d..65fa86b814 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -45,6 +45,7 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.gc.GcCandidate;
 import org.apache.accumulo.core.gc.ReferenceFile;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
@@ -210,17 +211,22 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
   }
 
   @Override
-  public void deleteGcCandidates(DataLevel level, Collection<String> paths) {
+  public void deleteGcCandidates(DataLevel level, Collection<GcCandidate> 
candidates,
+      GcCandidateType type) {
 
     if (level == DataLevel.ROOT) {
-      mutateRootGcCandidates(rgcc -> rgcc.remove(paths.stream()));
+      if (type == GcCandidateType.INUSE) {
+        // Deletion of INUSE candidates is not supported in 2.1.x.
+        return;
+      }
+      mutateRootGcCandidates(rgcc -> rgcc.remove(candidates.stream()));
       return;
     }
 
     try (BatchWriter writer = context.createBatchWriter(level.metaTable())) {
-      for (String path : paths) {
-        Mutation m = new Mutation(DeletesSection.encodeRow(path));
-        m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
+      for (GcCandidate candidate : candidates) {
+        Mutation m = new 
Mutation(DeletesSection.encodeRow(candidate.getPath()));
+        m.putDelete(EMPTY_TEXT, EMPTY_TEXT, candidate.getUid());
         writer.addMutation(m);
       }
     } catch (MutationsRejectedException | TableNotFoundException e) {
@@ -229,7 +235,7 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
   }
 
   @Override
-  public Iterator<String> getGcCandidates(DataLevel level) {
+  public Iterator<GcCandidate> getGcCandidates(DataLevel level) {
     if (level == DataLevel.ROOT) {
       var zooReader = context.getZooReader();
       byte[] jsonBytes;
@@ -251,7 +257,10 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
       }
       scanner.setRange(range);
       return scanner.stream().filter(entry -> 
entry.getValue().equals(SkewedKeyValue.NAME))
-          .map(entry -> 
DeletesSection.decodeRow(entry.getKey().getRow().toString())).iterator();
+          .map(
+              entry -> new 
GcCandidate(DeletesSection.decodeRow(entry.getKey().getRow().toString()),
+                  entry.getKey().getTimestamp()))
+          .iterator();
     } else {
       throw new IllegalArgumentException();
     }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
 
b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index 08f7a9eab4..060b87d2ca 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.gc.GcCandidate;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
@@ -78,9 +79,9 @@ public class ListVolumesUsed {
         + " deletes section (volume replacement occurs at deletion time)");
     volumes.clear();
 
-    Iterator<String> delPaths = context.getAmple().getGcCandidates(level);
+    Iterator<GcCandidate> delPaths = context.getAmple().getGcCandidates(level);
     while (delPaths.hasNext()) {
-      volumes.add(getTableURI(delPaths.next()));
+      volumes.add(getTableURI(delPaths.next().getPath()));
     }
     for (String volume : volumes) {
       System.out.println("\tVolume : " + volume);
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
index 2aa448c5ed..6a1b9e5962 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
@@ -26,6 +26,7 @@ import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -48,6 +49,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.gc.GcCandidate;
 import org.apache.accumulo.core.gc.Reference;
 import org.apache.accumulo.core.gc.ReferenceDirectory;
 import org.apache.accumulo.core.gc.ReferenceFile;
@@ -58,6 +60,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.ValidationUtil;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.Ample.GcCandidateType;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
@@ -106,21 +109,45 @@ public class GCRun implements 
GarbageCollectionEnvironment {
   }
 
   @Override
-  public Iterator<String> getCandidates() {
+  public Iterator<GcCandidate> getCandidates() {
     return context.getAmple().getGcCandidates(level);
   }
 
+  /**
+   * Removes gcCandidates from the metadata location depending on type.
+   *
+   * @param gcCandidates Collection of deletion reference candidates to remove.
+   * @param type type of deletion reference candidates.
+   */
   @Override
-  public List<String> readCandidatesThatFitInMemory(Iterator<String> 
candidates) {
+  public void deleteGcCandidates(Collection<GcCandidate> gcCandidates, 
GcCandidateType type) {
+    if (inSafeMode()) {
+      System.out.println("SAFEMODE: There are " + gcCandidates.size()
+          + " reference file gcCandidates entries marked for deletion from " + 
level + " of type: "
+          + type + ".\n          Examine the log files to identify them.\n");
+      log.info("SAFEMODE: Listing all ref file gcCandidates for deletion");
+      for (GcCandidate gcCandidate : gcCandidates) {
+        log.info("SAFEMODE: {}", gcCandidate);
+      }
+      log.info("SAFEMODE: End reference candidates for deletion");
+      return;
+    }
+
+    log.info("Attempting to delete gcCandidates of type {} from metadata", 
type);
+    context.getAmple().deleteGcCandidates(level, gcCandidates, type);
+  }
+
+  @Override
+  public List<GcCandidate> readCandidatesThatFitInMemory(Iterator<GcCandidate> 
candidates) {
     long candidateLength = 0;
     // Converting the bytes to approximate number of characters for batch size.
     long candidateBatchSize = getCandidateBatchSize() / 2;
 
-    List<String> candidatesBatch = new ArrayList<>();
+    List<GcCandidate> candidatesBatch = new ArrayList<>();
 
     while (candidates.hasNext()) {
-      String candidate = candidates.next();
-      candidateLength += candidate.length();
+      GcCandidate candidate = candidates.next();
+      candidateLength += candidate.getPath().length();
       candidatesBatch.add(candidate);
       if (candidateLength > candidateBatchSize) {
         log.info("Candidate batch of size {} has exceeded the threshold. 
Attempting to delete "
@@ -238,7 +265,7 @@ public class GCRun implements GarbageCollectionEnvironment {
   }
 
   @Override
-  public void deleteConfirmedCandidates(SortedMap<String,String> 
confirmedDeletes)
+  public void deleteConfirmedCandidates(SortedMap<String,GcCandidate> 
confirmedDeletes)
       throws TableNotFoundException {
     final VolumeManager fs = context.getVolumeManager();
     var metadataLocation = level == Ample.DataLevel.ROOT
@@ -249,14 +276,14 @@ public class GCRun implements 
GarbageCollectionEnvironment {
           + " data file candidates marked for deletion in " + metadataLocation 
+ ".\n"
           + "          Examine the log files to identify them.\n");
       log.info("{} SAFEMODE: Listing all data file candidates for deletion", 
fileActionPrefix);
-      for (String s : confirmedDeletes.values()) {
-        log.info("{} SAFEMODE: {}", fileActionPrefix, s);
+      for (GcCandidate candidate : confirmedDeletes.values()) {
+        log.info("{} SAFEMODE: {}", fileActionPrefix, candidate);
       }
       log.info("SAFEMODE: End candidates for deletion");
       return;
     }
 
-    List<String> processedDeletes = Collections.synchronizedList(new 
ArrayList<>());
+    List<GcCandidate> processedDeletes = Collections.synchronizedList(new 
ArrayList<>());
 
     minimizeDeletes(confirmedDeletes, processedDeletes, fs, log);
 
@@ -265,7 +292,7 @@ public class GCRun implements GarbageCollectionEnvironment {
 
     final List<Pair<Path,Path>> replacements = context.getVolumeReplacements();
 
-    for (final String delete : confirmedDeletes.values()) {
+    for (final GcCandidate delete : confirmedDeletes.values()) {
 
       Runnable deleteTask = () -> {
         boolean removeFlag = false;
@@ -273,7 +300,7 @@ public class GCRun implements GarbageCollectionEnvironment {
         try {
           Path fullPath;
           Path switchedDelete =
-              VolumeUtil.switchVolume(delete, VolumeManager.FileType.TABLE, 
replacements);
+              VolumeUtil.switchVolume(delete.getPath(), 
VolumeManager.FileType.TABLE, replacements);
           if (switchedDelete != null) {
             // actually replacing the volumes in the metadata table would be 
tricky because the
             // entries would be different rows. So it could not be
@@ -283,10 +310,10 @@ public class GCRun implements 
GarbageCollectionEnvironment {
             // uses suffixes to compare delete entries, there is no danger
             // of deleting something that should not be deleted. Must not 
change value of delete
             // variable because that's what's stored in metadata table.
-            log.debug("Volume replaced {} -> {}", delete, switchedDelete);
+            log.debug("Volume replaced {} -> {}", delete.getPath(), 
switchedDelete);
             fullPath = ValidationUtil.validate(switchedDelete);
           } else {
-            fullPath = new Path(ValidationUtil.validate(delete));
+            fullPath = new Path(ValidationUtil.validate(delete.getPath()));
           }
 
           for (Path pathToDel : GcVolumeUtil.expandAllVolumesUri(fs, 
fullPath)) {
@@ -320,7 +347,8 @@ public class GCRun implements GarbageCollectionEnvironment {
                   }
                 }
               } else {
-                log.warn("{} Invalid file path format: {}", fileActionPrefix, 
delete);
+                log.warn("{} Delete failed due to invalid file path format: 
{}", fileActionPrefix,
+                    delete.getPath());
               }
             }
           }
@@ -348,7 +376,7 @@ public class GCRun implements GarbageCollectionEnvironment {
       log.error("{}", e1.getMessage(), e1);
     }
 
-    context.getAmple().deleteGcCandidates(level, processedDeletes);
+    deleteGcCandidates(processedDeletes, GcCandidateType.VALID);
   }
 
   @Override
@@ -409,21 +437,21 @@ public class GCRun implements 
GarbageCollectionEnvironment {
   }
 
   @VisibleForTesting
-  static void minimizeDeletes(SortedMap<String,String> confirmedDeletes,
-      List<String> processedDeletes, VolumeManager fs, Logger logger) {
+  static void minimizeDeletes(SortedMap<String,GcCandidate> confirmedDeletes,
+      List<GcCandidate> processedDeletes, VolumeManager fs, Logger logger) {
     Set<Path> seenVolumes = new HashSet<>();
 
     // when deleting a dir and all files in that dir, only need to delete the 
dir.
     // The dir will sort right before the files... so remove the files in this 
case
     // to minimize namenode ops
-    Iterator<Map.Entry<String,String>> cdIter = 
confirmedDeletes.entrySet().iterator();
+    Iterator<Map.Entry<String,GcCandidate>> cdIter = 
confirmedDeletes.entrySet().iterator();
 
     String lastDirRel = null;
     Path lastDirAbs = null;
     while (cdIter.hasNext()) {
-      Map.Entry<String,String> entry = cdIter.next();
+      Map.Entry<String,GcCandidate> entry = cdIter.next();
       String relPath = entry.getKey();
-      Path absPath = new Path(entry.getValue());
+      Path absPath = new Path(entry.getValue().getPath());
 
       if (SimpleGarbageCollector.isDir(relPath)) {
         lastDirRel = relPath;
@@ -450,8 +478,8 @@ public class GCRun implements GarbageCollectionEnvironment {
           }
 
           if (sameVol) {
-            logger.info("{} Ignoring {} because {} exist", fileActionPrefix, 
entry.getValue(),
-                lastDirAbs);
+            logger.info("{} Ignoring {} because {} exist", fileActionPrefix,
+                entry.getValue().getPath(), lastDirAbs);
             processedDeletes.add(entry.getValue());
             cdIter.remove();
           }
@@ -466,12 +494,22 @@ public class GCRun implements 
GarbageCollectionEnvironment {
   /**
    * Checks if safemode is set - files will not be deleted.
    *
-   * @return number of delete threads
+   * @return value of {@link Property#GC_SAFEMODE}
    */
   boolean inSafeMode() {
     return context.getConfiguration().getBoolean(Property.GC_SAFEMODE);
   }
 
+  /**
+   * Checks if InUse Candidates can be removed.
+   *
+   * @return value of {@link Property#GC_REMOVE_IN_USE_CANDIDATES}
+   */
+  @Override
+  public boolean canRemoveInUseCandidates() {
+    return 
context.getConfiguration().getBoolean(Property.GC_REMOVE_IN_USE_CANDIDATES);
+  }
+
   /**
    * Moves a file to trash. If this garbage collector is not using trash, this 
method returns false
    * and leaves the file alone. If the file is missing, this method returns 
false as opposed to
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
index 379f736d36..cc77197a41 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
@@ -22,6 +22,7 @@ import static java.util.Arrays.stream;
 import static java.util.function.Predicate.not;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -37,8 +38,10 @@ import java.util.stream.Stream;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.gc.GcCandidate;
 import org.apache.accumulo.core.gc.Reference;
 import org.apache.accumulo.core.gc.ReferenceDirectory;
+import org.apache.accumulo.core.metadata.schema.Ample.GcCandidateType;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
@@ -121,13 +124,13 @@ public class GarbageCollectionAlgorithm {
     return relPath;
   }
 
-  private SortedMap<String,String> makeRelative(Collection<String> candidates) 
{
-    SortedMap<String,String> ret = new TreeMap<>();
+  private SortedMap<String,GcCandidate> makeRelative(Collection<GcCandidate> 
candidates) {
+    SortedMap<String,GcCandidate> ret = new TreeMap<>();
 
-    for (String candidate : candidates) {
+    for (GcCandidate candidate : candidates) {
       String relPath;
       try {
-        relPath = makeRelative(candidate, 0);
+        relPath = makeRelative(candidate.getPath(), 0);
       } catch (IllegalArgumentException iae) {
         log.warn("Ignoring invalid deletion candidate {}", candidate);
         continue;
@@ -139,8 +142,9 @@ public class GarbageCollectionAlgorithm {
   }
 
   private void removeCandidatesInUse(GarbageCollectionEnvironment gce,
-      SortedMap<String,String> candidateMap) throws InterruptedException {
+      SortedMap<String,GcCandidate> candidateMap) throws InterruptedException {
 
+    List<GcCandidate> inUseCandidates = new ArrayList<>();
     Set<TableId> tableIdsBefore = gce.getCandidateTableIDs();
     Set<TableId> tableIdsSeen = new HashSet<>();
     Iterator<Reference> iter = gce.getReferences().iterator();
@@ -156,8 +160,11 @@ public class GarbageCollectionAlgorithm {
 
         dir = makeRelative(dir, 2);
 
-        if (candidateMap.remove(dir) != null) {
-          log.debug("Candidate was still in use: {}", dir);
+        GcCandidate gcTemp = candidateMap.remove(dir);
+        if (gcTemp != null) {
+          log.debug("Directory Candidate was still in use by dir ref: {}", 
dir);
+          // Intentionally not adding dir candidates to inUseCandidates as 
they are only added once.
+          // If dir candidates are deleted, due to being in use, nothing will 
add them again.
         }
       } else {
         String reference = ref.getMetadataEntry();
@@ -173,23 +180,31 @@ public class GarbageCollectionAlgorithm {
 
         // WARNING: This line is EXTREMELY IMPORTANT.
         // You MUST REMOVE candidates that are still in use
-        if (candidateMap.remove(relativePath) != null) {
-          log.debug("Candidate was still in use: {}", relativePath);
+        GcCandidate gcTemp = candidateMap.remove(relativePath);
+        if (gcTemp != null) {
+          log.debug("File Candidate was still in use: {}", relativePath);
+          inUseCandidates.add(gcTemp);
         }
 
         String dir = relativePath.substring(0, relativePath.lastIndexOf('/'));
-        if (candidateMap.remove(dir) != null) {
-          log.debug("Candidate was still in use: {}", relativePath);
+        GcCandidate gcT = candidateMap.remove(dir);
+        if (gcT != null) {
+          log.debug("Directory Candidate was still in use by file ref: {}", 
relativePath);
+          // Intentionally not adding dir candidates to inUseCandidates as 
they are only added once.
+          // If dir candidates are deleted, due to being in use, nothing will 
add them again.
         }
       }
     }
     Set<TableId> tableIdsAfter = gce.getCandidateTableIDs();
     ensureAllTablesChecked(Collections.unmodifiableSet(tableIdsBefore),
         Collections.unmodifiableSet(tableIdsSeen), 
Collections.unmodifiableSet(tableIdsAfter));
+    if (gce.canRemoveInUseCandidates()) {
+      gce.deleteGcCandidates(inUseCandidates, GcCandidateType.INUSE);
+    }
   }
 
   private long removeBlipCandidates(GarbageCollectionEnvironment gce,
-      SortedMap<String,String> candidateMap) throws TableNotFoundException {
+      SortedMap<String,GcCandidate> candidateMap) throws 
TableNotFoundException {
     long blipCount = 0;
     boolean checkForBulkProcessingFiles = 
candidateMap.keySet().stream().anyMatch(
         relativePath -> 
relativePath.toLowerCase(Locale.ENGLISH).contains(Constants.BULK_PREFIX));
@@ -271,20 +286,20 @@ public class GarbageCollectionAlgorithm {
   }
 
   protected void confirmDeletesFromReplication(GarbageCollectionEnvironment 
gce,
-      SortedMap<String,String> candidateMap) {
+      SortedMap<String,GcCandidate> candidateMap) {
     var replicationNeededIterator = gce.getReplicationNeededIterator();
     var candidateMapIterator = candidateMap.entrySet().iterator();
 
     PeekingIterator<Entry<String,Status>> pendingReplication =
         Iterators.peekingIterator(replicationNeededIterator);
-    PeekingIterator<Entry<String,String>> candidates =
+    PeekingIterator<Entry<String,GcCandidate>> candidates =
         Iterators.peekingIterator(candidateMapIterator);
     while (pendingReplication.hasNext() && candidates.hasNext()) {
       Entry<String,Status> pendingReplica = pendingReplication.peek();
-      Entry<String,String> candidate = candidates.peek();
+      Entry<String,GcCandidate> candidate = candidates.peek();
 
       String filePendingReplication = pendingReplica.getKey();
-      String fullPathCandidate = candidate.getValue();
+      String fullPathCandidate = candidate.getValue().getPath();
 
       int comparison = filePendingReplication.compareTo(fullPathCandidate);
       if (comparison < 0) {
@@ -309,7 +324,7 @@ public class GarbageCollectionAlgorithm {
   }
 
   private void cleanUpDeletedTableDirs(GarbageCollectionEnvironment gce,
-      SortedMap<String,String> candidateMap) throws InterruptedException, 
IOException {
+      SortedMap<String,GcCandidate> candidateMap) throws InterruptedException, 
IOException {
     HashSet<TableId> tableIdsWithDeletes = new HashSet<>();
 
     // find the table ids that had dirs deleted
@@ -334,7 +349,8 @@ public class GarbageCollectionAlgorithm {
   }
 
   private long confirmDeletesTrace(GarbageCollectionEnvironment gce,
-      SortedMap<String,String> candidateMap) throws InterruptedException, 
TableNotFoundException {
+      SortedMap<String,GcCandidate> candidateMap)
+      throws InterruptedException, TableNotFoundException {
     long blips = 0;
     Span confirmDeletesSpan = TraceUtil.startSpan(this.getClass(), 
"confirmDeletes");
     try (Scope scope = confirmDeletesSpan.makeCurrent()) {
@@ -351,7 +367,7 @@ public class GarbageCollectionAlgorithm {
   }
 
   private void deleteConfirmedCandidates(GarbageCollectionEnvironment gce,
-      SortedMap<String,String> candidateMap)
+      SortedMap<String,GcCandidate> candidateMap)
       throws InterruptedException, IOException, TableNotFoundException {
     Span deleteSpan = TraceUtil.startSpan(this.getClass(), "deleteFiles");
     try (Scope deleteScope = deleteSpan.makeCurrent()) {
@@ -369,11 +385,11 @@ public class GarbageCollectionAlgorithm {
   public long collect(GarbageCollectionEnvironment gce)
       throws InterruptedException, TableNotFoundException, IOException {
 
-    Iterator<String> candidatesIter = gce.getCandidates();
+    Iterator<GcCandidate> candidatesIter = gce.getCandidates();
     long totalBlips = 0;
 
     while (candidatesIter.hasNext()) {
-      List<String> batchOfCandidates;
+      List<GcCandidate> batchOfCandidates;
       Span candidatesSpan = TraceUtil.startSpan(this.getClass(), 
"getCandidates");
       try (Scope candidatesScope = candidatesSpan.makeCurrent()) {
         batchOfCandidates = gce.readCandidatesThatFitInMemory(candidatesIter);
@@ -391,13 +407,13 @@ public class GarbageCollectionAlgorithm {
   /**
    * Given a sub-list of possible deletion candidates, process and remove 
valid deletion candidates.
    */
-  private long deleteBatch(GarbageCollectionEnvironment gce, List<String> 
currentBatch)
+  private long deleteBatch(GarbageCollectionEnvironment gce, List<GcCandidate> 
currentBatch)
       throws InterruptedException, TableNotFoundException, IOException {
 
     long origSize = currentBatch.size();
     gce.incrementCandidatesStat(origSize);
 
-    SortedMap<String,String> candidateMap = makeRelative(currentBatch);
+    SortedMap<String,GcCandidate> candidateMap = makeRelative(currentBatch);
 
     long blips = confirmDeletesTrace(gce, candidateMap);
     gce.incrementInUseStat(origSize - candidateMap.size());
@@ -406,5 +422,4 @@ public class GarbageCollectionAlgorithm {
 
     return blips;
   }
-
 }
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
index b99f1764c3..96aae86709 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.gc;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -28,11 +29,14 @@ import java.util.SortedMap;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.gc.GcCandidate;
 import org.apache.accumulo.core.gc.Reference;
 import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.Ample.GcCandidateType;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
@@ -45,7 +49,14 @@ public interface GarbageCollectionEnvironment {
    *
    * @return an iterator referencing a List containing deletion candidates
    */
-  Iterator<String> getCandidates() throws TableNotFoundException;
+  Iterator<GcCandidate> getCandidates() throws TableNotFoundException;
+
+  /**
+   * Used for determining if deletion of InUse candidates is enabled.
+   *
+   * @return value of {@link Property#GC_REMOVE_IN_USE_CANDIDATES}
+   */
+  boolean canRemoveInUseCandidates();
 
   /**
    * Given an iterator to a deletion candidate list, return a sub-list of 
candidates which fit
@@ -54,7 +65,7 @@ public interface GarbageCollectionEnvironment {
    * @param candidatesIter iterator referencing a List of possible deletion 
candidates
    * @return a List of possible deletion candidates
    */
-  List<String> readCandidatesThatFitInMemory(Iterator<String> candidatesIter);
+  List<GcCandidate> readCandidatesThatFitInMemory(Iterator<GcCandidate> 
candidatesIter);
 
   /**
    * Fetch a list of paths for all bulk loads in progress (blip) from a given 
table,
@@ -98,9 +109,16 @@ public interface GarbageCollectionEnvironment {
    *
    * @param candidateMap A Map from relative path to absolute path for files 
to be deleted.
    */
-  void deleteConfirmedCandidates(SortedMap<String,String> candidateMap)
+  void deleteConfirmedCandidates(SortedMap<String,GcCandidate> candidateMap)
       throws TableNotFoundException;
 
+  /**
+   * Delete in-use reference candidates based on property settings
+   *
+   * @param GcCandidates Collection of deletion reference candidates to remove.
+   */
+  void deleteGcCandidates(Collection<GcCandidate> GcCandidates, 
GcCandidateType type);
+
   /**
    * Delete a table's directory if it is empty.
    *
diff --git 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
index 425508a925..0536bda699 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
@@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,6 +39,7 @@ import java.util.stream.Stream;
 
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.gc.GcCandidate;
 import org.apache.accumulo.core.gc.Reference;
 import org.apache.accumulo.core.gc.ReferenceDirectory;
 import org.apache.accumulo.core.gc.ReferenceFile;
@@ -45,20 +47,25 @@ import 
org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.GcCandidateType;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.junit.jupiter.api.Test;
 
 public class GarbageCollectionTest {
 
   static class TestGCE implements GarbageCollectionEnvironment {
-    TreeSet<String> candidates = new TreeSet<>();
+    TreeSet<GcCandidate> candidates = new TreeSet<>();
+    TreeMap<GcCandidate,GcCandidateType> deletedCandidates = new TreeMap<>();
     ArrayList<String> blips = new ArrayList<>();
     Map<String,Reference> references = new TreeMap<>();
     HashSet<TableId> tableIds = new HashSet<>();
 
-    ArrayList<String> deletes = new ArrayList<>();
+    ArrayList<GcCandidate> deletes = new ArrayList<>();
     ArrayList<TableId> tablesDirsToDelete = new ArrayList<>();
     TreeMap<String,Status> filesToReplicate = new TreeMap<>();
+    boolean deleteInUseRefs = false;
+
+    private long timestamp = 0L;
 
     private final Ample.DataLevel level;
 
@@ -70,14 +77,30 @@ public class GarbageCollectionTest {
       this.level = Ample.DataLevel.USER;
     }
 
+    public GcCandidate addCandidate(String s) {
+      var candidate = new GcCandidate(s, timestamp);
+      this.candidates.add(candidate);
+      this.timestamp = timestamp + 1;
+      return candidate;
+    }
+
+    public Long getTimestamp() {
+      return timestamp;
+    }
+
     @Override
-    public Iterator<String> getCandidates() throws TableNotFoundException {
+    public Iterator<GcCandidate> getCandidates() throws TableNotFoundException 
{
       return List.copyOf(candidates).iterator();
     }
 
     @Override
-    public List<String> readCandidatesThatFitInMemory(Iterator<String> 
candidatesIter) {
-      List<String> candidatesBatch = new ArrayList<>();
+    public boolean canRemoveInUseCandidates() {
+      return deleteInUseRefs;
+    }
+
+    @Override
+    public List<GcCandidate> 
readCandidatesThatFitInMemory(Iterator<GcCandidate> candidatesIter) {
+      List<GcCandidate> candidatesBatch = new ArrayList<>();
       while (candidatesIter.hasNext() && candidatesBatch.size() < 3) {
         candidatesBatch.add(candidatesIter.next());
       }
@@ -94,6 +117,16 @@ public class GarbageCollectionTest {
       return references.values().stream();
     }
 
+    @Override
+    public void deleteGcCandidates(Collection<GcCandidate> refCandidates, 
GcCandidateType type) {
+      // Mimic ServerAmpleImpl behavior for root InUse Candidates
+      if (type.equals(GcCandidateType.INUSE) && 
this.level.equals(Ample.DataLevel.ROOT)) {
+        return;
+      }
+      refCandidates.forEach(gcCandidate -> deletedCandidates.put(gcCandidate, 
type));
+      this.candidates.removeAll(refCandidates);
+    }
+
     @Override
     public Map<TableId,TableState> getTableIDs() {
       HashMap<TableId,TableState> results = new HashMap<>();
@@ -102,7 +135,7 @@ public class GarbageCollectionTest {
     }
 
     @Override
-    public void deleteConfirmedCandidates(SortedMap<String,String> 
candidateMap) {
+    public void deleteConfirmedCandidates(SortedMap<String,GcCandidate> 
candidateMap) {
       deletes.addAll(candidateMap.values());
       this.candidates.removeAll(candidateMap.values());
     }
@@ -183,14 +216,28 @@ public class GarbageCollectionTest {
     }
   }
 
-  private void assertRemoved(TestGCE gce, String... refs) {
-    for (String ref : refs) {
-      assertTrue(gce.deletes.remove(ref));
+  private void assertRemoved(TestGCE gce, GcCandidate... candidates) {
+    for (GcCandidate candidate : candidates) {
+      assertTrue(gce.deletes.remove(candidate));
     }
 
     assertEquals(0, gce.deletes.size(), "Deletes not empty: " + gce.deletes);
   }
 
+  private void assertNoCandidatesRemoved(TestGCE gce) {
+    assertEquals(0, gce.deletedCandidates.size(),
+        "Deleted Candidates not empty: " + gce.deleteInUseRefs);
+  }
+
+  private void assertCandidateRemoved(TestGCE gce, GcCandidateType 
gcCandidateType,
+      GcCandidate... gcCandidates) {
+    for (GcCandidate gcCandidate : gcCandidates) {
+      assertEquals(gcCandidateType, gce.deletedCandidates.remove(gcCandidate));
+    }
+    assertEquals(0, gce.deletedCandidates.size(),
+        "Deleted Candidates not empty: " + gce.deleteInUseRefs);
+  }
+
   // This test was created to help track down a 
ConcurrentModificationException error that was
   // occurring with the unit tests once the GC was refactored to use a single 
iterator for the
   // collect process. This was a minimal test case that would cause the 
exception to occur.
@@ -198,10 +245,10 @@ public class GarbageCollectionTest {
   public void minimalDelete() throws Exception {
     TestGCE gce = new TestGCE();
 
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/6/t0/F006.rf");
+    gce.addCandidate("hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
+    gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+    var candidate = 
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
+    gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/6/t0/F006.rf");
 
     gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
     gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
@@ -210,16 +257,16 @@ public class GarbageCollectionTest {
     GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm();
     gca.collect(gce);
 
-    assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
+    assertRemoved(gce, candidate);
   }
 
   @Test
   public void testBasic() throws Exception {
     TestGCE gce = new TestGCE();
 
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
+    var candOne = 
gce.addCandidate("hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
+    var candTwo = 
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+    gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
 
     gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
     gce.addFileReference("4", null, 
"hdfs://foo:6000/accumulo/tables/4/t0/F001.rf");
@@ -235,7 +282,7 @@ public class GarbageCollectionTest {
     // candidates, and assert that it's gone
     gce.removeFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
     gca.collect(gce);
-    assertRemoved(gce, "hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
+    assertRemoved(gce, candOne);
 
     // Removing a reference to a file that wasn't in the candidates should do 
nothing
     gce.removeFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F002.rf");
@@ -245,14 +292,13 @@ public class GarbageCollectionTest {
     // Remove the reference to a file in the candidates should cause it to be 
removed
     gce.removeFileReference("4", null, 
"hdfs://foo:6000/accumulo/tables/4/t0/F001.rf");
     gca.collect(gce);
-    assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+    assertRemoved(gce, candTwo);
 
-    // Adding more candidates which do no have references should be removed
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F003.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F004.rf");
+    // Adding more candidates which do not have references should be removed
+    var candThree = 
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/4/t0/F003.rf");
+    var candFour = 
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/4/t0/F004.rf");
     gca.collect(gce);
-    assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/4/t0/F003.rf",
-        "hdfs://foo.com:6000/accumulo/tables/4/t0/F004.rf");
+    assertRemoved(gce, candThree, candFour);
 
   }
 
@@ -265,47 +311,16 @@ public class GarbageCollectionTest {
   public void testBasic2() throws Exception {
     TestGCE gce = new TestGCE();
 
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
-
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/5/t0/F000.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F001.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/6/t1/F005.rf");
-
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/6/t0/F000.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/6/t0/F001.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/7/t0/F005.rf");
-
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/7/t0/F000.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/7/t0/F001.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/8/t0/F005.rf");
+    GcCandidate[] toBeRemoved = new GcCandidate[20];
 
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/8/t0/F000.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/8/t0/F001.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/9/t0/F005.rf");
+    var candOne = 
gce.addCandidate("hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
+    var candTwo = 
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+    gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
 
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/9/t0/F000.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/9/t0/F001.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/10/t0/F005.rf");
-
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/10/t0/F000.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/10/t0/F001.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/11/t0/F005.rf");
-
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/11/t0/F000.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/11/t0/F001.rf");
-
-    gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
-    gce.addFileReference("4", null, 
"hdfs://foo:6000/accumulo/tables/4/t0/F001.rf");
-    gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0//F002.rf");
-    gce.addFileReference("5", null, 
"hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
-
-    GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm();
-
-    gca.collect(gce);
+    int counter = 0;
     // items to be removed from candidates
-    String[] toBeRemoved = {"hdfs://foo.com:6000/accumulo/tables/5/t0/F001.rf",
+    String[] paths = {"hdfs://foo.com:6000/accumulo/tables/5/t0/F001.rf",
+
         "hdfs://foo:6000/accumulo/tables/5/t0/F000.rf",
         "hdfs://foo.com:6000/accumulo/tables/6/t1/F005.rf",
         "hdfs://foo:6000/accumulo/tables/6/t0/F000.rf",
@@ -325,13 +340,26 @@ public class GarbageCollectionTest {
         "hdfs://foo.com:6000/accumulo/tables/11/t0/F005.rf",
         "hdfs://foo:6000/accumulo/tables/11/t0/F000.rf",
         "hdfs://foo.com:6000/accumulo/tables/11/t0/F001.rf"};
+    for (String path : paths) {
+      toBeRemoved[counter] = gce.addCandidate(path);
+      counter++;
+    }
+
+    gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
+    gce.addFileReference("4", null, 
"hdfs://foo:6000/accumulo/tables/4/t0/F001.rf");
+    gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0//F002.rf");
+    gce.addFileReference("5", null, 
"hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
+
+    GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm();
+
+    gca.collect(gce);
     assertRemoved(gce, toBeRemoved);
 
     // Remove the reference to this flush file, run the GC which should not 
trim it from the
     // candidates, and assert that it's gone
     gce.removeFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
     gca.collect(gce);
-    assertRemoved(gce, "hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
+    assertRemoved(gce, candOne);
 
     // Removing a reference to a file that wasn't in the candidates should do 
nothing
     gce.removeFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F002.rf");
@@ -341,14 +369,13 @@ public class GarbageCollectionTest {
     // Remove the reference to a file in the candidates should cause it to be 
removed
     gce.removeFileReference("4", null, 
"hdfs://foo:6000/accumulo/tables/4/t0/F001.rf");
     gca.collect(gce);
-    assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+    assertRemoved(gce, candTwo);
 
    // Adding more candidates which do not have references should be removed
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F003.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F004.rf");
+    var candThree = 
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/4/t0/F003.rf");
+    var candFour = 
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/4/t0/F004.rf");
     gca.collect(gce);
-    assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/4/t0/F003.rf",
-        "hdfs://foo.com:6000/accumulo/tables/4/t0/F004.rf");
+    assertRemoved(gce, candThree, candFour);
   }
 
   /**
@@ -358,10 +385,10 @@ public class GarbageCollectionTest {
   public void emptyPathsTest() throws Exception {
     TestGCE gce = new TestGCE();
 
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/4//t0//F000.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4//t0//F001.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5//t0//F005.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo//tables//6/t0/F006.rf");
+    gce.addCandidate("hdfs://foo:6000/accumulo/tables/4//t0//F000.rf");
+    gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/4//t0//F001.rf");
+    var candidate = 
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/5//t0//F005.rf");
+    gce.addCandidate("hdfs://foo.com:6000/accumulo//tables//6/t0/F006.rf");
 
     gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4//t0//F000.rf");
     gce.addFileReference("4", null, 
"hdfs://foo.com:6000/accumulo/tables/4//t0//F001.rf");
@@ -370,16 +397,16 @@ public class GarbageCollectionTest {
     GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm();
     gca.collect(gce);
 
-    assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/5//t0//F005.rf");
+    assertRemoved(gce, candidate);
   }
 
   @Test
   public void testRelative() throws Exception {
     TestGCE gce = new TestGCE();
 
-    gce.candidates.add("/4/t0/F000.rf");
-    gce.candidates.add("/4/t0/F002.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+    var candOne = gce.addCandidate("/4/t0/F000.rf");
+    var candTwo = gce.addCandidate("/4/t0/F002.rf");
+    var candThree = 
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
 
     gce.addFileReference("4", null, "/t0/F000.rf");
     gce.addFileReference("4", null, "/t0/F001.rf");
@@ -408,35 +435,35 @@ public class GarbageCollectionTest {
 
     gce.removeFileReference(refsToRemove.get(2)[0], null, 
refsToRemove.get(2)[1]);
     gca.collect(gce);
-    assertRemoved(gce, "/4/t0/F000.rf");
+    assertRemoved(gce, candOne);
 
     gce.removeFileReference("4", null, "/t0/F001.rf");
     gca.collect(gce);
-    assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+    assertRemoved(gce, candThree);
 
     // add absolute candidate for file that already has a relative candidate
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F002.rf");
+    var candFour = 
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/4/t0/F002.rf");
     gca.collect(gce);
     assertRemoved(gce);
 
     gce.removeFileReference("4", null, "/t0/F002.rf");
     gca.collect(gce);
-    assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/4/t0/F002.rf");
+    assertRemoved(gce, candFour);
 
     gca.collect(gce);
-    assertRemoved(gce, "/4/t0/F002.rf");
+    assertRemoved(gce, candTwo);
   }
 
   @Test
   public void testBlip() throws Exception {
     TestGCE gce = new TestGCE();
 
-    gce.candidates.add("/4/b-0");
-    gce.candidates.add("/4/b-0/F002.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/b-0/F001.rf");
-    gce.candidates.add("/5/b-0");
-    gce.candidates.add("/5/b-0/F002.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/b-0/F001.rf");
+    gce.addCandidate("/4/b-0");
+    gce.addCandidate("/4/b-0/F002.rf");
+    gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/4/b-0/F001.rf");
+    gce.addCandidate("/5/b-0");
+    gce.addCandidate("/5/b-0/F002.rf");
+    gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/5/b-0/F001.rf");
 
     gce.blips.add("/4/b-0");
     gce.blips.add("hdfs://foo.com:6000/accumulo/tables/5/b-0");
@@ -453,15 +480,15 @@ public class GarbageCollectionTest {
     // And we should lose all files in that blip and the blip directory itself 
-- relative and
     // absolute
     gca.collect(gce);
-    assertRemoved(gce, "/4/b-0", "/4/b-0/F002.rf",
-        "hdfs://foo.com:6000/accumulo/tables/4/b-0/F001.rf");
+    assertRemoved(gce, new GcCandidate("/4/b-0", 0L), new 
GcCandidate("/4/b-0/F002.rf", 1L),
+        new GcCandidate("hdfs://foo.com:6000/accumulo/tables/4/b-0/F001.rf", 
2L));
 
     gce.blips.remove("hdfs://foo.com:6000/accumulo/tables/5/b-0");
 
     // Same as above, we should lose relative and absolute for a relative or 
absolute blip
     gca.collect(gce);
-    assertRemoved(gce, "/5/b-0", "/5/b-0/F002.rf",
-        "hdfs://foo.com:6000/accumulo/tables/5/b-0/F001.rf");
+    assertRemoved(gce, new GcCandidate("/5/b-0", 3L), new 
GcCandidate("/5/b-0/F002.rf", 4L),
+        new GcCandidate("hdfs://foo.com:6000/accumulo/tables/5/b-0/F001.rf", 
5L));
 
     gca.collect(gce);
     assertRemoved(gce);
@@ -471,17 +498,17 @@ public class GarbageCollectionTest {
   public void testDirectories() throws Exception {
     TestGCE gce = new TestGCE();
 
-    gce.candidates.add("/4/t-0");
-    gce.candidates.add("/4/t-0/F002.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t-0");
-    gce.candidates.add("/6/t-0");
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/7/t-0/");
-    gce.candidates.add("/8/t-0");
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/9/t-0");
-    gce.candidates.add("/a/t-0");
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/b/t-0");
-    gce.candidates.add("/c/t-0");
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/d/t-0");
+    gce.addCandidate("/4/t-0");
+    gce.addCandidate("/4/t-0/F002.rf");
+    gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/5/t-0");
+    gce.addCandidate("/6/t-0");
+    gce.addCandidate("hdfs://foo:6000/accumulo/tables/7/t-0/");
+    gce.addCandidate("/8/t-0");
+    gce.addCandidate("hdfs://foo:6000/accumulo/tables/9/t-0");
+    gce.addCandidate("/a/t-0");
+    gce.addCandidate("hdfs://foo:6000/accumulo/tables/b/t-0");
+    gce.addCandidate("/c/t-0");
+    gce.addCandidate("hdfs://foo:6000/accumulo/tables/d/t-0");
 
     gce.addDirReference("4", null, "t-0");
     gce.addDirReference("5", null, "t-0");
@@ -501,21 +528,22 @@ public class GarbageCollectionTest {
 
     // A directory reference does not preclude a candidate file beneath that 
directory from deletion
     gca.collect(gce);
-    assertRemoved(gce, "/4/t-0/F002.rf");
+    assertRemoved(gce, new GcCandidate("/4/t-0/F002.rf", 1L));
 
     // Removing the dir reference for a table will delete all tablet 
directories
     gce.removeDirReference("5", null);
     gca.collect(gce);
-    assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/5/t-0");
+    assertRemoved(gce, new 
GcCandidate("hdfs://foo.com:6000/accumulo/tables/5/t-0", 2L));
 
     gce.removeDirReference("4", null);
     gca.collect(gce);
-    assertRemoved(gce, "/4/t-0");
+    assertRemoved(gce, new GcCandidate("/4/t-0", 0L));
 
     gce.removeDirReference("6", null);
     gce.removeDirReference("7", null);
     gca.collect(gce);
-    assertRemoved(gce, "/6/t-0", "hdfs://foo:6000/accumulo/tables/7/t-0/");
+    assertRemoved(gce, new GcCandidate("/6/t-0", 3L),
+        new GcCandidate("hdfs://foo:6000/accumulo/tables/7/t-0/", 4L));
 
     gce.removeFileReference("8", "m", "/t-0/F00.rf");
     gce.removeFileReference("9", "m", "/t-0/F00.rf");
@@ -524,8 +552,10 @@ public class GarbageCollectionTest {
     gce.removeFileReference("e", "m", "../c/t-0/F00.rf");
     gce.removeFileReference("f", "m", "../d/t-0/F00.rf");
     gca.collect(gce);
-    assertRemoved(gce, "/8/t-0", "hdfs://foo:6000/accumulo/tables/9/t-0", 
"/a/t-0",
-        "hdfs://foo:6000/accumulo/tables/b/t-0", "/c/t-0", 
"hdfs://foo:6000/accumulo/tables/d/t-0");
+    assertRemoved(gce, new GcCandidate("/8/t-0", 5L),
+        new GcCandidate("hdfs://foo:6000/accumulo/tables/9/t-0", 6L), new 
GcCandidate("/a/t-0", 7L),
+        new GcCandidate("hdfs://foo:6000/accumulo/tables/b/t-0", 8L), new 
GcCandidate("/c/t-0", 9L),
+        new GcCandidate("hdfs://foo:6000/accumulo/tables/d/t-0", 10L));
 
     gca.collect(gce);
     assertRemoved(gce);
@@ -534,18 +564,19 @@ public class GarbageCollectionTest {
   @Test
   public void testCustomDirectories() throws Exception {
     TestGCE gce = new TestGCE();
-
-    gce.candidates.add("/4/t-0");
-    gce.candidates.add("/4/t-0/F002.rf");
-    gce.candidates.add("hdfs://foo.com:6000/user/foo/tables/5/t-0");
-    gce.candidates.add("/6/t-0");
-    gce.candidates.add("hdfs://foo:6000/user/foo/tables/7/t-0/");
-    gce.candidates.add("/8/t-0");
-    gce.candidates.add("hdfs://foo:6000/user/foo/tables/9/t-0");
-    gce.candidates.add("/a/t-0");
-    gce.candidates.add("hdfs://foo:6000/user/foo/tables/b/t-0");
-    gce.candidates.add("/c/t-0");
-    gce.candidates.add("hdfs://foo:6000/user/foo/tables/d/t-0");
+    Map<Integer,GcCandidate> candidates = new HashMap<>();
+
+    candidates.put(1, gce.addCandidate("/4/t-0"));
+    candidates.put(2, gce.addCandidate("/4/t-0/F002.rf"));
+    candidates.put(3, 
gce.addCandidate("hdfs://foo.com:6000/user/foo/tables/5/t-0"));
+    candidates.put(4, gce.addCandidate("/6/t-0"));
+    candidates.put(5, 
gce.addCandidate("hdfs://foo:6000/user/foo/tables/7/t-0/"));
+    candidates.put(6, gce.addCandidate("/8/t-0"));
+    candidates.put(7, 
gce.addCandidate("hdfs://foo:6000/user/foo/tables/9/t-0"));
+    candidates.put(8, gce.addCandidate("/a/t-0"));
+    candidates.put(9, 
gce.addCandidate("hdfs://foo:6000/user/foo/tables/b/t-0"));
+    candidates.put(10, gce.addCandidate("/c/t-0"));
+    candidates.put(11, 
gce.addCandidate("hdfs://foo:6000/user/foo/tables/d/t-0"));
 
     gce.addDirReference("4", null, "t-0");
     gce.addDirReference("5", null, "t-0");
@@ -565,23 +596,23 @@ public class GarbageCollectionTest {
 
     // A directory reference does not preclude a candidate file beneath that 
directory from deletion
     gca.collect(gce);
-    assertRemoved(gce, "/4/t-0/F002.rf");
+    assertRemoved(gce, candidates.get(2));
 
     // Removing the dir reference for a table will delete all tablet 
directories
     gce.removeDirReference("5", null);
     // but we need to add a file ref
     gce.addFileReference("8", "m", "/t-0/F00.rf");
     gca.collect(gce);
-    assertRemoved(gce, "hdfs://foo.com:6000/user/foo/tables/5/t-0");
+    assertRemoved(gce, candidates.get(3));
 
     gce.removeDirReference("4", null);
     gca.collect(gce);
-    assertRemoved(gce, "/4/t-0");
+    assertRemoved(gce, candidates.get(1));
 
     gce.removeDirReference("6", null);
     gce.removeDirReference("7", null);
     gca.collect(gce);
-    assertRemoved(gce, "/6/t-0", "hdfs://foo:6000/user/foo/tables/7/t-0/");
+    assertRemoved(gce, candidates.get(4), candidates.get(5));
 
     gce.removeFileReference("8", "m", "/t-0/F00.rf");
     gce.removeFileReference("9", "m", "/t-0/F00.rf");
@@ -590,8 +621,8 @@ public class GarbageCollectionTest {
     gce.removeFileReference("e", "m", "../c/t-0/F00.rf");
     gce.removeFileReference("f", "m", "../d/t-0/F00.rf");
     gca.collect(gce);
-    assertRemoved(gce, "/8/t-0", "hdfs://foo:6000/user/foo/tables/9/t-0", 
"/a/t-0",
-        "hdfs://foo:6000/user/foo/tables/b/t-0", "/c/t-0", 
"hdfs://foo:6000/user/foo/tables/d/t-0");
+    assertRemoved(gce, candidates.get(6), candidates.get(7), 
candidates.get(8), candidates.get(9),
+        candidates.get(10), candidates.get(11));
 
     gca.collect(gce);
     assertRemoved(gce);
@@ -600,7 +631,7 @@ public class GarbageCollectionTest {
   private void badRefTest(String ref) {
     TestGCE gce = new TestGCE();
 
-    gce.candidates.add("/4/t-0/F002.rf");
+    gce.addCandidate("/4/t-0/F002.rf");
 
     gce.addFileReference("4", "m", ref);
 
@@ -651,18 +682,18 @@ public class GarbageCollectionTest {
     GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm();
 
     TestGCE gce = new TestGCE();
-    gce.candidates.add("");
-    gce.candidates.add("A");
-    gce.candidates.add("/");
-    gce.candidates.add("//");
-    gce.candidates.add("///");
-    gce.candidates.add("////");
-    gce.candidates.add("/1/2/3/4");
-    gce.candidates.add("/a");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tbls/5/F00.rf");
-    gce.candidates.add("hdfs://foo.com:6000/");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/");
-    gce.candidates.add("hdfs://foo.com:6000/user/foo/tables/a/t-0/t-1/F00.rf");
+    gce.addCandidate("");
+    gce.addCandidate("A");
+    gce.addCandidate("/");
+    gce.addCandidate("//");
+    gce.addCandidate("///");
+    gce.addCandidate("////");
+    gce.addCandidate("/1/2/3/4");
+    gce.addCandidate("/a");
+    gce.addCandidate("hdfs://foo.com:6000/accumulo/tbls/5/F00.rf");
+    gce.addCandidate("hdfs://foo.com:6000/");
+    gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/");
+    gce.addCandidate("hdfs://foo.com:6000/user/foo/tables/a/t-0/t-1/F00.rf");
 
     gca.collect(gce);
     System.out.println(gce.deletes);
@@ -675,18 +706,18 @@ public class GarbageCollectionTest {
 
     TestGCE gce = new TestGCE();
 
-    gce.candidates.add("/1636/default_tablet");
+    gce.addCandidate("/1636/default_tablet");
     gce.addDirReference("1636", null, "default_tablet");
     gca.collect(gce);
     assertRemoved(gce);
 
     gce.candidates.clear();
-    gce.candidates.add("/1636/default_tablet/someFile");
+    var tempCandidate = gce.addCandidate("/1636/default_tablet/someFile");
     gca.collect(gce);
-    assertRemoved(gce, "/1636/default_tablet/someFile");
+    assertRemoved(gce, tempCandidate);
 
     gce.addFileReference("1636", null, "/default_tablet/someFile");
-    gce.candidates.add("/1636/default_tablet/someFile");
+    gce.addCandidate("/1636/default_tablet/someFile");
     gca.collect(gce);
     assertRemoved(gce);
 
@@ -695,19 +726,19 @@ public class GarbageCollectionTest {
 
     gce.addFileReference("1636", null, "../9/default_tablet/someFile");
     gce.addDirReference("1636", null, "default_tablet");
-    gce.candidates.add("/9/default_tablet/someFile");
+    gce.addCandidate("/9/default_tablet/someFile");
     gca.collect(gce);
     assertRemoved(gce);
 
     // have an indirect file reference and a directory candidate
     gce.candidates.clear();
-    gce.candidates.add("/9/default_tablet");
+    gce.addCandidate("/9/default_tablet");
     gca.collect(gce);
     assertRemoved(gce);
 
     gce.candidates.clear();
-    gce.candidates.add("/9/default_tablet");
-    gce.candidates.add("/9/default_tablet/someFile");
+    gce.addCandidate("/9/default_tablet");
+    gce.addCandidate("/9/default_tablet/someFile");
     long blipCount = gca.collect(gce);
     assertRemoved(gce);
     assertEquals(0, blipCount);
@@ -715,7 +746,7 @@ public class GarbageCollectionTest {
     gce = new TestGCE();
 
     gce.blips.add("/1636/b-0001");
-    gce.candidates.add("/1636/b-0001/I0000");
+    gce.addCandidate("/1636/b-0001/I0000");
     blipCount = gca.collect(gce);
     assertRemoved(gce);
     assertEquals(1, blipCount);
@@ -727,11 +758,11 @@ public class GarbageCollectionTest {
     gce.blips.add("/1029/b-0003");
     gce.blips.add("/1000/b-1001");
     gce.blips.add("/1000/b-1002");
-    gce.candidates.add("/1029/b-0002/I0006");
-    gce.candidates.add("/1000/b-1002/I0007");
-    gce.candidates.add("/1000/t-0003/I0008");
+    gce.addCandidate("/1029/b-0002/I0006");
+    gce.addCandidate("/1000/b-1002/I0007");
+    var candidate = gce.addCandidate("/1000/t-0003/I0008");
     blipCount = gca.collect(gce);
-    assertRemoved(gce, "/1000/t-0003/I0008");
+    assertRemoved(gce, candidate);
     assertEquals(5, blipCount);
   }
 
@@ -743,11 +774,11 @@ public class GarbageCollectionTest {
 
     gce.tableIds.add(TableId.of("4"));
 
-    gce.candidates.add("/4/t-0");
-    gce.candidates.add("/4/t-0/F002.rf");
-    gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t-0");
-    gce.candidates.add("/6/t-0");
-    gce.candidates.add("hdfs://foo:6000/accumulo/tables/7/t-0/");
+    gce.addCandidate("/4/t-0");
+    gce.addCandidate("/4/t-0/F002.rf");
+    gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/5/t-0");
+    gce.addCandidate("/6/t-0");
+    gce.addCandidate("hdfs://foo:6000/accumulo/tables/7/t-0/");
 
     gce.addDirReference("4", null, "t-0");
     gce.addDirReference("7", null, "t-0");
@@ -760,7 +791,7 @@ public class GarbageCollectionTest {
 
     assertEquals(tids.size(), gce.tablesDirsToDelete.size());
     assertTrue(tids.containsAll(gce.tablesDirsToDelete));
-
+    assertNoCandidatesRemoved(gce);
   }
 
   @Test
@@ -769,8 +800,8 @@ public class GarbageCollectionTest {
 
     TestGCE gce = new TestGCE();
 
-    
gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/1/t-00001/A000001.rf");
-    
gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf");
+    
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/1/t-00001/A000001.rf");
+    
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf");
 
     Status status = 
Status.newBuilder().setClosed(true).setEnd(100).setBegin(100).build();
     
gce.filesToReplicate.put("hdfs://foo.com:6000/accumulo/tables/1/t-00001/A000001.rf",
 status);
@@ -788,8 +819,8 @@ public class GarbageCollectionTest {
 
     TestGCE gce = new TestGCE();
 
-    
gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/1/t-00001/A000001.rf");
-    
gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf");
+    
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/1/t-00001/A000001.rf");
+    var candidate = 
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf");
 
     // We replicated all of the data, but we might still write more data to 
the file
     Status status = 
Status.newBuilder().setClosed(false).setEnd(1000).setBegin(100).build();
@@ -799,7 +830,7 @@ public class GarbageCollectionTest {
 
     // We need to replicate that one file still, should not delete it.
     assertEquals(1, gce.deletes.size());
-    assertEquals("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf", 
gce.deletes.get(0));
+    assertEquals(candidate, gce.deletes.get(0));
   }
 
   @Test
@@ -808,8 +839,8 @@ public class GarbageCollectionTest {
 
     TestGCE gce = new TestGCE();
 
-    
gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/1/t-00001/A000001.rf");
-    
gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf");
+    
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/1/t-00001/A000001.rf");
+    var candidate = 
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf");
 
     // We replicated all of the data, but we might still write more data to 
the file
     @SuppressWarnings("deprecation")
@@ -821,7 +852,7 @@ public class GarbageCollectionTest {
 
     // We need to replicate that one file still, should not delete it.
     assertEquals(1, gce.deletes.size());
-    assertEquals("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf", 
gce.deletes.get(0));
+    assertEquals(candidate, gce.deletes.get(0));
   }
 
   @Test
@@ -830,8 +861,10 @@ public class GarbageCollectionTest {
 
     TestGCE gce = new TestGCE();
 
-    
gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/1/t-00001/A000001.rf");
-    
gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf");
+    assertEquals(0, gce.deletes.size());
+
+    
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/1/t-00001/A000001.rf");
+    
gce.addCandidate("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf");
 
     // Some file of unknown length has no replication yet (representative of 
the bulk-import case)
     Status status = 
Status.newBuilder().setInfiniteEnd(true).setBegin(0).setClosed(true).build();
@@ -841,7 +874,8 @@ public class GarbageCollectionTest {
 
     // We need to replicate that one file still, should not delete it.
     assertEquals(1, gce.deletes.size());
-    assertEquals("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf", 
gce.deletes.get(0));
+    assertEquals(new 
GcCandidate("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf", 1L),
+        gce.deletes.get(0));
   }
 
   @Test
@@ -850,7 +884,7 @@ public class GarbageCollectionTest {
 
     TestGCE gce = new TestGCE(Ample.DataLevel.USER);
 
-    gce.candidates.add("hdfs://foo.com:6000/user/foo/tables/a/t-0/F00.rf");
+    gce.addCandidate("hdfs://foo.com:6000/user/foo/tables/a/t-0/F00.rf");
 
     gce.addFileReference("a", null, 
"hdfs://foo.com:6000/user/foo/tables/a/t-0/F00.rf");
     gce.addFileReference("c", null, 
"hdfs://foo.com:6000/user/foo/tables/c/t-0/F00.rf");
@@ -863,6 +897,134 @@ public class GarbageCollectionTest {
         && msg.contains("Saw table IDs in ZK that were not in metadata 
table:"), msg);
   }
 
+  @Test
+  public void testDeletingInUseReferenceCandidates() throws Exception {
+    TestGCE gce = new TestGCE();
+
+    var candidate = gce.addCandidate("/4/t0/F000.rf");
+
+    gce.addFileReference("4", null, "/t0/F000.rf");
+    gce.addFileReference("4", null, "/t0/F001.rf");
+    gce.addFileReference("4", null, "/t0/F002.rf");
+    gce.addFileReference("4", null, "/t0/F003.rf");
+    gce.addFileReference("5", null, "../4/t0/F000.rf");
+    gce.addFileReference("9", null, "/t0/F003.rf");
+    gce.addFileReference("6", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
+
+    GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm();
+    gce.deleteInUseRefs = false;
+    // All candidates currently have references
+    gca.collect(gce);
+    assertRemoved(gce);
+    assertNoCandidatesRemoved(gce);
+
+    // Enable InUseRefs to be removed if the file ref is found.
+    gce.deleteInUseRefs = true;
+    gca.collect(gce);
+    assertRemoved(gce);
+    assertCandidateRemoved(gce, GcCandidateType.INUSE, candidate);
+
+    var cand1 = gce.addCandidate("/9/t0/F003.rf");
+    var cand2 = gce.addCandidate("../144/t0/F003.rf");
+    gce.removeFileReference("9", null, "/t0/F003.rf");
+    gce.removeFileReference("4", null, "/t0/F003.rf");
+
+    gca.collect(gce);
+    assertNoCandidatesRemoved(gce);
+    // File references did not exist, so candidates are processed
+    assertRemoved(gce, cand1, cand2);
+  }
+
+  @Test
+  public void testDeletingRootInUseReferenceCandidates() throws Exception {
+    TestGCE gce = new TestGCE(Ample.DataLevel.ROOT);
+
+    GcCandidate[] toBeRemoved = new GcCandidate[4];
+
+    toBeRemoved[0] = gce.addCandidate("/+r/t0/F000.rf");
+    toBeRemoved[1] = gce.addCandidate("/+r/t0/F001.rf");
+    toBeRemoved[2] = gce.addCandidate("/+r/t0/F002.rf");
+    toBeRemoved[3] = gce.addCandidate("/+r/t0/F003.rf");
+
+    gce.addFileReference("+r", null, "/t0/F000.rf");
+    gce.addFileReference("+r", null, "/t0/F001.rf");
+    gce.addFileReference("+r", null, "/t0/F002.rf");
+    gce.addFileReference("+r", null, "/t0/F003.rf");
+    gce.addFileReference("+r", null, "../4/t0/F000.rf");
+    gce.addFileReference("+r", null, "/t0/F003.rf");
+    gce.addFileReference("+r", null, 
"hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
+
+    GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm();
+    gce.deleteInUseRefs = false;
+    // No InUse Candidates should be removed.
+    gca.collect(gce);
+    assertRemoved(gce);
+    assertNoCandidatesRemoved(gce);
+
+    gce.deleteInUseRefs = true;
+    // Due to the gce Datalevel of ROOT, InUse candidate deletion is not 
supported regardless of
+    // property setting.
+    gca.collect(gce);
+    assertRemoved(gce);
+    assertNoCandidatesRemoved(gce);
+
+    gce.removeFileReference("+r", null, "/t0/F000.rf");
+    gce.removeFileReference("+r", null, "/t0/F001.rf");
+    gce.removeFileReference("+r", null, "/t0/F002.rf");
+    gce.removeFileReference("+r", null, "/t0/F003.rf");
+
+    // With file references deleted, the GC should now process the candidates
+    gca.collect(gce);
+    assertRemoved(gce, toBeRemoved);
+    assertNoCandidatesRemoved(gce);
+  }
+
+  @Test
+  public void testInUseDirReferenceCandidates() throws Exception {
+    TestGCE gce = new TestGCE();
+    GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm();
+
+    // Check expected starting state.
+    assertEquals(0, gce.candidates.size());
+
+    // Ensure that dir candidates still work
+    var candOne = gce.addCandidate("6/t-0");
+    var candTwo = gce.addCandidate("7/T-1");
+    gce.addDirReference("6", null, "t-0");
+
+    gca.collect(gce);
+    assertRemoved(gce, candTwo);
+    assertNoCandidatesRemoved(gce);
+    assertEquals(1, gce.candidates.size());
+
+    // Removing the dir reference causes the dir to be deleted.
+    gce.removeDirReference("6", null);
+
+    gca.collect(gce);
+    assertRemoved(gce, candOne);
+    assertNoCandidatesRemoved(gce);
+
+    assertEquals(0, gce.candidates.size());
+
+    // Now enable InUse deletions
+    gce.deleteInUseRefs = true;
+
+    // Add deletion candidate for a directory.
+    var candidate = new GcCandidate("6/t-0/", 10L);
+    gce.candidates.add(candidate);
+
+    // Then create an InUse candidate for a file in that directory.
+    gce.addFileReference("6", null, "/t-0/F003.rf");
+    var removedCandidate = gce.addCandidate("6/t-0/F003.rf");
+
+    gca.collect(gce);
+    assertCandidateRemoved(gce, GcCandidateType.INUSE, removedCandidate);
+    assertRemoved(gce);
+    // Check and make sure the InUse directory candidates are not removed.
+    assertEquals(1, gce.candidates.size());
+    assertTrue(gce.candidates.contains(candidate));
+  }
+
   // below are tests for potential failure conditions of the GC process. Some 
of these cases were
   // observed on clusters. Some were hypothesis based on observations. The 
result was that
   // candidate entries were not removed when they should have been and 
therefore files were
diff --git 
a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
 
b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
index 3ba63c3da5..fdb66ba146 100644
--- 
a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
+++ 
b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
@@ -45,6 +45,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.data.InstanceId;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.gc.GcCandidate;
 import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -168,38 +169,49 @@ public class SimpleGarbageCollectorTest {
 
     replay(vol1, vol2, volMgr2);
 
-    TreeMap<String,String> confirmed = new TreeMap<>();
-    confirmed.put("5a/t-0001", "hdfs://nn1/accumulo/tables/5a/t-0001");
-    confirmed.put("5a/t-0001/F0001.rf", 
"hdfs://nn1/accumulo/tables/5a/t-0001/F0001.rf");
-    confirmed.put("5a/t-0001/F0002.rf", 
"hdfs://nn1/accumulo/tables/5a/t-0001/F0002.rf");
-    confirmed.put("5a/t-0002/F0001.rf", 
"hdfs://nn1/accumulo/tables/5a/t-0002/F0001.rf");
+    TreeMap<String,GcCandidate> confirmed = new TreeMap<>();
+    confirmed.put("5a/t-0001", new 
GcCandidate("hdfs://nn1/accumulo/tables/5a/t-0001", 0L));
+    confirmed.put("5a/t-0001/F0001.rf",
+        new GcCandidate("hdfs://nn1/accumulo/tables/5a/t-0001/F0001.rf", 1L));
+    confirmed.put("5a/t-0001/F0002.rf",
+        new GcCandidate("hdfs://nn1/accumulo/tables/5a/t-0001/F0002.rf", 2L));
+    confirmed.put("5a/t-0002/F0001.rf",
+        new GcCandidate("hdfs://nn1/accumulo/tables/5a/t-0002/F0001.rf", 3L));
     var allVolumesDirectory = new AllVolumesDirectory(TableId.of("5b"), 
"t-0003");
-    confirmed.put("5b/t-0003", allVolumesDirectory.getMetadataEntry());
-    confirmed.put("5b/t-0003/F0001.rf", 
"hdfs://nn1/accumulo/tables/5b/t-0003/F0001.rf");
-    confirmed.put("5b/t-0003/F0002.rf", 
"hdfs://nn2/accumulo/tables/5b/t-0003/F0002.rf");
-    confirmed.put("5b/t-0003/F0003.rf", 
"hdfs://nn3/accumulo/tables/5b/t-0003/F0003.rf");
+    confirmed.put("5b/t-0003", new 
GcCandidate(allVolumesDirectory.getMetadataEntry(), 4L));
+    confirmed.put("5b/t-0003/F0001.rf",
+        new GcCandidate("hdfs://nn1/accumulo/tables/5b/t-0003/F0001.rf", 5L));
+    confirmed.put("5b/t-0003/F0002.rf",
+        new GcCandidate("hdfs://nn2/accumulo/tables/5b/t-0003/F0002.rf", 6L));
+    confirmed.put("5b/t-0003/F0003.rf",
+        new GcCandidate("hdfs://nn3/accumulo/tables/5b/t-0003/F0003.rf", 7L));
     allVolumesDirectory = new AllVolumesDirectory(TableId.of("5b"), "t-0004");
-    confirmed.put("5b/t-0004", allVolumesDirectory.getMetadataEntry());
-    confirmed.put("5b/t-0004/F0001.rf", 
"hdfs://nn1/accumulo/tables/5b/t-0004/F0001.rf");
+    confirmed.put("5b/t-0004", new 
GcCandidate(allVolumesDirectory.getMetadataEntry(), 8L));
+    confirmed.put("5b/t-0004/F0001.rf",
+        new GcCandidate("hdfs://nn1/accumulo/tables/5b/t-0004/F0001.rf", 9L));
 
-    List<String> processedDeletes = new ArrayList<>();
+    List<GcCandidate> processedDeletes = new ArrayList<>();
 
     GCRun.minimizeDeletes(confirmed, processedDeletes, volMgr2, log);
 
-    TreeMap<String,String> expected = new TreeMap<>();
-    expected.put("5a/t-0001", "hdfs://nn1/accumulo/tables/5a/t-0001");
-    expected.put("5a/t-0002/F0001.rf", 
"hdfs://nn1/accumulo/tables/5a/t-0002/F0001.rf");
+    TreeMap<String,GcCandidate> expected = new TreeMap<>();
+    expected.put("5a/t-0001", new 
GcCandidate("hdfs://nn1/accumulo/tables/5a/t-0001", 0L));
+    expected.put("5a/t-0002/F0001.rf",
+        new GcCandidate("hdfs://nn1/accumulo/tables/5a/t-0002/F0001.rf", 3L));
     allVolumesDirectory = new AllVolumesDirectory(TableId.of("5b"), "t-0003");
-    expected.put("5b/t-0003", allVolumesDirectory.getMetadataEntry());
-    expected.put("5b/t-0003/F0003.rf", 
"hdfs://nn3/accumulo/tables/5b/t-0003/F0003.rf");
+    expected.put("5b/t-0003", new 
GcCandidate(allVolumesDirectory.getMetadataEntry(), 4L));
+    expected.put("5b/t-0003/F0003.rf",
+        new GcCandidate("hdfs://nn3/accumulo/tables/5b/t-0003/F0003.rf", 7L));
     allVolumesDirectory = new AllVolumesDirectory(TableId.of("5b"), "t-0004");
-    expected.put("5b/t-0004", allVolumesDirectory.getMetadataEntry());
+    expected.put("5b/t-0004", new 
GcCandidate(allVolumesDirectory.getMetadataEntry(), 8L));
 
     assertEquals(expected, confirmed);
-    assertEquals(Arrays.asList("hdfs://nn1/accumulo/tables/5a/t-0001/F0001.rf",
-        "hdfs://nn1/accumulo/tables/5a/t-0001/F0002.rf",
-        "hdfs://nn1/accumulo/tables/5b/t-0003/F0001.rf",
-        "hdfs://nn2/accumulo/tables/5b/t-0003/F0002.rf",
-        "hdfs://nn1/accumulo/tables/5b/t-0004/F0001.rf"), processedDeletes);
+    assertEquals(
+        Arrays.asList(new 
GcCandidate("hdfs://nn1/accumulo/tables/5a/t-0001/F0001.rf", 1L),
+            new GcCandidate("hdfs://nn1/accumulo/tables/5a/t-0001/F0002.rf", 
2L),
+            new GcCandidate("hdfs://nn1/accumulo/tables/5b/t-0003/F0001.rf", 
5L),
+            new GcCandidate("hdfs://nn2/accumulo/tables/5b/t-0003/F0002.rf", 
6L),
+            new GcCandidate("hdfs://nn1/accumulo/tables/5b/t-0004/F0001.rf", 
9L)),
+        processedDeletes);
   }
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index ce9b1adda2..eb1cafec3f 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -21,16 +21,23 @@ package org.apache.accumulo.test.functional;
 import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.io.UncheckedIOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Accumulo;
@@ -45,9 +52,12 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.core.gc.GcCandidate;
 import org.apache.accumulo.core.gc.ReferenceFile;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection.SkewedKeyValue;
 import org.apache.accumulo.core.security.Authorizations;
@@ -71,11 +81,14 @@ import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
 
 public class GarbageCollectorIT extends ConfigurableMacBase {
   private static final String OUR_SECRET = "itsreallysecret";
+  public static final Logger log = 
LoggerFactory.getLogger(GarbageCollectorIT.class);
 
   @Override
   protected Duration defaultTimeout() {
@@ -264,6 +277,121 @@ public class GarbageCollectorIT extends 
ConfigurableMacBase {
     }
   }
 
+  @Test
+  public void testUserUniqueMutationDelete() throws Exception {
+    killMacGc();
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProperties()).build()) {
+      String table = getUniqueNames(1)[0];
+      c.tableOperations().create(table);
+      log.info("User GcCandidate Deletion test of table: {}", table);
+      log.info("GcCandidates will be added/removed from table: {}", 
DataLevel.USER.metaTable());
+      createAndDeleteUniqueMutation(TableId.of(table), 
Ample.GcCandidateType.INUSE);
+    }
+  }
+
+  @Test
+  public void testMetadataUniqueMutationDelete() throws Exception {
+    killMacGc();
+    TableId tableId = DataLevel.USER.tableId();
+    log.info("Metadata GcCandidate Deletion test");
+    log.info("GcCandidates will be added/removed from table: {}", 
DataLevel.METADATA.metaTable());
+    createAndDeleteUniqueMutation(tableId, Ample.GcCandidateType.INUSE);
+  }
+
+  /**
+   * Root INUSE deletions are not supported in 2.1.x. This test can be 
migrated to use
+   * createAndDeleteUniqueMutation in 3.x
+   *
+   * @throws Exception may occur when killing the GC process.
+   */
+  @Test
+  public void testRootUniqueMutationDelete() throws Exception {
+    killMacGc();
+    TableId tableId = DataLevel.METADATA.tableId();
+    log.info("Root GcCandidate Deletion test");
+    // Behavior for 2.1. INUSE candidates deletion support will be added in 3.x
+    log.info("GcCandidates will be added but not removed from Zookeeper");
+
+    Ample ample = cluster.getServerContext().getAmple();
+    DataLevel datalevel = DataLevel.ROOT;
+
+    // Ensure that no other candidates exist before starting the test.
+    Iterator<GcCandidate> cIter = ample.getGcCandidates(datalevel);
+
+    ArrayList<GcCandidate> tempCandidates = new ArrayList<>();
+    while (cIter.hasNext()) {
+      GcCandidate cTemp = cIter.next();
+      log.debug("PreExisting Candidate Found: {}", cTemp);
+      tempCandidates.add(cTemp);
+    }
+    assertTrue(tempCandidates.size() == 0);
+
+    // Create multiple candidate entries
+    List<GcCandidate> candidates =
+        List.of(new 
GcCandidate("hdfs://foo.com:6000/user/foo/tables/+r/t-0/F00.rf", 0L),
+            new 
GcCandidate("hdfs://foo.com:6000/user/foo/tables/+r/t-0/F001.rf", 1L));
+
+    List<StoredTabletFile> stfs = new LinkedList<>();
+    candidates.stream().forEach(temp -> stfs.add(new 
StoredTabletFile(temp.getPath())));
+
+    log.debug("Adding root table GcCandidates");
+    ample.putGcCandidates(tableId, stfs);
+
+    // Retrieve the recently created entries.
+    cIter = ample.getGcCandidates(datalevel);
+
+    int counter = 0;
+    while (cIter.hasNext()) {
+      // Duplicate these entries back into zookeeper
+      ample.putGcCandidates(tableId, List.of(new 
StoredTabletFile(cIter.next().getPath())));
+      counter++;
+    }
+    // Ensure Zookeeper collapsed the entries and did not support duplicates.
+    assertTrue(counter == 2);
+
+    cIter = ample.getGcCandidates(datalevel);
+    while (cIter.hasNext()) {
+      // This should be a noop call. Root inUse candidate deletions are not 
supported in 2.1.x
+      ample.deleteGcCandidates(datalevel, List.of(cIter.next()), 
Ample.GcCandidateType.INUSE);
+    }
+
+    // Check that GcCandidates still exist
+    cIter = ample.getGcCandidates(datalevel);
+
+    counter = candidates.size();
+    while (cIter.hasNext()) {
+      GcCandidate gcC = cIter.next();
+      log.debug("Candidate Found: {}", gcC);
+      for (GcCandidate cand : candidates) {
+        if (gcC.getPath().equals(cand.getPath())) {
+          // Candidate uids will never match as they are randomly generated in 2.1.x
+          assertTrue(!Objects.equals(gcC.getUid(), cand.getUid()));
+          counter--;
+        }
+      }
+    }
+    // Ensure that we haven't seen more candidates than we expected.
+    assertTrue(counter == 0);
+
+    // Delete the candidates as VALID GcCandidates
+    cIter = ample.getGcCandidates(datalevel);
+    while (cIter.hasNext()) {
+      ample.deleteGcCandidates(datalevel, List.of(cIter.next()), 
Ample.GcCandidateType.VALID);
+    }
+    // Ensure the GcCandidates have been removed.
+    cIter = ample.getGcCandidates(datalevel);
+
+    counter = 0;
+    while (cIter.hasNext()) {
+      GcCandidate gcC = cIter.next();
+      if (gcC != null) {
+        log.error("Candidate Found: {}", gcC);
+        counter++;
+      }
+    }
+    assertEquals(counter, 0);
+  }
+
   @Test
   public void testProperPortAdvertisement() throws Exception {
 
@@ -329,4 +457,61 @@ public class GarbageCollectorIT extends 
ConfigurableMacBase {
       }
     }
   }
+
+  private void createAndDeleteUniqueMutation(TableId tableId, 
Ample.GcCandidateType type) {
+    Ample ample = cluster.getServerContext().getAmple();
+    DataLevel datalevel = Ample.DataLevel.of(tableId);
+
+    // Ensure that no other candidates exist before starting the test.
+    List<GcCandidate> candidates = new ArrayList<>();
+    Iterator<GcCandidate> candidate = ample.getGcCandidates(datalevel);
+
+    while (candidate.hasNext()) {
+      GcCandidate cTemp = candidate.next();
+      log.debug("PreExisting Candidate Found: {}", cTemp);
+      candidates.add(cTemp);
+    }
+    assertTrue(candidates.size() == 0);
+
+    // Create multiple candidate entries
+    List<StoredTabletFile> stfs = Stream
+        .of(new 
StoredTabletFile("hdfs://foo.com:6000/user/foo/tables/a/t-0/F00.rf"),
+            new 
StoredTabletFile("hdfs://foo.com:6000/user/foo/tables/b/t-0/F00.rf"))
+        .collect(Collectors.toList());
+
+    log.debug("Adding candidates to table {}", tableId);
+    ample.putGcCandidates(tableId, stfs);
+    // Retrieve new entries.
+    candidate = ample.getGcCandidates(datalevel);
+
+    while (candidate.hasNext()) {
+      GcCandidate cTemp = candidate.next();
+      log.debug("Candidate Found: {}", cTemp);
+      candidates.add(cTemp);
+    }
+    assertTrue(candidates.size() == 2);
+
+    GcCandidate deleteCandidate = candidates.get(0);
+    assertNotNull(deleteCandidate);
+    ample.putGcCandidates(tableId, List.of(new 
StoredTabletFile(deleteCandidate.getPath())));
+
+    log.debug("Deleting Candidate {}", deleteCandidate);
+    ample.deleteGcCandidates(datalevel, List.of(deleteCandidate), 
Ample.GcCandidateType.INUSE);
+
+    candidate = ample.getGcCandidates(datalevel);
+
+    int counter = 0;
+    boolean foundNewCandidate = false;
+    while (candidate.hasNext()) {
+      GcCandidate gcC = candidate.next();
+      log.debug("Candidate Found: {}", gcC);
+      if (gcC.getPath().equals(deleteCandidate.getPath())) {
+        assertTrue(!Objects.equals(gcC.getUid(), deleteCandidate.getUid()));
+        foundNewCandidate = true;
+      }
+      counter++;
+    }
+    assertTrue(counter == 2);
+    assertTrue(foundNewCandidate);
+  }
 }


Reply via email to