This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 9d4dc21abc Allow system compactions to run if zero user compaction jobs have run (#4480) 9d4dc21abc is described below commit 9d4dc21abce1e55daf21f06fdf87f1cffac743ef Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Sat May 18 12:00:12 2024 -0400 Allow system compactions to run if zero user compaction jobs have run (#4480) This change will allow system compactions to postpone user compactions that have had no jobs run yet. Before this, if a user compaction was in the queue and had selected files that overlapped the files of a system compaction job, it would block that system compaction from running. Now if there are selected files, but the user compaction is not running and hasn't had any jobs completed, then once the selection expiration time has passed the coordinator will clear the selectedFiles column so that the system compaction can run. The fate operation will set the column again while trying to make progress. This closes #4454 Co-authored-by: Keith Turner <ktur...@apache.org> --- .../core/metadata/schema/SelectedFiles.java | 57 +++++-- .../core/metadata/schema/SelectedFilesTest.java | 47 +++--- .../core/metadata/schema/TabletMetadataTest.java | 3 +- .../server/compaction/CompactionJobGenerator.java | 25 ++- .../manager/state/TabletManagementIterator.java | 4 +- .../manager/state/TabletManagementParameters.java | 14 +- .../constraints/MetadataConstraintsTest.java | 3 +- .../state/TabletManagementParametersTest.java | 6 +- .../java/org/apache/accumulo/manager/Manager.java | 2 +- .../accumulo/manager/TabletGroupWatcher.java | 8 +- .../coordinator/CompactionCoordinator.java | 50 +++++- .../coordinator/commit/CommitCompaction.java | 7 +- .../manager/tableOps/compact/CompactionDriver.java | 15 +- .../compaction/CompactionCoordinatorTest.java | 151 +++++++++++++------ .../manager/tableOps/merge/MergeTabletsTest.java | 5 +- .../test/functional/AmpleConditionalWriterIT.java | 32 ++-- 
.../accumulo/test/functional/CompactionIT.java | 167 +++++++++++++++++++++ .../functional/TabletManagementIteratorIT.java | 5 +- 18 files changed, 483 insertions(+), 118 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/SelectedFiles.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/SelectedFiles.java index f2dd1ad861..7cb4c3277e 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/SelectedFiles.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/SelectedFiles.java @@ -24,10 +24,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.util.time.SteadyTime; import com.google.common.base.Preconditions; import com.google.gson.Gson; @@ -44,27 +46,48 @@ public class SelectedFiles { private final Set<StoredTabletFile> files; private final boolean initiallySelectedAll; private final FateId fateId; + private final int completedJobs; + private final SteadyTime selectedTime; private String metadataValue; private static final Gson GSON = new GsonBuilder() .registerTypeAdapter(SelectedFiles.class, new SelectedFilesTypeAdapter()).create(); - public SelectedFiles(Set<StoredTabletFile> files, boolean initiallySelectedAll, FateId fateId) { + public SelectedFiles(Set<StoredTabletFile> files, boolean initiallySelectedAll, FateId fateId, + SteadyTime selectedTime) { + this(files, initiallySelectedAll, fateId, 0, selectedTime); + } + + public SelectedFiles(Set<StoredTabletFile> files, boolean initiallySelectedAll, FateId fateId, + int completedJobs, SteadyTime selectedTime) { Preconditions.checkArgument(files != null && !files.isEmpty()); + Preconditions.checkArgument(completedJobs >= 0); this.files = Set.copyOf(files); 
this.initiallySelectedAll = initiallySelectedAll; - this.fateId = fateId; + this.fateId = Objects.requireNonNull(fateId); + this.completedJobs = completedJobs; + this.selectedTime = Objects.requireNonNull(selectedTime); } private static class SelectedFilesTypeAdapter extends TypeAdapter<SelectedFiles> { + // These fields could be moved to an enum but for now just using static Strings + // seems better to avoid having to construct an enum each time the string is read + private static final String FATE_ID = "fateId"; + private static final String SELECTED_ALL = "selAll"; + private static final String COMPLETED_JOBS = "compJobs"; + private static final String FILES = "files"; + private static final String SELECTED_TIME_NANOS = "selTimeNanos"; + @Override public void write(JsonWriter out, SelectedFiles selectedFiles) throws IOException { out.beginObject(); - out.name("fateId").value(selectedFiles.getFateId().canonical()); - out.name("selAll").value(selectedFiles.initiallySelectedAll()); - out.name("files").beginArray(); + out.name(FATE_ID).value(selectedFiles.getFateId().canonical()); + out.name(SELECTED_ALL).value(selectedFiles.initiallySelectedAll()); + out.name(COMPLETED_JOBS).value(selectedFiles.getCompletedJobs()); + out.name(SELECTED_TIME_NANOS).value(selectedFiles.getSelectedTime().getNanos()); + out.name(FILES).beginArray(); // sort the data to make serialized json comparable selectedFiles.getFiles().stream().map(StoredTabletFile::getMetadata).sorted() .forEach(file -> { @@ -83,19 +106,27 @@ public class SelectedFiles { public SelectedFiles read(JsonReader in) throws IOException { FateId fateId = null; boolean selAll = false; + int completedJobs = 0; List<String> files = new ArrayList<>(); + SteadyTime selectedTime = null; in.beginObject(); while (in.hasNext()) { String name = in.nextName(); switch (name) { - case "fateId": + case FATE_ID: fateId = FateId.from(in.nextString()); break; - case "selAll": + case SELECTED_ALL: selAll = in.nextBoolean(); break; - 
case "files": + case COMPLETED_JOBS: + completedJobs = in.nextInt(); + break; + case SELECTED_TIME_NANOS: + selectedTime = SteadyTime.from(in.nextLong(), TimeUnit.NANOSECONDS); + break; + case FILES: in.beginArray(); while (in.hasNext()) { files.add(in.nextString()); @@ -111,7 +142,7 @@ public class SelectedFiles { Set<StoredTabletFile> tabletFiles = files.stream().map(StoredTabletFile::new).collect(Collectors.toSet()); - return new SelectedFiles(tabletFiles, selAll, fateId); + return new SelectedFiles(tabletFiles, selAll, fateId, completedJobs, selectedTime); } } @@ -132,6 +163,14 @@ public class SelectedFiles { return fateId; } + public int getCompletedJobs() { + return completedJobs; + } + + public SteadyTime getSelectedTime() { + return selectedTime; + } + public String getMetadataValue() { if (this.metadataValue == null) { // use the custom TypeAdapter to create the json diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/SelectedFilesTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/SelectedFilesTest.java index a8bd1e67da..974465c529 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/SelectedFilesTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/SelectedFilesTest.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -37,6 +38,7 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -56,7 +58,8 @@ public class SelectedFilesTest { Set<StoredTabletFile> 
files = getStoredTabletFiles(2); FateId fateId = FateId.from(FateInstanceType.META, UUID.randomUUID()); - SelectedFiles original = new SelectedFiles(files, true, fateId); + SelectedFiles original = + new SelectedFiles(files, true, fateId, SteadyTime.from(100_100, TimeUnit.NANOSECONDS)); String json = original.getMetadataValue(); SelectedFiles deserialized = SelectedFiles.from(json); @@ -73,8 +76,10 @@ public class SelectedFilesTest { Set<StoredTabletFile> files = getStoredTabletFiles(16); FateId fateId = FateId.from(FateInstanceType.META, UUID.randomUUID()); - SelectedFiles sf1 = new SelectedFiles(files, true, fateId); - SelectedFiles sf2 = new SelectedFiles(files, true, fateId); + SelectedFiles sf1 = + new SelectedFiles(files, true, fateId, SteadyTime.from(100_100, TimeUnit.NANOSECONDS)); + SelectedFiles sf2 = + new SelectedFiles(files, true, fateId, SteadyTime.from(100_100, TimeUnit.NANOSECONDS)); assertEquals(sf1.getMetadataValue(), sf2.getMetadataValue()); assertEquals(sf1, sf2); @@ -94,8 +99,10 @@ public class SelectedFilesTest { assertNotEquals(files.toString(), sortedFiles.toString(), "Order of files set should differ for this test case"); - SelectedFiles sf1 = new SelectedFiles(files, false, fateId); - SelectedFiles sf2 = new SelectedFiles(sortedFiles, false, fateId); + SelectedFiles sf1 = + new SelectedFiles(files, false, fateId, SteadyTime.from(100_100, TimeUnit.NANOSECONDS)); + SelectedFiles sf2 = new SelectedFiles(sortedFiles, false, fateId, + SteadyTime.from(100_100, TimeUnit.NANOSECONDS)); assertEquals(sf1.getMetadataValue(), sf2.getMetadataValue()); assertEquals(sf1, sf2); @@ -113,8 +120,10 @@ public class SelectedFilesTest { // Remove an element to create a subset filesSubSet.remove(filesSubSet.iterator().next()); - SelectedFiles superSetSelectedFiles = new SelectedFiles(filesSuperSet, true, fateId); - SelectedFiles subSetSelectedFiles = new SelectedFiles(filesSubSet, true, fateId); + SelectedFiles superSetSelectedFiles = new 
SelectedFiles(filesSuperSet, true, fateId, + SteadyTime.from(100_100, TimeUnit.NANOSECONDS)); + SelectedFiles subSetSelectedFiles = new SelectedFiles(filesSubSet, true, fateId, + SteadyTime.from(100_100, TimeUnit.NANOSECONDS)); String superSetJson = superSetSelectedFiles.getMetadataValue(); String subSetJson = subSetSelectedFiles.getMetadataValue(); @@ -134,11 +143,12 @@ public class SelectedFilesTest { } private static Stream<Arguments> provideTestJsons() { - return Stream.of(Arguments.of("FATE:META:12345678-9abc-def1-2345-6789abcdef12", true, 12), - Arguments.of("FATE:META:12345678-9abc-def1-2345-6789abcdef12", false, 12), - Arguments.of("FATE:META:12345678-9abc-def1-2345-6789abcdef12", false, 23), - Arguments.of("FATE:META:abcdef12-3456-789a-bcde-f123456789ab", false, 23), - Arguments.of("FATE:META:41b40c7c-55e5-4d3b-8d21-1b70d1e7f3fb", false, 23)); + return Stream.of( + Arguments.of("FATE:META:12345678-9abc-def1-2345-6789abcdef12", true, 0, 1000, 12), + Arguments.of("FATE:META:12345678-9abc-def1-2345-6789abcdef12", false, 1, 2000, 12), + Arguments.of("FATE:META:12345678-9abc-def1-2345-6789abcdef12", false, 2, 3000, 23), + Arguments.of("FATE:META:abcdef12-3456-789a-bcde-f123456789ab", false, 2, 4000, 23), + Arguments.of("FATE:META:41b40c7c-55e5-4d3b-8d21-1b70d1e7f3fb", false, 2, 5000, 23)); } /** @@ -147,14 +157,15 @@ public class SelectedFilesTest { */ @ParameterizedTest @MethodSource("provideTestJsons") - public void testJsonStrings(FateId fateId, boolean selAll, int numPaths) { + public void testJsonStrings(FateId fateId, boolean selAll, int compJobs, long selTimeNanos, + int numPaths) { List<String> paths = getFilePaths(numPaths); // should be resilient to unordered file arrays Collections.shuffle(paths, RANDOM.get()); // construct a json from the given parameters - String json = getJson(fateId, selAll, paths); + String json = getJson(fateId, selAll, compJobs, selTimeNanos, paths); System.out.println(json); @@ -168,7 +179,7 @@ public class SelectedFilesTest 
{ assertEquals(expectedStoredTabletFiles, selectedFiles.getFiles()); Collections.sort(paths); - String jsonWithSortedFiles = getJson(fateId, selAll, paths); + String jsonWithSortedFiles = getJson(fateId, selAll, compJobs, selTimeNanos, paths); assertEquals(jsonWithSortedFiles, selectedFiles.getMetadataValue()); } @@ -184,12 +195,14 @@ public class SelectedFilesTest { * } * </pre> */ - private static String getJson(FateId fateId, boolean selAll, List<String> paths) { + private static String getJson(FateId fateId, boolean selAll, int compJobs, long selTimeNanos, + List<String> paths) { String filesJsonArray = paths.stream().map(path -> new ReferencedTabletFile(new Path(path)).insert().getMetadata()) .map(path -> path.replace("\"", "\\\"")).map(path -> "'" + path + "'") .collect(Collectors.joining(",")); - return ("{'fateId':'" + fateId + "','selAll':" + selAll + ",'files':[" + filesJsonArray + "]}") + return ("{'fateId':'" + fateId + "','selAll':" + selAll + ",'compJobs':" + compJobs + + ",'selTimeNanos':" + selTimeNanos + ",'files':[" + filesJsonArray + "]}") .replace('\'', '\"'); } diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index 109be1aadc..0f51d61283 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -667,7 +667,8 @@ public class TabletMetadataTest { LogEntry le2 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID()); FateId selFilesFateId = FateId.from(type, UUID.randomUUID()); - SelectedFiles selFiles = new SelectedFiles(Set.of(sf1, sf4), false, selFilesFateId); + SelectedFiles selFiles = new SelectedFiles(Set.of(sf1, sf4), false, selFilesFateId, + SteadyTime.from(100_000, TimeUnit.NANOSECONDS)); var unsplittableMeta = UnSplittableMetadata.toUnSplittable(extent, 100, 110, 120, 
Set.of(sf1, sf2)); diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java index 1d88de2eaa..3b20c1ef99 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java @@ -31,6 +31,7 @@ import java.util.stream.Collectors; import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.FateId; @@ -51,6 +52,7 @@ import org.apache.accumulo.core.util.compaction.CompactionJobImpl; import org.apache.accumulo.core.util.compaction.CompactionPlanImpl; import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; +import org.apache.accumulo.core.util.time.SteadyTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,9 +67,10 @@ public class CompactionJobGenerator { private final PluginEnvironment env; private final Map<FateId,Map<String,String>> allExecutionHints; private final Cache<Pair<TableId,CompactionServiceId>,Long> unknownCompactionServiceErrorCache; + private final SteadyTime steadyTime; public CompactionJobGenerator(PluginEnvironment env, - Map<FateId,Map<String,String>> executionHints) { + Map<FateId,Map<String,String>> executionHints, SteadyTime steadyTime) { servicesConfig = new CompactionServicesConfig(env.getConfiguration()); serviceIds = servicesConfig.getPlanners().keySet().stream().map(CompactionServiceId::of) .collect(Collectors.toUnmodifiableSet()); @@ -87,6 +90,7 @@ public class CompactionJobGenerator { 
unknownCompactionServiceErrorCache = Caches.getInstance().createNewBuilder(CacheName.COMPACTION_SERVICE_UNKNOWN, false) .expireAfterWrite(5, TimeUnit.MINUTES).build(); + this.steadyTime = steadyTime; } public Collection<CompactionJob> generateJobs(TabletMetadata tablet, Set<CompactionKind> kinds) { @@ -205,9 +209,22 @@ public class CompactionJobGenerator { // remove any files that are in active compactions tablet.getExternalCompactions().values().stream().flatMap(ecm -> ecm.getJobFiles().stream()) .forEach(tmpFiles::remove); - // remove any files that are selected - if (tablet.getSelectedFiles() != null) { - tmpFiles.keySet().removeAll(tablet.getSelectedFiles().getFiles()); + // remove any files that are selected and the user compaction has completed + // at least 1 job, otherwise we can keep the files + var selectedFiles = tablet.getSelectedFiles(); + + if (selectedFiles != null) { + long selectedExpirationDuration = + ConfigurationTypeHelper.getTimeInMillis(env.getConfiguration(tablet.getTableId()) + .get(Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey())); + + // If jobs are completed, or selected time has not expired, the remove + // from the candidate list otherwise we can cancel the selection + if (selectedFiles.getCompletedJobs() > 0 + || (steadyTime.minus(selectedFiles.getSelectedTime()).toMillis() + < selectedExpirationDuration)) { + tmpFiles.keySet().removeAll(selectedFiles.getFiles()); + } } candidates = tmpFiles.entrySet().stream() .map(entry -> new CompactableFileImpl(entry.getKey(), entry.getValue())) diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index 4b2ba5d4f7..b9c64c7f84 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java 
@@ -160,8 +160,8 @@ public class TabletManagementIterator extends SkippingIterator { this.env = env; tabletMgmtParams = TabletManagementParameters.deserialize(options.get(TABLET_GOAL_STATE_PARAMS_OPTION)); - compactionGenerator = - new CompactionJobGenerator(env.getPluginEnv(), tabletMgmtParams.getCompactionHints()); + compactionGenerator = new CompactionJobGenerator(env.getPluginEnv(), + tabletMgmtParams.getCompactionHints(), tabletMgmtParams.getSteadyTime()); final AccumuloConfiguration conf = new ConfigurationCopy(env.getPluginEnv().getConfiguration()); BalancerEnvironmentImpl benv = new BalancerEnvironmentImpl(((TabletIteratorEnvironment) env).getServerContext()); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java index 197b9feca6..a407a20b6e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java @@ -32,7 +32,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -43,6 +45,7 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; @@ -75,13 +78,14 @@ public class TabletManagementParameters { private final Set<TServerInstance> onlineTservers; private final boolean 
canSuspendTablets; private final Map<Path,Path> volumeReplacements; + private final SteadyTime steadyTime; public TabletManagementParameters(ManagerState managerState, Map<Ample.DataLevel,Boolean> parentUpgradeMap, Set<TableId> onlineTables, LiveTServerSet.LiveTServersSnapshot liveTServersSnapshot, Set<TServerInstance> serversToShutdown, Map<KeyExtent,TServerInstance> migrations, Ample.DataLevel level, Map<FateId,Map<String,String>> compactionHints, - boolean canSuspendTablets, Map<Path,Path> volumeReplacements) { + boolean canSuspendTablets, Map<Path,Path> volumeReplacements, SteadyTime steadyTime) { this.managerState = managerState; this.parentUpgradeMap = Map.copyOf(parentUpgradeMap); // TODO could filter by level @@ -103,6 +107,7 @@ public class TabletManagementParameters { }); this.canSuspendTablets = canSuspendTablets; this.volumeReplacements = Map.copyOf(volumeReplacements); + this.steadyTime = Objects.requireNonNull(steadyTime); } private TabletManagementParameters(JsonData jdata) { @@ -130,6 +135,7 @@ public class TabletManagementParameters { }); this.canSuspendTablets = jdata.canSuspendTablets; this.volumeReplacements = Collections.unmodifiableMap(jdata.volumeReplacements); + this.steadyTime = SteadyTime.from(jdata.steadyTime, TimeUnit.NANOSECONDS); } public ManagerState getManagerState() { @@ -188,6 +194,10 @@ public class TabletManagementParameters { return volumeReplacements; } + public SteadyTime getSteadyTime() { + return steadyTime; + } + private static Map<FateId,Map<String,String>> makeImmutable(Map<FateId,Map<String,String>> compactionHints) { var copy = new HashMap<FateId,Map<String,String>>(); @@ -212,6 +222,7 @@ public class TabletManagementParameters { final boolean canSuspendTablets; final Map<Path,Path> volumeReplacements; + final long steadyTime; private static String toString(KeyExtent extent) { DataOutputBuffer buffer = new DataOutputBuffer(); @@ -255,6 +266,7 @@ public class TabletManagementParameters { .collect(Collectors.toMap(entry 
-> entry.getKey().canonical(), Map.Entry::getValue)); canSuspendTablets = params.canSuspendTablets; volumeReplacements = params.volumeReplacements; + steadyTime = params.steadyTime.getNanos(); } } diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java index 71e1ada06e..b2aebb5b36 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java @@ -526,7 +526,8 @@ public class MetadataConstraintsTest { ServerColumnFamily.SELECTED_COLUMN.put(m, new Value(new SelectedFiles(Set.of(new ReferencedTabletFile( new Path("hdfs://nn.somewhere.com:86753/accumulo/tables/42/t-0000/F00001.rf")) - .insert()), true, fateId).getMetadataValue())); + .insert()), true, fateId, SteadyTime.from(100, TimeUnit.NANOSECONDS)) + .getMetadataValue())); violations = mc.check(createEnv(), m); assertNull(violations); } diff --git a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementParametersTest.java b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementParametersTest.java index ff6a2fc940..fc9fe4e9d5 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementParametersTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementParametersTest.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.TableId; @@ -30,6 +31,7 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.TServerInstance; 
import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Test; @@ -55,10 +57,11 @@ public class TabletManagementParametersTest { final boolean canSuspendTablets = true; final Map<Path,Path> replacements = Map.of(new Path("file:/vol1/accumulo/inst_id"), new Path("file:/vol2/accumulo/inst_id")); + final SteadyTime steadyTime = SteadyTime.from(100_000, TimeUnit.NANOSECONDS); final TabletManagementParameters tmp = new TabletManagementParameters(managerState, parentUpgradeMap, onlineTables, serverSnapshot, serversToShutdown, migrations, dataLevel, - compactionHints, canSuspendTablets, replacements); + compactionHints, canSuspendTablets, replacements, steadyTime); String jsonString = tmp.serialize(); TabletManagementParameters tmp2 = TabletManagementParameters.deserialize(jsonString); @@ -73,5 +76,6 @@ public class TabletManagementParametersTest { assertEquals(compactionHints, tmp2.getCompactionHints()); assertEquals(canSuspendTablets, tmp2.canSuspendTablets()); assertEquals(replacements, tmp2.getVolumeReplacements()); + assertEquals(steadyTime, tmp2.getSteadyTime()); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index ac9b682cb9..244a2696ab 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -995,7 +995,7 @@ public class Manager extends AbstractServer fateServiceHandler = new FateServiceHandler(this); managerClientHandler = new ManagerClientServiceHandler(this); compactionCoordinator = - new CompactionCoordinator(context, security, fateRefs, getResourceGroup()); + new CompactionCoordinator(context, security, fateRefs, getResourceGroup(), this); // Start the Manager's Client 
service // Ensure that calls before the manager gets the lock fail diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 7ab6415646..ee9765138c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -347,7 +347,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { manager.onlineTables(), tServersSnapshot, shutdownServers, manager.migrationsSnapshot(), store.getLevel(), manager.getCompactionHints(store.getLevel()), canSuspendTablets(), lookForTabletsNeedingVolReplacement ? manager.getContext().getVolumeReplacements() - : Map.of()); + : Map.of(), + manager.getSteadyTime()); } private Set<TServerInstance> getFilteredServersToShutdown() { @@ -382,8 +383,9 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { TabletLists tLists = new TabletLists(currentTServers, tableMgmtParams.getGroupedTServers(), tableMgmtParams.getServersToShutdown()); - CompactionJobGenerator compactionGenerator = new CompactionJobGenerator( - new ServiceEnvironmentImpl(manager.getContext()), tableMgmtParams.getCompactionHints()); + CompactionJobGenerator compactionGenerator = + new CompactionJobGenerator(new ServiceEnvironmentImpl(manager.getContext()), + tableMgmtParams.getCompactionHints(), tableMgmtParams.getSteadyTime()); Set<TServerInstance> filteredServersToShutdown = new HashSet<>(tableMgmtParams.getServersToShutdown()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 8fcddc530f..bce38de18d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -40,6 +40,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -109,6 +110,7 @@ import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.compaction.RunningCompaction; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction; @@ -175,14 +177,16 @@ public class CompactionCoordinator private final DeadCompactionDetector deadCompactionDetector; private QueueMetrics queueMetrics; + private final Manager manager; public CompactionCoordinator(ServerContext ctx, SecurityOperation security, AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances, - final String resourceGroupName) { + final String resourceGroupName, Manager manager) { this.ctx = ctx; this.schedExecutor = this.ctx.getScheduledExecutor(); this.security = security; this.resourceGroupName = resourceGroupName; + this.manager = Objects.requireNonNull(manager); this.jobQueues = new CompactionJobQueues( ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE)); @@ -423,7 +427,7 @@ public class CompactionCoordinator @VisibleForTesting public static boolean canReserveCompaction(TabletMetadata tablet, CompactionKind kind, - Set<StoredTabletFile> jobFiles, ServerContext ctx) { + Set<StoredTabletFile> jobFiles, ServerContext ctx, SteadyTime steadyTime) { if (tablet == null) { // the tablet no longer exist @@ -457,8 +461,8 @@ public class CompactionCoordinator "Unable 
to reserve {} for system compaction, tablet has {} pending requested user compactions", tablet.getExtent(), userRequestedCompactions); return false; - } else if (tablet.getSelectedFiles() != null - && !Collections.disjoint(jobFiles, tablet.getSelectedFiles().getFiles())) { + } else if (!Collections.disjoint(jobFiles, + getFilesReservedBySelection(tablet, steadyTime, ctx))) { return false; } break; @@ -549,7 +553,8 @@ public class CompactionCoordinator try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { var extent = metaJob.getTabletMetadata().getExtent(); - if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), jobFiles, ctx)) { + if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), jobFiles, ctx, + manager.getSteadyTime())) { return null; } @@ -561,6 +566,21 @@ public class CompactionCoordinator var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() .requireSame(tabletMetadata, FILES, SELECTED, ECOMP); + if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) { + var selectedFiles = tabletMetadata.getSelectedFiles(); + var reserved = getFilesReservedBySelection(tabletMetadata, manager.getSteadyTime(), ctx); + + // If there is a selectedFiles column, and the reserved set is empty this means that + // either no user jobs were completed yet or the selection expiration time has passed + // so the column is eligible to be deleted so a system job can run instead + if (selectedFiles != null && reserved.isEmpty() + && !Collections.disjoint(jobFiles, selectedFiles.getFiles())) { + LOG.debug("Deleting user compaction selected files for {} {}", extent, + externalCompactionId); + tabletMutator.deleteSelectedFiles(); + } + } + tabletMutator.putExternalCompaction(externalCompactionId, ecm); tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId)); @@ -1052,4 +1072,24 @@ public class CompactionCoordinator } } + private static Set<StoredTabletFile> 
getFilesReservedBySelection(TabletMetadata tabletMetadata, + SteadyTime steadyTime, ServerContext ctx) { + if (tabletMetadata.getSelectedFiles() == null) { + return Set.of(); + } + + if (tabletMetadata.getSelectedFiles().getCompletedJobs() > 0) { + return tabletMetadata.getSelectedFiles().getFiles(); + } + + long selectedExpirationDuration = ctx.getTableConfiguration(tabletMetadata.getTableId()) + .getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION); + + if (steadyTime.minus(tabletMetadata.getSelectedFiles().getSelectedTime()).toMillis() + < selectedExpirationDuration) { + return tabletMetadata.getSelectedFiles().getFiles(); + } + + return Set.of(); + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java index 6b3db6b136..939d897980 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java @@ -197,9 +197,10 @@ public class CommitCompaction extends ManagerRepo { tablet.getExtent()); } - tabletMutator.putSelectedFiles( - new SelectedFiles(newSelectedFileSet, tablet.getSelectedFiles().initiallySelectedAll(), - tablet.getSelectedFiles().getFateId())); + tabletMutator.putSelectedFiles(new SelectedFiles(newSelectedFileSet, + tablet.getSelectedFiles().initiallySelectedAll(), tablet.getSelectedFiles().getFateId(), + tablet.getSelectedFiles().getCompletedJobs() + 1, + tablet.getSelectedFiles().getSelectedTime())); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index d9424b68fe..d1ace6c816 100644 --- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -241,17 +241,22 @@ class CompactionDriver extends ManagerRepo { noneSelected++; } else { var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() - .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED); - var selectedFiles = - new SelectedFiles(filesToCompact, tablet.getFiles().equals(filesToCompact), fateId); + .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED, USER_COMPACTION_REQUESTED); + var selectedFiles = new SelectedFiles(filesToCompact, + tablet.getFiles().equals(filesToCompact), fateId, manager.getSteadyTime()); mutator.putSelectedFiles(selectedFiles); + // We no longer need to include this marker if files are selected + if (tablet.getUserCompactionsRequested().contains(fateId)) { + mutator.deleteUserCompactionRequested(fateId); + } + selectionsSubmitted.put(tablet.getExtent(), filesToCompact); mutator.submit(tabletMetadata -> tabletMetadata.getSelectedFiles() != null - && tabletMetadata.getSelectedFiles().getMetadataValue() - .equals(selectedFiles.getMetadataValue())); + && tabletMetadata.getSelectedFiles().getFateId().equals(fateId) + || tabletMetadata.getCompacted().contains(fateId)); if (minSelected == null || tablet.getExtent().compareTo(minSelected) < 0) { minSelected = tablet.getExtent(); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 1c4a02a7a4..962ac18a13 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -43,6 +43,7 @@ import java.util.Optional; import java.util.Set; import 
java.util.UUID; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -82,6 +83,7 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.cache.Caches; import org.apache.accumulo.core.util.compaction.CompactionJobImpl; import org.apache.accumulo.core.util.compaction.RunningCompaction; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue; @@ -126,8 +128,8 @@ public class CompactionCoordinatorTest { private Set<ExternalCompactionId> metadataCompactionIds = null; public TestCoordinator(ServerContext ctx, SecurityOperation security, - List<RunningCompaction> runningCompactions) { - super(ctx, security, fateInstances, "TEST_GROUP"); + List<RunningCompaction> runningCompactions, Manager manager) { + super(ctx, security, fateInstances, "TEST_GROUP", manager); this.runningCompactions = runningCompactions; } @@ -240,9 +242,13 @@ public class CompactionCoordinatorTest { AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); - EasyMock.replay(context, security); + Manager manager = EasyMock.createNiceMock(Manager.class); + expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS)) + .anyTimes(); - var coordinator = new TestCoordinator(context, security, new ArrayList<>()); + EasyMock.replay(context, security, manager); + + var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager); assertEquals(0, coordinator.getJobQueues().getQueuedJobCount()); assertEquals(0, coordinator.getRunning().size()); coordinator.run(); @@ -273,9 +279,13 @@ public class CompactionCoordinatorTest { AuditedSecurityOperation security = 
EasyMock.createNiceMock(AuditedSecurityOperation.class); - EasyMock.replay(context, job, security); + Manager manager = EasyMock.createNiceMock(Manager.class); + expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS)) + .anyTimes(); + + EasyMock.replay(context, job, security, manager); - var coordinator = new TestCoordinator(context, security, runningCompactions); + var coordinator = new TestCoordinator(context, security, runningCompactions, manager); coordinator.resetInternals(); assertEquals(0, coordinator.getJobQueues().getQueuedJobCount()); assertEquals(0, coordinator.getRunning().size()); @@ -320,10 +330,13 @@ public class CompactionCoordinatorTest { expect(tm.getExtent()).andReturn(ke).anyTimes(); expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes(); expect(tm.getTableId()).andReturn(ke.tableId()).anyTimes(); + Manager manager = EasyMock.createNiceMock(Manager.class); + expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS)) + .anyTimes(); - EasyMock.replay(tconf, context, creds, tm, security); + EasyMock.replay(tconf, context, creds, tm, security, manager); - var coordinator = new TestCoordinator(context, security, new ArrayList<>()); + var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager); assertEquals(0, coordinator.getJobQueues().getQueuedJobCount()); assertEquals(0, coordinator.getRunning().size()); // Use coordinator.run() to populate the internal data structures. 
This is tested in a different @@ -371,9 +384,13 @@ public class CompactionCoordinatorTest { AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); expect(security.canPerformSystemActions(creds)).andReturn(true); - EasyMock.replay(context, creds, security); + Manager manager = EasyMock.createNiceMock(Manager.class); + expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS)) + .anyTimes(); + + EasyMock.replay(context, creds, security, manager); - var coordinator = new TestCoordinator(context, security, new ArrayList<>()); + var coordinator = new TestCoordinator(context, security, new ArrayList<>(), manager); TExternalCompactionJob job = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString()); assertNull(job.getExternalCompactionId()); @@ -391,10 +408,14 @@ public class CompactionCoordinatorTest { TCredentials creds = EasyMock.createNiceMock(TCredentials.class); AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class); + Manager manager = EasyMock.createNiceMock(Manager.class); + expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100000, TimeUnit.NANOSECONDS)) + .anyTimes(); - EasyMock.replay(context, creds, security); + EasyMock.replay(context, creds, security, manager); - TestCoordinator coordinator = new TestCoordinator(context, security, new ArrayList<>()); + TestCoordinator coordinator = + new TestCoordinator(context, security, new ArrayList<>(), manager); var ecid1 = ExternalCompactionId.generate(UUID.randomUUID()); var ecid2 = ExternalCompactionId.generate(UUID.randomUUID()); @@ -432,6 +453,12 @@ public class CompactionCoordinatorTest { EasyMock.expect(context.getTableState(tableId1)).andReturn(TableState.ONLINE).atLeastOnce(); EasyMock.expect(context.getTableState(tableId2)).andReturn(TableState.OFFLINE).atLeastOnce(); + TableConfiguration tableConf = 
EasyMock.createMock(TableConfiguration.class); + EasyMock.expect(tableConf.getTimeInMillis(Property.TABLE_COMPACTION_SELECTION_EXPIRATION)) + .andReturn(100L).atLeastOnce(); + + EasyMock.expect(context.getTableConfiguration(anyObject())).andReturn(tableConf).atLeastOnce(); + FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); CompactorGroupId cgid = CompactorGroupId.of("G1"); @@ -445,7 +472,7 @@ public class CompactionCoordinatorTest { CompactionMetadata cm2 = new CompactionMetadata(Set.of(file3), tmp2, "localhost:5555", CompactionKind.USER, (short) 5, cgid, false, fateId1); - EasyMock.replay(context); + EasyMock.replay(context, tableConf); KeyExtent extent1 = new KeyExtent(tableId1, null, null); @@ -454,36 +481,43 @@ public class CompactionCoordinatorTest { var cid1 = ExternalCompactionId.generate(UUID.randomUUID()); var cid2 = ExternalCompactionId.generate(UUID.randomUUID()); - var selected = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1); + var selectedWithoutComp = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1, + SteadyTime.from(100, TimeUnit.SECONDS)); + var selectedWithComp = new SelectedFiles(Set.of(file1, file2, file3), false, fateId1, 1, + SteadyTime.from(100, TimeUnit.SECONDS)); + + var time = SteadyTime.from(1000, TimeUnit.SECONDS); // should not be able to compact an offline table var tabletOffline = TabletMetadata.builder(new KeyExtent(tableId2, null, null)) .putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv).putFile(file4, dfv) .build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED); - assertFalse( - canReserveCompaction(tabletOffline, CompactionKind.SYSTEM, Set.of(file1, file2), context)); + assertFalse(canReserveCompaction(tabletOffline, CompactionKind.SYSTEM, Set.of(file1, file2), + context, time)); // nothing should prevent this compaction var tablet1 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv) .putFile(file4, dfv).build(OPID, ECOMP, 
USER_COMPACTION_REQUESTED, SELECTED); - assertTrue(canReserveCompaction(tablet1, CompactionKind.SYSTEM, Set.of(file1, file2), context)); + assertTrue( + canReserveCompaction(tablet1, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); // should not be able to do a user compaction unless selected files are present - assertFalse(canReserveCompaction(tablet1, CompactionKind.USER, Set.of(file1, file2), context)); + assertFalse( + canReserveCompaction(tablet1, CompactionKind.USER, Set.of(file1, file2), context, time)); // should not be able to compact a tablet with user compaction request in place var tablet3 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv).putFile(file3, dfv) .putFile(file4, dfv).putUserCompactionRequested(fateId1).build(OPID, ECOMP, SELECTED); assertFalse( - canReserveCompaction(tablet3, CompactionKind.SYSTEM, Set.of(file1, file2), context)); + canReserveCompaction(tablet3, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); // should not be able to compact a tablet when the job has files not present in the tablet var tablet4 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) .putFile(file3, dfv).build(OPID, ECOMP, USER_COMPACTION_REQUESTED, SELECTED); - assertFalse( - canReserveCompaction(tablet4, CompactionKind.SYSTEM, Set.of(file1, file2, file4), context)); + assertFalse(canReserveCompaction(tablet4, CompactionKind.SYSTEM, Set.of(file1, file2, file4), + context, time)); // should not be able to compact a tablet with an operation id present TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId1); @@ -491,64 +525,81 @@ public class CompactionCoordinatorTest { .putFile(file3, dfv).putFile(file4, dfv).putOperation(opid) .build(ECOMP, USER_COMPACTION_REQUESTED, SELECTED); assertFalse( - canReserveCompaction(tablet5, CompactionKind.SYSTEM, Set.of(file1, file2), context)); + canReserveCompaction(tablet5, CompactionKind.SYSTEM, Set.of(file1, file2), context, 
time)); // should not be able to compact a tablet if the job files overlaps with running compactions var tablet6 = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) .putFile(file3, dfv).putFile(file4, dfv).putExternalCompaction(cid1, cm1) .putExternalCompaction(cid2, cm2).build(OPID, USER_COMPACTION_REQUESTED, SELECTED); assertFalse( - canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file1, file2), context)); + canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); // should be able to compact the file that is outside of the set of files currently compacting - assertTrue(canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file4), context)); + assertTrue(canReserveCompaction(tablet6, CompactionKind.SYSTEM, Set.of(file4), context, time)); // create a tablet with a selected set of files - var selTablet = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) - .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selected) + var selTabletWithComp = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) + .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selectedWithComp) .build(OPID, USER_COMPACTION_REQUESTED, ECOMP); + // 0 completed jobs + var selTabletWithoutComp = TabletMetadata.builder(extent1).putFile(file1, dfv) + .putFile(file2, dfv).putFile(file3, dfv).putFile(file4, dfv) + .putSelectedFiles(selectedWithoutComp).build(OPID, USER_COMPACTION_REQUESTED, ECOMP); + + // Should be able to start if no completed and overlap + assertTrue(canReserveCompaction(selTabletWithoutComp, CompactionKind.SYSTEM, + Set.of(file1, file2), context, time)); + assertTrue(canReserveCompaction(selTabletWithoutComp, CompactionKind.SYSTEM, + Set.of(file3, file4), context, time)); + // should not be able to start a system compaction if the set of files overlaps with the // selected files - assertFalse( - canReserveCompaction(selTablet, CompactionKind.SYSTEM, 
Set.of(file1, file2), context)); - assertFalse( - canReserveCompaction(selTablet, CompactionKind.SYSTEM, Set.of(file3, file4), context)); + assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file1, file2), + context, time)); + assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file3, file4), + context, time)); // should be able to start a system compaction on the set of files not in the selected set - assertTrue(canReserveCompaction(selTablet, CompactionKind.SYSTEM, Set.of(file4), context)); + assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.SYSTEM, Set.of(file4), + context, time)); // should be able to start user compactions on files that are selected - assertTrue(canReserveCompaction(selTablet, CompactionKind.USER, Set.of(file1, file2), context)); - assertTrue(canReserveCompaction(selTablet, CompactionKind.USER, Set.of(file2, file3), context)); - assertTrue( - canReserveCompaction(selTablet, CompactionKind.USER, Set.of(file1, file2, file3), context)); + assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file1, file2), + context, time)); + assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file2, file3), + context, time)); + assertTrue(canReserveCompaction(selTabletWithComp, CompactionKind.USER, + Set.of(file1, file2, file3), context, time)); // should not be able to start user compactions on files that fall outside of the selected set + assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file1, file4), + context, time)); assertFalse( - canReserveCompaction(selTablet, CompactionKind.USER, Set.of(file1, file4), context)); - assertFalse(canReserveCompaction(selTablet, CompactionKind.USER, Set.of(file4), context)); - assertFalse(canReserveCompaction(selTablet, CompactionKind.USER, - Set.of(file1, file2, file3, file4), context)); + canReserveCompaction(selTabletWithComp, CompactionKind.USER, Set.of(file4), 
context, time)); + assertFalse(canReserveCompaction(selTabletWithComp, CompactionKind.USER, + Set.of(file1, file2, file3, file4), context, time)); // test selected files and running compaction var selRunningTablet = TabletMetadata.builder(extent1).putFile(file1, dfv).putFile(file2, dfv) - .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selected) + .putFile(file3, dfv).putFile(file4, dfv).putSelectedFiles(selectedWithComp) .putExternalCompaction(cid2, cm2).build(OPID, USER_COMPACTION_REQUESTED); // should be able to compact files that are in the selected set and not in the running set - assertTrue( - canReserveCompaction(selRunningTablet, CompactionKind.USER, Set.of(file1, file2), context)); + assertTrue(canReserveCompaction(selRunningTablet, CompactionKind.USER, Set.of(file1, file2), + context, time)); // should not be able to compact because files overlap the running set - assertFalse( - canReserveCompaction(selRunningTablet, CompactionKind.USER, Set.of(file2, file3), context)); + assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.USER, Set.of(file2, file3), + context, time)); // should not be able to start a system compaction if the set of files overlaps with the // selected files and/or the running set assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file1, file2), - context)); + context, time)); assertFalse(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file3, file4), - context)); + context, time)); // should be able to start a system compaction on the set of files not in the selected set - assertTrue( - canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file4), context)); + assertTrue(canReserveCompaction(selRunningTablet, CompactionKind.SYSTEM, Set.of(file4), context, + time)); // should not be able to compact a tablet that does not exists - assertFalse(canReserveCompaction(null, CompactionKind.SYSTEM, Set.of(file1, file2), context)); - 
assertFalse(canReserveCompaction(null, CompactionKind.USER, Set.of(file1, file2), context)); + assertFalse( + canReserveCompaction(null, CompactionKind.SYSTEM, Set.of(file1, file2), context, time)); + assertFalse( + canReserveCompaction(null, CompactionKind.USER, Set.of(file1, file2), context, time)); EasyMock.verify(context); } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java index 8bd5120788..fb09876ab4 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java @@ -52,6 +52,7 @@ import java.util.Map; import java.util.OptionalLong; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.accumulo.core.client.admin.TabletAvailability; @@ -79,6 +80,7 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.gc.AllVolumesDirectory; @@ -315,7 +317,8 @@ public class MergeTabletsTest { testUnexpectedColumn(tmb -> tmb.putBulkFile(lodedFile.getTabletFile(), otherFateId), "has unexpected loaded column"); - var selectedFiles = new SelectedFiles(Set.of(newSTF(567)), false, otherFateId); + var selectedFiles = new SelectedFiles(Set.of(newSTF(567)), false, otherFateId, + SteadyTime.from(100, TimeUnit.NANOSECONDS)); testUnexpectedColumn(tmb -> tmb.putSelectedFiles(selectedFiles), "has unexpected selected file"); diff 
--git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index a077edf7be..1f6627a7ec 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -52,6 +52,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Supplier; @@ -96,6 +97,7 @@ import org.apache.accumulo.core.metadata.schema.filters.TabletMetadataFilter; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.spi.balancer.TableLoadBalancer; import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; @@ -490,8 +492,9 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { FateInstanceType type = FateInstanceType.fromTableId(tid); FateId fateId1 = FateId.from(type, UUID.randomUUID()); FateId fateId2 = FateId.from(type, UUID.randomUUID()); + var time = SteadyTime.from(100_100, TimeUnit.NANOSECONDS); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, FILES, SELECTED) - .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId1)) + .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId1, time)) .submit(tm -> false); results = ctmi.process(); assertEquals(Status.REJECTED, results.get(e1).getStatus()); @@ -503,7 +506,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { .build(SELECTED); ctmi = new 
ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, FILES, SELECTED) - .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId1)) + .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId1, time)) .submit(tm -> false); results = ctmi.process(); assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); @@ -516,10 +519,10 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { // fail var expectedToFail = new ArrayList<SelectedFiles>(); - expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2), true, fateId1)); - expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3, stf4), true, fateId1)); - expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), false, fateId1)); - expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId2)); + expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2), true, fateId1, time)); + expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3, stf4), true, fateId1, time)); + expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), false, fateId1, time)); + expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId2, time)); for (var selectedFiles : expectedToFail) { var tm3 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).putFile(stf3, dfv) @@ -536,7 +539,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { } var tm5 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).putFile(stf3, dfv) - .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId1)).build(); + .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId1, time)).build(); ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES, SELECTED) .deleteSelectedFiles().submit(tm -> false); @@ -565,8 +568,9 @@ public class AmpleConditionalWriterIT extends 
AccumuloClusterHarness { final boolean initiallySelectedAll = true; final FateInstanceType type = FateInstanceType.fromTableId(tid); final FateId fateId = FateId.from(type, UUID.randomUUID()); + final SteadyTime time = SteadyTime.from(100, TimeUnit.NANOSECONDS); final SelectedFiles selectedFiles = - new SelectedFiles(storedTabletFiles, initiallySelectedAll, fateId); + new SelectedFiles(storedTabletFiles, initiallySelectedAll, fateId, time); ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context); @@ -606,13 +610,15 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { .sorted().collect(Collectors.toList()); // verify we have the format of the json correct - String newJson = createSelectedFilesJson(fateId, initiallySelectedAll, filesPathList); + String newJson = + createSelectedFilesJson(fateId, initiallySelectedAll, filesPathList, 0, time.getNanos()); assertEquals(actualMetadataValue, newJson, "Test json should be identical to actual metadata at this point"); // reverse the order of the files and create a new json Collections.reverse(filesPathList); - newJson = createSelectedFilesJson(fateId, initiallySelectedAll, filesPathList); + newJson = + createSelectedFilesJson(fateId, initiallySelectedAll, filesPathList, 0, time.getNanos()); assertNotEquals(actualMetadataValue, newJson, "Test json should have reverse file order of actual metadata"); @@ -663,10 +669,10 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { * </pre> */ public static String createSelectedFilesJson(FateId fateId, boolean selAll, - Collection<String> paths) { + Collection<String> paths, int compJobs, long selTime) { String filesJsonArray = GSON.get().toJson(paths); - return ("{'fateId':'" + fateId + "','selAll':" + selAll + ",'files':" + filesJsonArray + "}") - .replace('\'', '\"'); + return ("{'fateId':'" + fateId + "','selAll':" + selAll + ",'compJobs':" + compJobs + + ",'selTime':" + selTime + ",'files':" + filesJsonArray + 
"}").replace('\'', '\"'); } @Test diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 73c50a0409..bd26f0a469 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -41,6 +41,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ExecutorService; @@ -86,13 +87,18 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactionPlan; +import org.apache.accumulo.core.spi.compaction.CompactionPlanner; import org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner; import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; +import org.apache.accumulo.core.util.compaction.CompactionJobImpl; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.VerifyIngest; import org.apache.accumulo.test.VerifyIngest.VerifyParams; +import org.apache.accumulo.test.compaction.CompactionExecutorIT.TestPlanner; import org.apache.accumulo.test.compaction.ExternalCompactionTestUtils; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; @@ -100,6 +106,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import 
org.apache.hadoop.io.Text; +import org.apache.thrift.transport.TTransportException; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,6 +114,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.Iterators; +import com.google.common.net.HostAndPort; public class CompactionIT extends AccumuloClusterHarness { @@ -218,6 +226,27 @@ public class CompactionIT extends AccumuloClusterHarness { } + public static class TestPlanner implements CompactionPlanner { + private static final Logger LOG = LoggerFactory.getLogger(TestPlanner.class); + + RatioBasedCompactionPlanner p = new RatioBasedCompactionPlanner(); + + @Override + public void init(InitParameters params) { + p.init(params); + } + + @Override + public CompactionPlan makePlan(PlanningParameters params) { + return () -> p.makePlan(params).getJobs().stream().map(job -> { + LOG.debug("Plan job priority is {}:{}", job.getKind(), job.getPriority()); + return new CompactionJobImpl( + job.getKind() == CompactionKind.SYSTEM ? 
Short.MAX_VALUE : job.getPriority(), + job.getGroup(), job.getFiles(), job.getKind(), Optional.empty()); + }).collect(toList()); + } + } + private static final Logger log = LoggerFactory.getLogger(CompactionIT.class); private static final int MAX_DATA = 1000; @@ -1011,6 +1040,144 @@ public class CompactionIT extends AccumuloClusterHarness { ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), tableName); } + @Test + public void testCancelUserCompactionTimeoutExceeded() throws Exception { + testCancelUserCompactionTimeout(true); + } + + @Test + public void testCancelUserCompactionTimeoutNotExceeded() throws Exception { + testCancelUserCompactionTimeout(false); + } + + private void testCancelUserCompactionTimeout(boolean timeout) throws Exception { + + var uniqueNames = getUniqueNames(2); + String table1 = uniqueNames[0]; + String table2 = uniqueNames[1]; + + try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + // create a compaction service that uses a Planner that will schedule system jobs + // at a higher priority than user jobs + client.instanceOperations().setProperty( + Property.COMPACTION_SERVICE_PREFIX.getKey() + "testcancel.planner", + TestPlanner.class.getName()); + client.instanceOperations().setProperty( + Property.COMPACTION_SERVICE_PREFIX.getKey() + "testcancel.planner.opts.groups", + ("[{'group':'" + COMPACTOR_GROUP_1 + "'}]").replaceAll("'", "\"")); + + // create two tables that uses the compaction service + Map<String,String> props = new HashMap<>(); + props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(), + SimpleCompactionDispatcher.class.getName()); + props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "testcancel"); + // Disable system compactions to start for these tables + props.put(Property.TABLE_MAJC_RATIO.getKey(), "20"); + + // configure tablet compaction iterator that slows compaction down + var ntc = new NewTableConfiguration(); + IteratorSetting 
iterSetting = new IteratorSetting(50, SlowIterator.class); + SlowIterator.setSleepTime(iterSetting, 5); + ntc.attachIterator(iterSetting, EnumSet.of(IteratorScope.majc)); + ntc.setProperties(props); + + // Create two tables and write some data + client.tableOperations().create(table1, ntc); + client.tableOperations().create(table2, ntc); + writeRows((ClientContext) client, table1, MAX_DATA, true); + writeRows((ClientContext) client, table2, MAX_DATA, true); + + var ctx = getCluster().getServerContext(); + Optional<HostAndPort> coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(ctx); + if (coordinatorHost.isEmpty()) { + throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); + } + + // Start a compaction for table2, this is done so that the compactor will be busy + // and new jobs will queue up and wait + client.tableOperations().compact(table2, new CompactionConfig().setWait(false)); + + var tableId = TableId.of(client.tableOperations().tableIdMap().get(table1)); + var extent = new KeyExtent(tableId, null, null); + + // If timeout is true then set a short timeout so the system job can cancel the user job + // Otherwise the long timeout should prevent the system from clearing the selected files + var expiration = timeout ? 
"100ms" : "100s"; + client.tableOperations().setProperty(table1, + Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey(), expiration); + + // Submit a user job for table1 that will be put on the queue and waiting + // for the current job to finish + client.tableOperations().compact(table1, new CompactionConfig().setWait(false)); + // Wait for the fate operation to write selectedFiles + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var selectedFiles = tabletMeta.getSelectedFiles(); + if (selectedFiles != null) { + return !selectedFiles.getFiles().isEmpty(); + } + return false; + }, Wait.MAX_WAIT_MILLIS, 10); + + // Change the ratio so a system compaction will attempt to be scheduled for table 1 + client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "1"); + + if (timeout) { + // Because of the custom planner, the system compaction should now take priority + // System compactions were previously not eligible to run if selectedFiles existed + // for a user compaction already (and they overlapped). But now system compaction jobs + // are eligible to run if the user compaction has not started or completed any jobs + // and the expiration period has been exceeded. 
+ // When this happens the system compaction will delete the selectedFiles column + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + return tabletMeta.getSelectedFiles() == null; + }, Wait.MAX_WAIT_MILLIS, 100); + + // Wait for the system compaction to be running + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions(); + assertTrue(externalCompactions.values().stream() + .allMatch(ec -> ec.getKind() == CompactionKind.SYSTEM)); + return externalCompactions.size() == 1; + }, Wait.MAX_WAIT_MILLIS, 10); + + // Wait for the user compaction to now run after the system finishes + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions(); + var running = externalCompactions.values().stream() + .filter(ec -> ec.getKind() == CompactionKind.USER).count(); + return running == 1; + }, Wait.MAX_WAIT_MILLIS, 100); + } else { + // Wait for the user compaction to run, there should be no system compactions scheduled + // even though system has the higher priority in the test because the timeout was + // not exceeded + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions(); + assertTrue(externalCompactions.values().stream() + .allMatch(ec -> ec.getKind() == CompactionKind.USER)); + return externalCompactions.size() == 1; + }, Wait.MAX_WAIT_MILLIS, 10); + } + + // Wait and verify all compactions finish + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions().size(); + log.debug("Waiting for compactions to finish, count {}", externalCompactions); + return externalCompactions == 0 && tabletMeta.getCompacted().isEmpty() + && 
tabletMeta.getSelectedFiles() == null; + }, Wait.MAX_WAIT_MILLIS, 100); + } + + ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), table1); + } + private void writeRows(ClientContext client, String tableName, int rows, boolean wait) throws Exception { try (BatchWriter bw = client.createBatchWriter(tableName)) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index 7af3e0ba00..fc6d42fe35 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -40,6 +40,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; @@ -89,6 +90,7 @@ import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.manager.state.TabletManagementIterator; @@ -589,6 +591,7 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { onlineTables, new LiveTServerSet.LiveTServersSnapshot(tservers, Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers)), - Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, replacements); + Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, replacements, + SteadyTime.from(10000, TimeUnit.NANOSECONDS)); } }