This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 9d4dc21abc Allow system compactions to run if zero user compaction 
jobs have run (#4480)
9d4dc21abc is described below

commit 9d4dc21abce1e55daf21f06fdf87f1cffac743ef
Author: Christopher L. Shannon <cshan...@apache.org>
AuthorDate: Sat May 18 12:00:12 2024 -0400

    Allow system compactions to run if zero user compaction jobs have run 
(#4480)
    
    This change allows system compactions to postpone user compactions
    that have not had any jobs run yet. Previously, if a user compaction
    was queued and had selected files that overlapped, it would block
    system compactions from running. Now, if there are selected files but
    the user compaction is not running and has not completed any jobs,
    the coordinator will clear the selectedFiles column once the selection
    expiration time has passed, so that the system compaction can run (as
    sketched below). The fate operation will set the column again when it
    retries to make progress.
    
    This closes #4454
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
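
    A minimal sketch of the new gating rule (not part of the change itself;
    the class, method, and parameter names below are illustrative, and only
    java.time is used so it runs standalone). It mirrors the check added in
    CompactionJobGenerator and CompactionCoordinator: selected files keep
    blocking a system compaction while the user compaction has completed at
    least one job or the selection has not yet expired.

        import java.time.Duration;

        public class SelectionGateSketch {

          // Illustrative only: true means the selected-files column should
          // still block a system compaction over those files.
          static boolean selectionStillBlocks(int completedJobs,
              Duration sinceSelection, Duration expiration) {
            if (completedJobs > 0) {
              // the user compaction has made progress; keep its files reserved
              return true;
            }
            // no jobs have run yet: only block until the selection expires
            return sinceSelection.compareTo(expiration) < 0;
          }

          public static void main(String[] args) {
            // stands in for TABLE_COMPACTION_SELECTION_EXPIRATION
            Duration expiration = Duration.ofMinutes(2);
            // young selection, no completed jobs -> still blocks (true)
            System.out.println(selectionStillBlocks(0, Duration.ofSeconds(30), expiration));
            // expired selection, no completed jobs -> system compaction may run (false)
            System.out.println(selectionStillBlocks(0, Duration.ofMinutes(5), expiration));
            // progress made -> always blocks, regardless of age (true)
            System.out.println(selectionStillBlocks(3, Duration.ofMinutes(5), expiration));
          }
        }
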
---
 .../core/metadata/schema/SelectedFiles.java        |  57 +++++--
 .../core/metadata/schema/SelectedFilesTest.java    |  47 +++---
 .../core/metadata/schema/TabletMetadataTest.java   |   3 +-
 .../server/compaction/CompactionJobGenerator.java  |  25 ++-
 .../manager/state/TabletManagementIterator.java    |   4 +-
 .../manager/state/TabletManagementParameters.java  |  14 +-
 .../constraints/MetadataConstraintsTest.java       |   3 +-
 .../state/TabletManagementParametersTest.java      |   6 +-
 .../java/org/apache/accumulo/manager/Manager.java  |   2 +-
 .../accumulo/manager/TabletGroupWatcher.java       |   8 +-
 .../coordinator/CompactionCoordinator.java         |  50 +++++-
 .../coordinator/commit/CommitCompaction.java       |   7 +-
 .../manager/tableOps/compact/CompactionDriver.java |  15 +-
 .../compaction/CompactionCoordinatorTest.java      | 151 +++++++++++++------
 .../manager/tableOps/merge/MergeTabletsTest.java   |   5 +-
 .../test/functional/AmpleConditionalWriterIT.java  |  32 ++--
 .../accumulo/test/functional/CompactionIT.java     | 167 +++++++++++++++++++++
 .../functional/TabletManagementIteratorIT.java     |   5 +-
 18 files changed, 483 insertions(+), 118 deletions(-)
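
For reference before the full diff below: SelectedFiles now records a
SteadyTime and a completed-job count, and its serialized metadata value
gains "compJobs" and "selTimeNanos" keys. A rough usage sketch based on the
constructor and accessors added in this commit (accumulo-core and
hadoop-common on the classpath are assumed, and the HDFS path is made up):

    import java.util.Set;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;

    import org.apache.accumulo.core.fate.FateId;
    import org.apache.accumulo.core.fate.FateInstanceType;
    import org.apache.accumulo.core.metadata.ReferencedTabletFile;
    import org.apache.accumulo.core.metadata.StoredTabletFile;
    import org.apache.accumulo.core.metadata.schema.SelectedFiles;
    import org.apache.accumulo.core.util.time.SteadyTime;
    import org.apache.hadoop.fs.Path;

    public class SelectedFilesUsageSketch {
      public static void main(String[] args) {
        FateId fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
        StoredTabletFile file = new ReferencedTabletFile(
            new Path("hdfs://nn:8020/accumulo/tables/1/t-0001/F00001.rf")).insert();

        // new-style selection: selection time recorded, no jobs completed yet
        SelectedFiles selected = new SelectedFiles(Set.of(file), true, fateId,
            SteadyTime.from(100, TimeUnit.SECONDS));

        // starts at 0; CommitCompaction increments it as user jobs finish
        System.out.println(selected.getCompletedJobs());
        // json now includes "compJobs" and "selTimeNanos" alongside "fateId",
        // "selAll", and "files"
        System.out.println(selected.getMetadataValue());
      }
    }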

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/SelectedFiles.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/SelectedFiles.java
index f2dd1ad861..7cb4c3277e 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/SelectedFiles.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/SelectedFiles.java
@@ -24,10 +24,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.util.time.SteadyTime;
 
 import com.google.common.base.Preconditions;
 import com.google.gson.Gson;
@@ -44,27 +46,48 @@ public class SelectedFiles {
   private final Set<StoredTabletFile> files;
   private final boolean initiallySelectedAll;
   private final FateId fateId;
+  private final int completedJobs;
+  private final SteadyTime selectedTime;
 
   private String metadataValue;
 
   private static final Gson GSON = new GsonBuilder()
       .registerTypeAdapter(SelectedFiles.class, new 
SelectedFilesTypeAdapter()).create();
 
-  public SelectedFiles(Set<StoredTabletFile> files, boolean 
initiallySelectedAll, FateId fateId) {
+  public SelectedFiles(Set<StoredTabletFile> files, boolean 
initiallySelectedAll, FateId fateId,
+      SteadyTime selectedTime) {
+    this(files, initiallySelectedAll, fateId, 0, selectedTime);
+  }
+
+  public SelectedFiles(Set<StoredTabletFile> files, boolean 
initiallySelectedAll, FateId fateId,
+      int completedJobs, SteadyTime selectedTime) {
     Preconditions.checkArgument(files != null && !files.isEmpty());
+    Preconditions.checkArgument(completedJobs >= 0);
     this.files = Set.copyOf(files);
     this.initiallySelectedAll = initiallySelectedAll;
-    this.fateId = fateId;
+    this.fateId = Objects.requireNonNull(fateId);
+    this.completedJobs = completedJobs;
+    this.selectedTime = Objects.requireNonNull(selectedTime);
   }
 
   private static class SelectedFilesTypeAdapter extends 
TypeAdapter<SelectedFiles> {
 
+    // These fields could be moved to an enum but for now just using static 
Strings
+    // seems better to avoid having to construct an enum each time the string 
is read
+    private static final String FATE_ID = "fateId";
+    private static final String SELECTED_ALL = "selAll";
+    private static final String COMPLETED_JOBS = "compJobs";
+    private static final String FILES = "files";
+    private static final String SELECTED_TIME_NANOS = "selTimeNanos";
+
     @Override
     public void write(JsonWriter out, SelectedFiles selectedFiles) throws 
IOException {
       out.beginObject();
-      out.name("fateId").value(selectedFiles.getFateId().canonical());
-      out.name("selAll").value(selectedFiles.initiallySelectedAll());
-      out.name("files").beginArray();
+      out.name(FATE_ID).value(selectedFiles.getFateId().canonical());
+      out.name(SELECTED_ALL).value(selectedFiles.initiallySelectedAll());
+      out.name(COMPLETED_JOBS).value(selectedFiles.getCompletedJobs());
+      
out.name(SELECTED_TIME_NANOS).value(selectedFiles.getSelectedTime().getNanos());
+      out.name(FILES).beginArray();
       // sort the data to make serialized json comparable
       
selectedFiles.getFiles().stream().map(StoredTabletFile::getMetadata).sorted()
           .forEach(file -> {
@@ -83,19 +106,27 @@ public class SelectedFiles {
     public SelectedFiles read(JsonReader in) throws IOException {
       FateId fateId = null;
       boolean selAll = false;
+      int completedJobs = 0;
       List<String> files = new ArrayList<>();
+      SteadyTime selectedTime = null;
 
       in.beginObject();
       while (in.hasNext()) {
         String name = in.nextName();
         switch (name) {
-          case "fateId":
+          case FATE_ID:
             fateId = FateId.from(in.nextString());
             break;
-          case "selAll":
+          case SELECTED_ALL:
             selAll = in.nextBoolean();
             break;
-          case "files":
+          case COMPLETED_JOBS:
+            completedJobs = in.nextInt();
+            break;
+          case SELECTED_TIME_NANOS:
+            selectedTime = SteadyTime.from(in.nextLong(), 
TimeUnit.NANOSECONDS);
+            break;
+          case FILES:
             in.beginArray();
             while (in.hasNext()) {
               files.add(in.nextString());
@@ -111,7 +142,7 @@ public class SelectedFiles {
       Set<StoredTabletFile> tabletFiles =
           
files.stream().map(StoredTabletFile::new).collect(Collectors.toSet());
 
-      return new SelectedFiles(tabletFiles, selAll, fateId);
+      return new SelectedFiles(tabletFiles, selAll, fateId, completedJobs, 
selectedTime);
     }
 
   }
@@ -132,6 +163,14 @@ public class SelectedFiles {
     return fateId;
   }
 
+  public int getCompletedJobs() {
+    return completedJobs;
+  }
+
+  public SteadyTime getSelectedTime() {
+    return selectedTime;
+  }
+
   public String getMetadataValue() {
     if (this.metadataValue == null) {
       // use the custom TypeAdapter to create the json
diff --git 
a/core/src/test/java/org/apache/accumulo/core/metadata/schema/SelectedFilesTest.java
 
b/core/src/test/java/org/apache/accumulo/core/metadata/schema/SelectedFilesTest.java
index a8bd1e67da..974465c529 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/metadata/schema/SelectedFilesTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/metadata/schema/SelectedFilesTest.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -37,6 +38,7 @@ import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -56,7 +58,8 @@ public class SelectedFilesTest {
     Set<StoredTabletFile> files = getStoredTabletFiles(2);
     FateId fateId = FateId.from(FateInstanceType.META, UUID.randomUUID());
 
-    SelectedFiles original = new SelectedFiles(files, true, fateId);
+    SelectedFiles original =
+        new SelectedFiles(files, true, fateId, SteadyTime.from(100_100, 
TimeUnit.NANOSECONDS));
 
     String json = original.getMetadataValue();
     SelectedFiles deserialized = SelectedFiles.from(json);
@@ -73,8 +76,10 @@ public class SelectedFilesTest {
     Set<StoredTabletFile> files = getStoredTabletFiles(16);
     FateId fateId = FateId.from(FateInstanceType.META, UUID.randomUUID());
 
-    SelectedFiles sf1 = new SelectedFiles(files, true, fateId);
-    SelectedFiles sf2 = new SelectedFiles(files, true, fateId);
+    SelectedFiles sf1 =
+        new SelectedFiles(files, true, fateId, SteadyTime.from(100_100, 
TimeUnit.NANOSECONDS));
+    SelectedFiles sf2 =
+        new SelectedFiles(files, true, fateId, SteadyTime.from(100_100, 
TimeUnit.NANOSECONDS));
 
     assertEquals(sf1.getMetadataValue(), sf2.getMetadataValue());
     assertEquals(sf1, sf2);
@@ -94,8 +99,10 @@ public class SelectedFilesTest {
     assertNotEquals(files.toString(), sortedFiles.toString(),
         "Order of files set should differ for this test case");
 
-    SelectedFiles sf1 = new SelectedFiles(files, false, fateId);
-    SelectedFiles sf2 = new SelectedFiles(sortedFiles, false, fateId);
+    SelectedFiles sf1 =
+        new SelectedFiles(files, false, fateId, SteadyTime.from(100_100, 
TimeUnit.NANOSECONDS));
+    SelectedFiles sf2 = new SelectedFiles(sortedFiles, false, fateId,
+        SteadyTime.from(100_100, TimeUnit.NANOSECONDS));
 
     assertEquals(sf1.getMetadataValue(), sf2.getMetadataValue());
     assertEquals(sf1, sf2);
@@ -113,8 +120,10 @@ public class SelectedFilesTest {
     // Remove an element to create a subset
     filesSubSet.remove(filesSubSet.iterator().next());
 
-    SelectedFiles superSetSelectedFiles = new SelectedFiles(filesSuperSet, 
true, fateId);
-    SelectedFiles subSetSelectedFiles = new SelectedFiles(filesSubSet, true, 
fateId);
+    SelectedFiles superSetSelectedFiles = new SelectedFiles(filesSuperSet, 
true, fateId,
+        SteadyTime.from(100_100, TimeUnit.NANOSECONDS));
+    SelectedFiles subSetSelectedFiles = new SelectedFiles(filesSubSet, true, 
fateId,
+        SteadyTime.from(100_100, TimeUnit.NANOSECONDS));
 
     String superSetJson = superSetSelectedFiles.getMetadataValue();
     String subSetJson = subSetSelectedFiles.getMetadataValue();
@@ -134,11 +143,12 @@ public class SelectedFilesTest {
   }
 
   private static Stream<Arguments> provideTestJsons() {
-    return 
Stream.of(Arguments.of("FATE:META:12345678-9abc-def1-2345-6789abcdef12", true, 
12),
-        Arguments.of("FATE:META:12345678-9abc-def1-2345-6789abcdef12", false, 
12),
-        Arguments.of("FATE:META:12345678-9abc-def1-2345-6789abcdef12", false, 
23),
-        Arguments.of("FATE:META:abcdef12-3456-789a-bcde-f123456789ab", false, 
23),
-        Arguments.of("FATE:META:41b40c7c-55e5-4d3b-8d21-1b70d1e7f3fb", false, 
23));
+    return Stream.of(
+        Arguments.of("FATE:META:12345678-9abc-def1-2345-6789abcdef12", true, 
0, 1000, 12),
+        Arguments.of("FATE:META:12345678-9abc-def1-2345-6789abcdef12", false, 
1, 2000, 12),
+        Arguments.of("FATE:META:12345678-9abc-def1-2345-6789abcdef12", false, 
2, 3000, 23),
+        Arguments.of("FATE:META:abcdef12-3456-789a-bcde-f123456789ab", false, 
2, 4000, 23),
+        Arguments.of("FATE:META:41b40c7c-55e5-4d3b-8d21-1b70d1e7f3fb", false, 
2, 5000, 23));
   }
 
   /**
@@ -147,14 +157,15 @@ public class SelectedFilesTest {
    */
   @ParameterizedTest
   @MethodSource("provideTestJsons")
-  public void testJsonStrings(FateId fateId, boolean selAll, int numPaths) {
+  public void testJsonStrings(FateId fateId, boolean selAll, int compJobs, 
long selTimeNanos,
+      int numPaths) {
     List<String> paths = getFilePaths(numPaths);
 
     // should be resilient to unordered file arrays
     Collections.shuffle(paths, RANDOM.get());
 
     // construct a json from the given parameters
-    String json = getJson(fateId, selAll, paths);
+    String json = getJson(fateId, selAll, compJobs, selTimeNanos, paths);
 
     System.out.println(json);
 
@@ -168,7 +179,7 @@ public class SelectedFilesTest {
     assertEquals(expectedStoredTabletFiles, selectedFiles.getFiles());
 
     Collections.sort(paths);
-    String jsonWithSortedFiles = getJson(fateId, selAll, paths);
+    String jsonWithSortedFiles = getJson(fateId, selAll, compJobs, 
selTimeNanos, paths);
     assertEquals(jsonWithSortedFiles, selectedFiles.getMetadataValue());
   }
 
@@ -184,12 +195,14 @@ public class SelectedFilesTest {
    * }
    * </pre>
    */
-  private static String getJson(FateId fateId, boolean selAll, List<String> 
paths) {
+  private static String getJson(FateId fateId, boolean selAll, int compJobs, 
long selTimeNanos,
+      List<String> paths) {
     String filesJsonArray =
         paths.stream().map(path -> new ReferencedTabletFile(new 
Path(path)).insert().getMetadata())
             .map(path -> path.replace("\"", "\\\"")).map(path -> "'" + path + 
"'")
             .collect(Collectors.joining(","));
-    return ("{'fateId':'" + fateId + "','selAll':" + selAll + ",'files':[" + 
filesJsonArray + "]}")
+    return ("{'fateId':'" + fateId + "','selAll':" + selAll + ",'compJobs':" + 
compJobs
+        + ",'selTimeNanos':" + selTimeNanos + ",'files':[" + filesJsonArray + 
"]}")
         .replace('\'', '\"');
   }
 
diff --git 
a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
 
b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
index 109be1aadc..0f51d61283 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java
@@ -667,7 +667,8 @@ public class TabletMetadataTest {
     LogEntry le2 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID());
 
     FateId selFilesFateId = FateId.from(type, UUID.randomUUID());
-    SelectedFiles selFiles = new SelectedFiles(Set.of(sf1, sf4), false, 
selFilesFateId);
+    SelectedFiles selFiles = new SelectedFiles(Set.of(sf1, sf4), false, 
selFilesFateId,
+        SteadyTime.from(100_000, TimeUnit.NANOSECONDS));
     var unsplittableMeta =
         UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, Set.of(sf1, 
sf2));
 
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
index 1d88de2eaa..3b20c1ef99 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.PluginEnvironment;
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.fate.FateId;
@@ -51,6 +52,7 @@ import 
org.apache.accumulo.core.util.compaction.CompactionJobImpl;
 import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
 import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
 import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,9 +67,10 @@ public class CompactionJobGenerator {
   private final PluginEnvironment env;
   private final Map<FateId,Map<String,String>> allExecutionHints;
   private final Cache<Pair<TableId,CompactionServiceId>,Long> 
unknownCompactionServiceErrorCache;
+  private final SteadyTime steadyTime;
 
   public CompactionJobGenerator(PluginEnvironment env,
-      Map<FateId,Map<String,String>> executionHints) {
+      Map<FateId,Map<String,String>> executionHints, SteadyTime steadyTime) {
     servicesConfig = new CompactionServicesConfig(env.getConfiguration());
     serviceIds = 
servicesConfig.getPlanners().keySet().stream().map(CompactionServiceId::of)
         .collect(Collectors.toUnmodifiableSet());
@@ -87,6 +90,7 @@ public class CompactionJobGenerator {
     unknownCompactionServiceErrorCache =
         
Caches.getInstance().createNewBuilder(CacheName.COMPACTION_SERVICE_UNKNOWN, 
false)
             .expireAfterWrite(5, TimeUnit.MINUTES).build();
+    this.steadyTime = steadyTime;
   }
 
   public Collection<CompactionJob> generateJobs(TabletMetadata tablet, 
Set<CompactionKind> kinds) {
@@ -205,9 +209,22 @@ public class CompactionJobGenerator {
         // remove any files that are in active compactions
         tablet.getExternalCompactions().values().stream().flatMap(ecm -> 
ecm.getJobFiles().stream())
             .forEach(tmpFiles::remove);
-        // remove any files that are selected
-        if (tablet.getSelectedFiles() != null) {
-          tmpFiles.keySet().removeAll(tablet.getSelectedFiles().getFiles());
+        // remove any files that are selected if the user compaction has completed
+        // at least 1 job; otherwise we can keep the files
+        var selectedFiles = tablet.getSelectedFiles();
+
+        if (selectedFiles != null) {
+          long selectedExpirationDuration =
+              
ConfigurationTypeHelper.getTimeInMillis(env.getConfiguration(tablet.getTableId())
+                  
.get(Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey()));
+
+          // If jobs have completed, or the selected time has not expired, then remove
+          // the selected files from the candidates; otherwise we can cancel the selection
+          if (selectedFiles.getCompletedJobs() > 0
+              || (steadyTime.minus(selectedFiles.getSelectedTime()).toMillis()
+                  < selectedExpirationDuration)) {
+            tmpFiles.keySet().removeAll(selectedFiles.getFiles());
+          }
         }
         candidates = tmpFiles.entrySet().stream()
             .map(entry -> new CompactableFileImpl(entry.getKey(), 
entry.getValue()))
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
index 4b2ba5d4f7..b9c64c7f84 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
@@ -160,8 +160,8 @@ public class TabletManagementIterator extends 
SkippingIterator {
     this.env = env;
     tabletMgmtParams =
         
TabletManagementParameters.deserialize(options.get(TABLET_GOAL_STATE_PARAMS_OPTION));
-    compactionGenerator =
-        new CompactionJobGenerator(env.getPluginEnv(), 
tabletMgmtParams.getCompactionHints());
+    compactionGenerator = new CompactionJobGenerator(env.getPluginEnv(),
+        tabletMgmtParams.getCompactionHints(), 
tabletMgmtParams.getSteadyTime());
     final AccumuloConfiguration conf = new 
ConfigurationCopy(env.getPluginEnv().getConfiguration());
     BalancerEnvironmentImpl benv =
         new BalancerEnvironmentImpl(((TabletIteratorEnvironment) 
env).getServerContext());
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java
index 197b9feca6..a407a20b6e 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java
@@ -32,7 +32,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -43,6 +45,7 @@ import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.manager.thrift.ManagerState;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -75,13 +78,14 @@ public class TabletManagementParameters {
   private final Set<TServerInstance> onlineTservers;
   private final boolean canSuspendTablets;
   private final Map<Path,Path> volumeReplacements;
+  private final SteadyTime steadyTime;
 
   public TabletManagementParameters(ManagerState managerState,
       Map<Ample.DataLevel,Boolean> parentUpgradeMap, Set<TableId> onlineTables,
       LiveTServerSet.LiveTServersSnapshot liveTServersSnapshot,
       Set<TServerInstance> serversToShutdown, Map<KeyExtent,TServerInstance> 
migrations,
       Ample.DataLevel level, Map<FateId,Map<String,String>> compactionHints,
-      boolean canSuspendTablets, Map<Path,Path> volumeReplacements) {
+      boolean canSuspendTablets, Map<Path,Path> volumeReplacements, SteadyTime 
steadyTime) {
     this.managerState = managerState;
     this.parentUpgradeMap = Map.copyOf(parentUpgradeMap);
     // TODO could filter by level
@@ -103,6 +107,7 @@ public class TabletManagementParameters {
     });
     this.canSuspendTablets = canSuspendTablets;
     this.volumeReplacements = Map.copyOf(volumeReplacements);
+    this.steadyTime = Objects.requireNonNull(steadyTime);
   }
 
   private TabletManagementParameters(JsonData jdata) {
@@ -130,6 +135,7 @@ public class TabletManagementParameters {
     });
     this.canSuspendTablets = jdata.canSuspendTablets;
     this.volumeReplacements = 
Collections.unmodifiableMap(jdata.volumeReplacements);
+    this.steadyTime = SteadyTime.from(jdata.steadyTime, TimeUnit.NANOSECONDS);
   }
 
   public ManagerState getManagerState() {
@@ -188,6 +194,10 @@ public class TabletManagementParameters {
     return volumeReplacements;
   }
 
+  public SteadyTime getSteadyTime() {
+    return steadyTime;
+  }
+
   private static Map<FateId,Map<String,String>>
       makeImmutable(Map<FateId,Map<String,String>> compactionHints) {
     var copy = new HashMap<FateId,Map<String,String>>();
@@ -212,6 +222,7 @@ public class TabletManagementParameters {
 
     final boolean canSuspendTablets;
     final Map<Path,Path> volumeReplacements;
+    final long steadyTime;
 
     private static String toString(KeyExtent extent) {
       DataOutputBuffer buffer = new DataOutputBuffer();
@@ -255,6 +266,7 @@ public class TabletManagementParameters {
           .collect(Collectors.toMap(entry -> entry.getKey().canonical(), 
Map.Entry::getValue));
       canSuspendTablets = params.canSuspendTablets;
       volumeReplacements = params.volumeReplacements;
+      steadyTime = params.steadyTime.getNanos();
     }
 
   }
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
index 71e1ada06e..b2aebb5b36 100644
--- 
a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java
@@ -526,7 +526,8 @@ public class MetadataConstraintsTest {
     ServerColumnFamily.SELECTED_COLUMN.put(m,
         new Value(new SelectedFiles(Set.of(new ReferencedTabletFile(
             new 
Path("hdfs://nn.somewhere.com:86753/accumulo/tables/42/t-0000/F00001.rf"))
-            .insert()), true, fateId).getMetadataValue()));
+            .insert()), true, fateId, SteadyTime.from(100, 
TimeUnit.NANOSECONDS))
+            .getMetadataValue()));
     violations = mc.check(createEnv(), m);
     assertNull(violations);
   }
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementParametersTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementParametersTest.java
index ff6a2fc940..fc9fe4e9d5 100644
--- 
a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementParametersTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementParametersTest.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.data.TableId;
@@ -30,6 +31,7 @@ import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.manager.thrift.ManagerState;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
@@ -55,10 +57,11 @@ public class TabletManagementParametersTest {
     final boolean canSuspendTablets = true;
     final Map<Path,Path> replacements =
         Map.of(new Path("file:/vol1/accumulo/inst_id"), new 
Path("file:/vol2/accumulo/inst_id"));
+    final SteadyTime steadyTime = SteadyTime.from(100_000, 
TimeUnit.NANOSECONDS);
 
     final TabletManagementParameters tmp = new 
TabletManagementParameters(managerState,
         parentUpgradeMap, onlineTables, serverSnapshot, serversToShutdown, 
migrations, dataLevel,
-        compactionHints, canSuspendTablets, replacements);
+        compactionHints, canSuspendTablets, replacements, steadyTime);
 
     String jsonString = tmp.serialize();
     TabletManagementParameters tmp2 = 
TabletManagementParameters.deserialize(jsonString);
@@ -73,5 +76,6 @@ public class TabletManagementParametersTest {
     assertEquals(compactionHints, tmp2.getCompactionHints());
     assertEquals(canSuspendTablets, tmp2.canSuspendTablets());
     assertEquals(replacements, tmp2.getVolumeReplacements());
+    assertEquals(steadyTime, tmp2.getSteadyTime());
   }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index ac9b682cb9..244a2696ab 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -995,7 +995,7 @@ public class Manager extends AbstractServer
     fateServiceHandler = new FateServiceHandler(this);
     managerClientHandler = new ManagerClientServiceHandler(this);
     compactionCoordinator =
-        new CompactionCoordinator(context, security, fateRefs, 
getResourceGroup());
+        new CompactionCoordinator(context, security, fateRefs, 
getResourceGroup(), this);
 
     // Start the Manager's Client service
     // Ensure that calls before the manager gets the lock fail
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 7ab6415646..ee9765138c 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -347,7 +347,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
         manager.onlineTables(), tServersSnapshot, shutdownServers, 
manager.migrationsSnapshot(),
         store.getLevel(), manager.getCompactionHints(store.getLevel()), 
canSuspendTablets(),
         lookForTabletsNeedingVolReplacement ? 
manager.getContext().getVolumeReplacements()
-            : Map.of());
+            : Map.of(),
+        manager.getSteadyTime());
   }
 
   private Set<TServerInstance> getFilteredServersToShutdown() {
@@ -382,8 +383,9 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     TabletLists tLists = new TabletLists(currentTServers, 
tableMgmtParams.getGroupedTServers(),
         tableMgmtParams.getServersToShutdown());
 
-    CompactionJobGenerator compactionGenerator = new CompactionJobGenerator(
-        new ServiceEnvironmentImpl(manager.getContext()), 
tableMgmtParams.getCompactionHints());
+    CompactionJobGenerator compactionGenerator =
+        new CompactionJobGenerator(new 
ServiceEnvironmentImpl(manager.getContext()),
+            tableMgmtParams.getCompactionHints(), 
tableMgmtParams.getSteadyTime());
 
     Set<TServerInstance> filteredServersToShutdown =
         new HashSet<>(tableMgmtParams.getServersToShutdown());
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 8fcddc530f..bce38de18d 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -40,6 +40,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -109,6 +110,7 @@ import 
org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.compaction.RunningCompaction;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.manager.Manager;
 import 
org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction;
@@ -175,14 +177,16 @@ public class CompactionCoordinator
   private final DeadCompactionDetector deadCompactionDetector;
 
   private QueueMetrics queueMetrics;
+  private final Manager manager;
 
   public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
       AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances,
-      final String resourceGroupName) {
+      final String resourceGroupName, Manager manager) {
     this.ctx = ctx;
     this.schedExecutor = this.ctx.getScheduledExecutor();
     this.security = security;
     this.resourceGroupName = resourceGroupName;
+    this.manager = Objects.requireNonNull(manager);
 
     this.jobQueues = new CompactionJobQueues(
         
ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
@@ -423,7 +427,7 @@ public class CompactionCoordinator
 
   @VisibleForTesting
   public static boolean canReserveCompaction(TabletMetadata tablet, 
CompactionKind kind,
-      Set<StoredTabletFile> jobFiles, ServerContext ctx) {
+      Set<StoredTabletFile> jobFiles, ServerContext ctx, SteadyTime 
steadyTime) {
 
     if (tablet == null) {
       // the tablet no longer exist
@@ -457,8 +461,8 @@ public class CompactionCoordinator
               "Unable to reserve {} for system compaction, tablet has {} 
pending requested user compactions",
               tablet.getExtent(), userRequestedCompactions);
           return false;
-        } else if (tablet.getSelectedFiles() != null
-            && !Collections.disjoint(jobFiles, 
tablet.getSelectedFiles().getFiles())) {
+        } else if (!Collections.disjoint(jobFiles,
+            getFilesReservedBySelection(tablet, steadyTime, ctx))) {
           return false;
         }
         break;
@@ -549,7 +553,8 @@ public class CompactionCoordinator
       try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
         var extent = metaJob.getTabletMetadata().getExtent();
 
-        if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), 
jobFiles, ctx)) {
+        if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), 
jobFiles, ctx,
+            manager.getSteadyTime())) {
           return null;
         }
 
@@ -561,6 +566,21 @@ public class CompactionCoordinator
         var tabletMutator = 
tabletsMutator.mutateTablet(extent).requireAbsentOperation()
             .requireSame(tabletMetadata, FILES, SELECTED, ECOMP);
 
+        if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
+          var selectedFiles = tabletMetadata.getSelectedFiles();
+          var reserved = getFilesReservedBySelection(tabletMetadata, 
manager.getSteadyTime(), ctx);
+
+          // If there is a selectedFiles column and the reserved set is empty, this means that
+          // either no user jobs have completed yet or the selection expiration time has passed,
+          // so the column is eligible to be deleted and a system job can run instead
+          if (selectedFiles != null && reserved.isEmpty()
+              && !Collections.disjoint(jobFiles, selectedFiles.getFiles())) {
+            LOG.debug("Deleting user compaction selected files for {} {}", 
extent,
+                externalCompactionId);
+            tabletMutator.deleteSelectedFiles();
+          }
+        }
+
         tabletMutator.putExternalCompaction(externalCompactionId, ecm);
         tabletMutator.submit(tm -> 
tm.getExternalCompactions().containsKey(externalCompactionId));
 
@@ -1052,4 +1072,24 @@ public class CompactionCoordinator
     }
   }
 
+  private static Set<StoredTabletFile> 
getFilesReservedBySelection(TabletMetadata tabletMetadata,
+      SteadyTime steadyTime, ServerContext ctx) {
+    if (tabletMetadata.getSelectedFiles() == null) {
+      return Set.of();
+    }
+
+    if (tabletMetadata.getSelectedFiles().getCompletedJobs() > 0) {
+      return tabletMetadata.getSelectedFiles().getFiles();
+    }
+
+    long selectedExpirationDuration = 
ctx.getTableConfiguration(tabletMetadata.getTableId())
+        .getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION);
+
+    if 
(steadyTime.minus(tabletMetadata.getSelectedFiles().getSelectedTime()).toMillis()
+        < selectedExpirationDuration) {
+      return tabletMetadata.getSelectedFiles().getFiles();
+    }
+
+    return Set.of();
+  }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
index 6b3db6b136..939d897980 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
@@ -197,9 +197,10 @@ public class CommitCompaction extends ManagerRepo {
               tablet.getExtent());
         }
 
-        tabletMutator.putSelectedFiles(
-            new SelectedFiles(newSelectedFileSet, 
tablet.getSelectedFiles().initiallySelectedAll(),
-                tablet.getSelectedFiles().getFateId()));
+        tabletMutator.putSelectedFiles(new SelectedFiles(newSelectedFileSet,
+            tablet.getSelectedFiles().initiallySelectedAll(), 
tablet.getSelectedFiles().getFateId(),
+            tablet.getSelectedFiles().getCompletedJobs() + 1,
+            tablet.getSelectedFiles().getSelectedTime()));
       }
     }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index d9424b68fe..d1ace6c816 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -241,17 +241,22 @@ class CompactionDriver extends ManagerRepo {
             noneSelected++;
           } else {
             var mutator = 
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
-                .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED);
-            var selectedFiles =
-                new SelectedFiles(filesToCompact, 
tablet.getFiles().equals(filesToCompact), fateId);
+                .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED, 
USER_COMPACTION_REQUESTED);
+            var selectedFiles = new SelectedFiles(filesToCompact,
+                tablet.getFiles().equals(filesToCompact), fateId, 
manager.getSteadyTime());
 
             mutator.putSelectedFiles(selectedFiles);
 
+            // We no longer need to include this marker if files are selected
+            if (tablet.getUserCompactionsRequested().contains(fateId)) {
+              mutator.deleteUserCompactionRequested(fateId);
+            }
+
             selectionsSubmitted.put(tablet.getExtent(), filesToCompact);
 
             mutator.submit(tabletMetadata -> tabletMetadata.getSelectedFiles() 
!= null
-                && tabletMetadata.getSelectedFiles().getMetadataValue()
-                    .equals(selectedFiles.getMetadataValue()));
+                && tabletMetadata.getSelectedFiles().getFateId().equals(fateId)
+                || tabletMetadata.getCompacted().contains(fateId));
 
             if (minSelected == null || 
tablet.getExtent().compareTo(minSelected) < 0) {
               minSelected = tablet.getExtent();
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 1c4a02a7a4..962ac18a13 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -43,6 +43,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -82,6 +83,7 @@ import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.cache.Caches;
 import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
 import org.apache.accumulo.core.util.compaction.RunningCompaction;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.manager.Manager;
 import 
org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
 import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
@@ -126,8 +128,8 @@ public class CompactionCoordinatorTest {
     private Set<ExternalCompactionId> metadataCompactionIds = null;
 
     public TestCoordinator(ServerContext ctx, SecurityOperation security,
-        List<RunningCompaction> runningCompactions) {
-      super(ctx, security, fateInstances, "TEST_GROUP");
+        List<RunningCompaction> runningCompactions, Manager manager) {
+      super(ctx, security, fateInstances, "TEST_GROUP", manager);
       this.runningCompactions = runningCompactions;
     }
 
@@ -240,9 +242,13 @@ public class CompactionCoordinatorTest {
 
     AuditedSecurityOperation security = 
EasyMock.createNiceMock(AuditedSecurityOperation.class);
 
-    EasyMock.replay(context, security);
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, 
TimeUnit.NANOSECONDS))
+        .anyTimes();
 
-    var coordinator = new TestCoordinator(context, security, new 
ArrayList<>());
+    EasyMock.replay(context, security, manager);
+
+    var coordinator = new TestCoordinator(context, security, new 
ArrayList<>(), manager);
     assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
     assertEquals(0, coordinator.getRunning().size());
     coordinator.run();
@@ -273,9 +279,13 @@ public class CompactionCoordinatorTest {
 
     AuditedSecurityOperation security = 
EasyMock.createNiceMock(AuditedSecurityOperation.class);
 
-    EasyMock.replay(context, job, security);
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, 
TimeUnit.NANOSECONDS))
+        .anyTimes();
+
+    EasyMock.replay(context, job, security, manager);
 
-    var coordinator = new TestCoordinator(context, security, 
runningCompactions);
+    var coordinator = new TestCoordinator(context, security, 
runningCompactions, manager);
     coordinator.resetInternals();
     assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
     assertEquals(0, coordinator.getRunning().size());
@@ -320,10 +330,13 @@ public class CompactionCoordinatorTest {
     expect(tm.getExtent()).andReturn(ke).anyTimes();
     expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes();
     expect(tm.getTableId()).andReturn(ke.tableId()).anyTimes();
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, 
TimeUnit.NANOSECONDS))
+        .anyTimes();
 
-    EasyMock.replay(tconf, context, creds, tm, security);
+    EasyMock.replay(tconf, context, creds, tm, security, manager);
 
-    var coordinator = new TestCoordinator(context, security, new 
ArrayList<>());
+    var coordinator = new TestCoordinator(context, security, new 
ArrayList<>(), manager);
     assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
     assertEquals(0, coordinator.getRunning().size());
     // Use coordinator.run() to populate the internal data structures. This is 
tested in a different
@@ -371,9 +384,13 @@ public class CompactionCoordinatorTest {
     AuditedSecurityOperation security = 
EasyMock.createNiceMock(AuditedSecurityOperation.class);
     expect(security.canPerformSystemActions(creds)).andReturn(true);
 
-    EasyMock.replay(context, creds, security);
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, 
TimeUnit.NANOSECONDS))
+        .anyTimes();
+
+    EasyMock.replay(context, creds, security, manager);
 
-    var coordinator = new TestCoordinator(context, security, new 
ArrayList<>());
+    var coordinator = new TestCoordinator(context, security, new 
ArrayList<>(), manager);
     TExternalCompactionJob job = 
coordinator.getCompactionJob(TraceUtil.traceInfo(), creds,
         GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString());
     assertNull(job.getExternalCompactionId());
@@ -391,10 +408,14 @@ public class CompactionCoordinatorTest {
     TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
 
     AuditedSecurityOperation security = 
EasyMock.createNiceMock(AuditedSecurityOperation.class);
+    Manager manager = EasyMock.createNiceMock(Manager.class);
+    expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, 
TimeUnit.NANOSECONDS))
+        .anyTimes();
 
-    EasyMock.replay(context, creds, security);
+    EasyMock.replay(context, creds, security, manager);
 
-    TestCoordinator coordinator = new TestCoordinator(context, security, new 
ArrayList<>());
+    TestCoordinator coordinator =
+        new TestCoordinator(context, security, new ArrayList<>(), manager);
 
     var ecid1 = ExternalCompactionId.generate(UUID.randomUUID());
     var ecid2 = ExternalCompactionId.generate(UUID.randomUUID());
@@ -432,6 +453,12 @@ public class CompactionCoordinatorTest {
     
EasyMock.expect(context.getTableState(tableId1)).andReturn(TableState.ONLINE).atLeastOnce();
     
EasyMock.expect(context.getTableState(tableId2)).andReturn(TableState.OFFLINE).atLeastOnce();
 
+    TableConfiguration tableConf = 
EasyMock.createMock(TableConfiguration.class);
+    
EasyMock.expect(tableConf.getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION))
+        .andReturn(100L).atLeastOnce();
+
+    
EasyMock.expect(context.getTableConfiguration(anyObject())).andReturn(tableConf).atLeastOnce();
+
     FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
 
     CompactorGroupId cgid = CompactorGroupId.of("G1");
@@ -445,7 +472,7 @@ public class CompactionCoordinatorTest {
     CompactionMetadata cm2 = new CompactionMetadata(Set.of(file3), tmp2, 
"localhost:5555",
         CompactionKind.USER, (short) 5, cgid, false, fateId1);
 
-    EasyMock.replay(context);
+    EasyMock.replay(context, tableConf);
 
     KeyExtent extent1 = new KeyExtent(tableId1, null, null);
 
@@ -454,36 +481,43 @@ public class CompactionCoordinatorTest {
     var cid1 = ExternalCompactionId.generate(UUID.randomUUID());
     var cid2 = ExternalCompactionId.generate(UUID.randomUUID());
 
-    var selected = new SelectedFiles(Set.of(file1, file2, file3), false, 
fateId1);
+    var selectedWithoutComp = new SelectedFiles(Set.of(file1, file2, file3), 
false, fateId1,
+        SteadyTime.from(100, TimeUnit.SECONDS));
+    var selectedWithComp = new SelectedFiles(Set.of(file1, file2, file3), 
false, fateId1, 1,
+        SteadyTime.from(100, TimeUnit.SECONDS));
+
+    var time = SteadyTime.from(1000, TimeUnit.SECONDS);
 
     // should not be able to compact an offline table
     var tabletOffline = TabletMetadata.builder(new KeyExtent(tableId2, null, 
null))
         .putFile(file1, dfv).putFile(file2, dfv).putFile(file3, 
dfv).putFile(file4, dfv)
         .build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED);
-    assertFalse(
-        canReserveCompaction(tabletOffline, CompactionKind.SYSTEM, 
Set.of(file1, file2), context));
+    assertFalse(canReserveCompaction(tabletOffline, CompactionKind.SYSTEM, 
Set.of(file1, file2),
+        context, time));
 
     // nothing should prevent this compaction
     var tablet1 =
         TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, 
dfv).putFile(file3, dfv)
             .putFile(file4, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, 
SELECTED);
-    assertTrue(canReserveCompaction(tablet1, CompactionKind.SYSTEM, 
Set.of(file1, file2), context));
+    assertTrue(
+        canReserveCompaction(tablet1, CompactionKind.SYSTEM, Set.of(file1, 
file2), context, time));
 
     // should not be able to do a user compaction unless selected files are 
present
-    assertFalse(canReserveCompaction(tablet1, CompactionKind.USER, 
Set.of(file1, file2), context));
+    assertFalse(
+        canReserveCompaction(tablet1, CompactionKind.USER, Set.of(file1, 
file2), context, time));
 
     // should not be able to compact a tablet with user compaction request in 
place
     var tablet3 =
         TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, 
dfv).putFile(file3, dfv)
             .putFile(file4, 
dfv).putUserCompactionRequested(fateId1).build(OPID, ECOMP, SELECTED);
     assertFalse(
-        canReserveCompaction(tablet3, CompactionKind.SYSTEM, Set.of(file1, 
file2), context));
+        canReserveCompaction(tablet3, CompactionKind.SYSTEM, Set.of(file1, 
file2), context, time));
 
     // should not be able to compact a tablet when the job has files not 
present in the tablet
     var tablet4 = TabletMetadata.builder(extent1).putFile(file1, 
dfv).putFile(file2, dfv)
         .putFile(file3, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, 
SELECTED);
-    assertFalse(
-        canReserveCompaction(tablet4, CompactionKind.SYSTEM, Set.of(file1, 
file2, file4), context));
+    assertFalse(canReserveCompaction(tablet4, CompactionKind.SYSTEM, 
Set.of(file1, file2, file4),
+        context, time));
 
     // should not be able to compact a tablet with an operation id present
     TabletOperationId opid = 
TabletOperationId.from(TabletOperationType.SPLITTING, fateId1);
@@ -491,64 +525,81 @@ public class CompactionCoordinatorTest {
         .putFile(file3, dfv).putFile(file4, dfv).putOperation(opid)
         .build(ECOMP, USER_COMPACTION_REQUESTED, SELECTED);
     assertFalse(
-        canReserveCompaction(tablet5, CompactionKind.SYSTEM, Set.of(file1, 
file2), context));
+        canReserveCompaction(tablet5, CompactionKind.SYSTEM, Set.of(file1, 
file2), context, time));
 
     // should not be able to compact a tablet if the job files overlaps with 
running compactions
     var tablet6 = TabletMetadata.builder(extent1).putFile(file1, 
dfv).putFile(file2, dfv)
         .putFile(file3, dfv).putFile(file4, dfv).putExternalCompaction(cid1, 
cm1)
         .putExternalCompaction(cid2, cm2).build(OPID, 
USER_COMPACTION_REQUESTED, SELECTED);
     assertFalse(
-        canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file1, 
file2), context));
+        canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file1, 
file2), context, time));
     // should be able to compact the file that is outside of the set of files 
currently compacting
-    assertTrue(canReserveCompaction(tablet6, CompactionKind.SYSTEM, 
Set.of(file4), context));
+    assertTrue(canReserveCompaction(tablet6, CompactionKind.SYSTEM, 
Set.of(file4), context, time));
 
     // create a tablet with a selected set of files
-    var selTablet = TabletMetadata.builder(extent1).putFile(file1, 
dfv).putFile(file2, dfv)
-        .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selected)
+    var selTabletWithComp = TabletMetadata.builder(extent1).putFile(file1, 
dfv).putFile(file2, dfv)
+        .putFile(file3, dfv).putFile(file4, 
dfv).putSelectedFiles(selectedWithComp)
         .build(OPID, USER_COMPACTION_REQUESTED, ECOMP);
+    // 0 completed jobs
+    var selTabletWithoutComp = TabletMetadata.builder(extent1).putFile(file1, 
dfv)
+        .putFile(file2, dfv).putFile(file3, dfv).putFile(file4, dfv)
+        .putSelectedFiles(selectedWithoutComp).build(OPID, 
USER_COMPACTION_REQUESTED, ECOMP);
+
+    // Should be able to start if no completed and overlap
+    assertTrue(canReserveCompaction(selTabletWithoutComp, 
CompactionKind.SYSTEM,
+        Set.of(file1, file2), context, time));
+    assertTrue(canReserveCompaction(selTabletWithoutComp, 
CompactionKind.SYSTEM,
+        Set.of(file3, file4), context, time));
+
     // should not be able to start a system compaction if the set of files 
overlaps with the
     // selected files
-    assertFalse(
-        canReserveCompaction(selTablet, CompactionKind.SYSTEM, Set.of(file1, 
file2), context));
-    assertFalse(
-        canReserveCompaction(selTablet, CompactionKind.SYSTEM, Set.of(file3, 
file4), context));
+    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, 
Set.of(file1, file2),
+        context, time));
+    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, 
Set.of(file3, file4),
+        context, time));
     // should be able to start a system compaction on the set of files not in 
the selected set
-    assertTrue(canReserveCompaction(selTablet, CompactionKind.SYSTEM, 
Set.of(file4), context));
+    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, 
Set.of(file4),
+        context, time));
     // should be able to start user compactions on files that are selected
-    assertTrue(canReserveCompaction(selTablet, CompactionKind.USER, 
Set.of(file1, file2), context));
-    assertTrue(canReserveCompaction(selTablet, CompactionKind.USER, 
Set.of(file2, file3), context));
-    assertTrue(
-        canReserveCompaction(selTablet, CompactionKind.USER, Set.of(file1, 
file2, file3), context));
+    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, 
Set.of(file1, file2),
+        context, time));
+    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, 
Set.of(file2, file3),
+        context, time));
+    assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER,
+        Set.of(file1, file2, file3), context, time));
     // should not be able to start user compactions on files that fall outside 
of the selected set
+    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.USER, 
Set.of(file1, file4),
+        context, time));
     assertFalse(
-        canReserveCompaction(selTablet, CompactionKind.USER, Set.of(file1, 
file4), context));
-    assertFalse(canReserveCompaction(selTablet, CompactionKind.USER, 
Set.of(file4), context));
-    assertFalse(canReserveCompaction(selTablet, CompactionKind.USER,
-        Set.of(file1, file2, file3, file4), context));
+        canReserveCompaction(selTabletWithComp, CompactionKind.USER, 
Set.of(file4), context, time));
+    assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.USER,
+        Set.of(file1, file2, file3, file4), context, time));
 
     // test selected files and running compaction
     var selRunningTablet = TabletMetadata.builder(extent1).putFile(file1, 
dfv).putFile(file2, dfv)
-        .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selected)
+        .putFile(file3, dfv).putFile(file4, 
dfv).putSelectedFiles(selectedWithComp)
         .putExternalCompaction(cid2, cm2).build(OPID, 
USER_COMPACTION_REQUESTED);
     // should be able to compact files that are in the selected set and not in 
the running set
-    assertTrue(
-        canReserveCompaction(selRunningTablet, CompactionKind.USER, 
Set.of(file1, file2), context));
+    assertTrue(canReserveCompaction(selRunningTablet, CompactionKind.USER, 
Set.of(file1, file2),
+        context, time));
     // should not be able to compact because files overlap the running set
-    assertFalse(
-        canReserveCompaction(selRunningTablet, CompactionKind.USER, 
Set.of(file2, file3), context));
+    assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.USER, 
Set.of(file2, file3),
+        context, time));
     // should not be able to start a system compaction if the set of files 
overlaps with the
     // selected files and/or the running set
     assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, 
Set.of(file1, file2),
-        context));
+        context, time));
     assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, 
Set.of(file3, file4),
-        context));
+        context, time));
     // should be able to start a system compaction on the set of files not in 
the selected set
-    assertTrue(
-        canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, 
Set.of(file4), context));
+    assertTrue(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, 
Set.of(file4), context,
+        time));
 
     // should not be able to compact a tablet that does not exists
-    assertFalse(canReserveCompaction(null, CompactionKind.SYSTEM, 
Set.of(file1, file2), context));
-    assertFalse(canReserveCompaction(null, CompactionKind.USER, Set.of(file1, 
file2), context));
+    assertFalse(
+        canReserveCompaction(null, CompactionKind.SYSTEM, Set.of(file1, 
file2), context, time));
+    assertFalse(
+        canReserveCompaction(null, CompactionKind.USER, Set.of(file1, file2), 
context, time));
 
     EasyMock.verify(context);
   }
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
index 8bd5120788..fb09876ab4 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
@@ -52,6 +52,7 @@ import java.util.Map;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import org.apache.accumulo.core.client.admin.TabletAvailability;
@@ -79,6 +80,7 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.gc.AllVolumesDirectory;
@@ -315,7 +317,8 @@ public class MergeTabletsTest {
     testUnexpectedColumn(tmb -> tmb.putBulkFile(lodedFile.getTabletFile(), otherFateId),
         "has unexpected loaded column");
 
-    var selectedFiles = new SelectedFiles(Set.of(newSTF(567)), false, otherFateId);
+    var selectedFiles = new SelectedFiles(Set.of(newSTF(567)), false, otherFateId,
+        SteadyTime.from(100, TimeUnit.NANOSECONDS));
     testUnexpectedColumn(tmb -> tmb.putSelectedFiles(selectedFiles),
         "has unexpected selected file");
 
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index a077edf7be..1f6627a7ec 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -52,6 +52,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -96,6 +97,7 @@ import org.apache.accumulo.core.metadata.schema.filters.TabletMetadataFilter;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.spi.balancer.TableLoadBalancer;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
@@ -490,8 +492,9 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
     FateInstanceType type = FateInstanceType.fromTableId(tid);
     FateId fateId1 = FateId.from(type, UUID.randomUUID());
     FateId fateId2 = FateId.from(type, UUID.randomUUID());
+    var time = SteadyTime.from(100_100, TimeUnit.NANOSECONDS);
     ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, FILES, SELECTED)
-        .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId1))
+        .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId1, time))
         .submit(tm -> false);
     results = ctmi.process();
     assertEquals(Status.REJECTED, results.get(e1).getStatus());
@@ -503,7 +506,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
         .build(SELECTED);
     ctmi = new ConditionalTabletsMutatorImpl(context);
     ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, FILES, SELECTED)
-        .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId1))
+        .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId1, time))
         .submit(tm -> false);
     results = ctmi.process();
     assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
@@ -516,10 +519,10 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
     // fail
     var expectedToFail = new ArrayList<SelectedFiles>();
 
-    expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2), true, fateId1));
-    expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3, stf4), true, fateId1));
-    expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), false, fateId1));
-    expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId2));
+    expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2), true, fateId1, time));
+    expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3, stf4), true, fateId1, time));
+    expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), false, fateId1, time));
+    expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId2, time));
 
     for (var selectedFiles : expectedToFail) {
       var tm3 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).putFile(stf3, dfv)
@@ -536,7 +539,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
     }
 
     var tm5 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).putFile(stf3, dfv)
-        .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId1)).build();
+        .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId1, time)).build();
     ctmi = new ConditionalTabletsMutatorImpl(context);
     ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES, SELECTED)
         .deleteSelectedFiles().submit(tm -> false);
@@ -565,8 +568,9 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       final boolean initiallySelectedAll = true;
       final FateInstanceType type = FateInstanceType.fromTableId(tid);
       final FateId fateId = FateId.from(type, UUID.randomUUID());
+      final SteadyTime time = SteadyTime.from(100, TimeUnit.NANOSECONDS);
       final SelectedFiles selectedFiles =
-          new SelectedFiles(storedTabletFiles, initiallySelectedAll, fateId);
+          new SelectedFiles(storedTabletFiles, initiallySelectedAll, fateId, time);
 
       ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context);
 
@@ -606,13 +610,15 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
           .sorted().collect(Collectors.toList());
 
       // verify we have the format of the json correct
-      String newJson = createSelectedFilesJson(fateId, initiallySelectedAll, filesPathList);
+      String newJson =
+          createSelectedFilesJson(fateId, initiallySelectedAll, filesPathList, 0, time.getNanos());
       assertEquals(actualMetadataValue, newJson,
           "Test json should be identical to actual metadata at this point");
 
       // reverse the order of the files and create a new json
       Collections.reverse(filesPathList);
-      newJson = createSelectedFilesJson(fateId, initiallySelectedAll, filesPathList);
+      newJson =
+          createSelectedFilesJson(fateId, initiallySelectedAll, filesPathList, 0, time.getNanos());
       assertNotEquals(actualMetadataValue, newJson,
           "Test json should have reverse file order of actual metadata");
 
@@ -663,10 +669,10 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
    * </pre>
    */
   public static String createSelectedFilesJson(FateId fateId, boolean selAll,
-      Collection<String> paths) {
+      Collection<String> paths, int compJobs, long selTime) {
     String filesJsonArray = GSON.get().toJson(paths);
-    return ("{'fateId':'" + fateId + "','selAll':" + selAll + ",'files':" + 
filesJsonArray + "}")
-        .replace('\'', '\"');
+    return ("{'fateId':'" + fateId + "','selAll':" + selAll + ",'compJobs':" + 
compJobs
+        + ",'selTime':" + selTime + ",'files':" + filesJsonArray + 
"}").replace('\'', '\"');
   }
 
   @Test
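
Based on the updated createSelectedFilesJson helper above, the selected files column value now
carries the compJobs and selTime fields alongside the existing ones. Roughly, the serialized form
looks like the following, where the fate id and the contents of the files array are placeholders
(the array holds whatever GSON produces for the selected file entries):

    {"fateId":"<fateId>","selAll":true,"compJobs":0,"selTime":100,"files":[...]}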
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 73c50a0409..bd26f0a469 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -41,6 +41,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
@@ -86,13 +87,18 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactionPlan;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
 import org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner;
 import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
+import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
+import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.VerifyIngest;
 import org.apache.accumulo.test.VerifyIngest.VerifyParams;
+import org.apache.accumulo.test.compaction.CompactionExecutorIT.TestPlanner;
 import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils;
 import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
@@ -100,6 +106,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
+import org.apache.thrift.transport.TTransportException;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,6 +114,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterators;
+import com.google.common.net.HostAndPort;
 
 public class CompactionIT extends AccumuloClusterHarness {
 
@@ -218,6 +226,27 @@ public class CompactionIT extends AccumuloClusterHarness {
 
   }
 
+  public static class TestPlanner implements CompactionPlanner {
+    private static final Logger LOG = LoggerFactory.getLogger(TestPlanner.class);
+
+    RatioBasedCompactionPlanner p = new RatioBasedCompactionPlanner();
+
+    @Override
+    public void init(InitParameters params) {
+      p.init(params);
+    }
+
+    @Override
+    public CompactionPlan makePlan(PlanningParameters params) {
+      return () -> p.makePlan(params).getJobs().stream().map(job -> {
+        LOG.debug("Plan job priority is {}:{}", job.getKind(), 
job.getPriority());
+        return new CompactionJobImpl(
+            job.getKind() == CompactionKind.SYSTEM ? Short.MAX_VALUE : job.getPriority(),
+            job.getGroup(), job.getFiles(), job.getKind(), Optional.empty());
+      }).collect(toList());
+    }
+  }
+
   private static final Logger log = LoggerFactory.getLogger(CompactionIT.class);
 
   private static final int MAX_DATA = 1000;
@@ -1011,6 +1040,144 @@ public class CompactionIT extends AccumuloClusterHarness {
     ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), tableName);
   }
 
+  @Test
+  public void testCancelUserCompactionTimeoutExceeded() throws Exception {
+    testCancelUserCompactionTimeout(true);
+  }
+
+  @Test
+  public void testCancelUserCompactionTimeoutNotExceeded() throws Exception {
+    testCancelUserCompactionTimeout(false);
+  }
+
+  private void testCancelUserCompactionTimeout(boolean timeout) throws Exception {
+
+    var uniqueNames = getUniqueNames(2);
+    String table1 = uniqueNames[0];
+    String table2 = uniqueNames[1];
+
+    try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+
+      // create a compaction service that uses a Planner that will schedule system jobs
+      // at a higher priority than user jobs
+      client.instanceOperations().setProperty(
+          Property.COMPACTION_SERVICE_PREFIX.getKey() + "testcancel.planner",
+          TestPlanner.class.getName());
+      client.instanceOperations().setProperty(
+          Property.COMPACTION_SERVICE_PREFIX.getKey() + "testcancel.planner.opts.groups",
+          ("[{'group':'" + COMPACTOR_GROUP_1 + "'}]").replaceAll("'", "\""));
+
+      // create two tables that uses the compaction service
+      Map<String,String> props = new HashMap<>();
+      props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
+          SimpleCompactionDispatcher.class.getName());
+      props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "testcancel");
+      // Disable system compactions to start for these tables
+      props.put(Property.TABLE_MAJC_RATIO.getKey(), "20");
+
+      // configure tablet compaction iterator that slows compaction down
+      var ntc = new NewTableConfiguration();
+      IteratorSetting iterSetting = new IteratorSetting(50, SlowIterator.class);
+      SlowIterator.setSleepTime(iterSetting, 5);
+      ntc.attachIterator(iterSetting, EnumSet.of(IteratorScope.majc));
+      ntc.setProperties(props);
+
+      // Create two tables and write some data
+      client.tableOperations().create(table1, ntc);
+      client.tableOperations().create(table2, ntc);
+      writeRows((ClientContext) client, table1, MAX_DATA, true);
+      writeRows((ClientContext) client, table2, MAX_DATA, true);
+
+      var ctx = getCluster().getServerContext();
+      Optional<HostAndPort> coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(ctx);
+      if (coordinatorHost.isEmpty()) {
+        throw new TTransportException("Unable to get CompactionCoordinator 
address from ZooKeeper");
+      }
+
+      // Start a compaction for table2, this is done so that the compactor will be busy
+      // and new jobs will queue up and wait
+      client.tableOperations().compact(table2, new CompactionConfig().setWait(false));
+
+      var tableId = TableId.of(client.tableOperations().tableIdMap().get(table1));
+      var extent = new KeyExtent(tableId, null, null);
+
+      // If timeout is true then set a short timeout so the system job can cancel the user job
+      // Otherwise the long timeout should prevent the system from clearing the selected files
+      var expiration = timeout ? "100ms" : "100s";
+      client.tableOperations().setProperty(table1,
+          Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey(), expiration);
+
+      // Submit a user job for table1 that will be put on the queue and waiting
+      // for the current job to finish
+      client.tableOperations().compact(table1, new CompactionConfig().setWait(false));
+      // Wait for the fate operation to write selectedFiles
+      Wait.waitFor(() -> {
+        var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent);
+        var selectedFiles = tabletMeta.getSelectedFiles();
+        if (selectedFiles != null) {
+          return !selectedFiles.getFiles().isEmpty();
+        }
+        return false;
+      }, Wait.MAX_WAIT_MILLIS, 10);
+
+      // Change the ratio so a system compaction will attempt to be scheduled for table 1
+      client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "1");
+
+      if (timeout) {
+        // Because of the custom planner, the system compaction should now take priority
+        // System compactions were previously not eligible to run if selectedFiles existed
+        // for a user compaction already (and they overlapped). But now system compaction jobs
+        // are eligible to run if the user compaction has not started or completed any jobs
+        // and the expiration period has been exceeded.
+        // When this happens the system compaction will delete the selectedFiles column
+        Wait.waitFor(() -> {
+          var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent);
+          return tabletMeta.getSelectedFiles() == null;
+        }, Wait.MAX_WAIT_MILLIS, 100);
+
+        // Wait for the system compaction to be running
+        Wait.waitFor(() -> {
+          var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent);
+          var externalCompactions = tabletMeta.getExternalCompactions();
+          assertTrue(externalCompactions.values().stream()
+              .allMatch(ec -> ec.getKind() == CompactionKind.SYSTEM));
+          return externalCompactions.size() == 1;
+        }, Wait.MAX_WAIT_MILLIS, 10);
+
+        // Wait for the user compaction to now run after the system finishes
+        Wait.waitFor(() -> {
+          var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent);
+          var externalCompactions = tabletMeta.getExternalCompactions();
+          var running = externalCompactions.values().stream()
+              .filter(ec -> ec.getKind() == CompactionKind.USER).count();
+          return running == 1;
+        }, Wait.MAX_WAIT_MILLIS, 100);
+      } else {
+        // Wait for the user compaction to run, there should no system compactions scheduled
+        // even though system has the higher priority in the test because the timeout was
+        // not exceeded
+        Wait.waitFor(() -> {
+          var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent);
+          var externalCompactions = tabletMeta.getExternalCompactions();
+          assertTrue(externalCompactions.values().stream()
+              .allMatch(ec -> ec.getKind() == CompactionKind.USER));
+          return externalCompactions.size() == 1;
+        }, Wait.MAX_WAIT_MILLIS, 10);
+      }
+
+      // Wait and verify all compactions finish
+      Wait.waitFor(() -> {
+        var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent);
+        var externalCompactions = tabletMeta.getExternalCompactions().size();
+        log.debug("Waiting for compactions to finish, count {}", 
externalCompactions);
+        return externalCompactions == 0 && tabletMeta.getCompacted().isEmpty()
+            && tabletMeta.getSelectedFiles() == null;
+      }, Wait.MAX_WAIT_MILLIS, 100);
+    }
+
+    ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), table1);
+  }
+
   private void writeRows(ClientContext client, String tableName, int rows, boolean wait)
       throws Exception {
     try (BatchWriter bw = client.createBatchWriter(tableName)) {
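
The new CompactionIT test drives this behavior through the per-table selection expiration
property it sets on table1. As a rough usage sketch outside the test, the same property could be
set on any table in a similar way; the table name and the "5m" value below are placeholders, not
recommendations, and client is assumed to be an existing AccumuloClient:

    // Illustrative only: "myTable" and the duration value are placeholders.
    client.tableOperations().setProperty("myTable",
        Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey(), "5m");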
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
index 7af3e0ba00..fc6d42fe35 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
@@ -40,6 +40,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Accumulo;
@@ -89,6 +90,7 @@ import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.manager.state.TabletManagementIterator;
@@ -589,6 +591,7 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness {
         onlineTables,
         new LiveTServerSet.LiveTServersSnapshot(tservers,
             Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers)),
-        Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, replacements);
+        Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, replacements,
+        SteadyTime.from(10000, TimeUnit.NANOSECONDS));
   }
 }
