This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new ff67c09 Split compact method into two more specific methods (#2263)
ff67c09 is described below
commit ff67c0948507db573a313da5a71f690a6a92fff9
Author: Mike Miller <[email protected]>
AuthorDate: Fri Sep 10 06:42:24 2021 -0400
Split compact method into two more specific methods (#2263)
* Split compact method in CompactionService into 2 methods:
getCompactionPlan() and submitCompactionJob()
* Rename compact() to submitCompaction()
* Add javadoc comments to various methods
* Add final to various private members
* Drop redundant enum check and boolean assignment
Co-authored-by: Keith Turner <[email protected]>
---
.../client/admin/compaction/CompactableFile.java | 2 ++
.../core/spi/compaction/CompactionPlan.java | 3 ++
.../core/util/compaction/CompactionPlanImpl.java | 8 ++---
.../tserver/compactions/CompactionManager.java | 11 ++++---
.../tserver/compactions/CompactionService.java | 36 ++++++++++++++--------
.../accumulo/tserver/tablet/CompactableImpl.java | 7 ++---
6 files changed, 42 insertions(+), 25 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactableFile.java
b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactableFile.java
index 3e0a21e..8be4985 100644
---
a/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactableFile.java
+++
b/core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactableFile.java
@@ -23,6 +23,8 @@ import java.net.URI;
import org.apache.accumulo.core.metadata.CompactableFileImpl;
/**
+ * A single file ready to compact, that will come in a set of possible
candidates.
+ *
* @since 2.1.0
*/
public interface CompactableFile {
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionPlan.java
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionPlan.java
index 592f5b4..410dccd 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionPlan.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionPlan.java
@@ -55,5 +55,8 @@ public interface CompactionPlan {
CompactionPlan build();
}
+ /**
+ * Return the set of jobs this plan will submit for compaction.
+ */
Collection<CompactionJob> getJobs();
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java
b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java
index 111c386..e204cb8 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java
@@ -54,11 +54,11 @@ public class CompactionPlanImpl implements CompactionPlan {
public static class BuilderImpl implements CompactionPlan.Builder {
- private CompactionKind kind;
+ private final CompactionKind kind;
private ArrayList<CompactionJob> jobs = new ArrayList<>();
- private Set<CompactableFile> allFiles;
- private Set<CompactableFile> seenFiles = new HashSet<>();
- private Set<CompactableFile> candidates;
+ private final Set<CompactableFile> allFiles;
+ private final Set<CompactableFile> seenFiles = new HashSet<>();
+ private final Set<CompactableFile> candidates;
public BuilderImpl(CompactionKind kind, Set<CompactableFile> allFiles,
Set<CompactableFile> candidates) {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index 5e65f7a..f7c5b2a 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -254,7 +254,7 @@ public class CompactionManager {
new HashSet<>(runningExternalCompactions.keySet());
for (Compactable compactable : compactables) {
last = compactable;
- compact(compactable);
+ submitCompaction(compactable);
// remove anything from snapshot that tablets know are running
compactable.getExternalCompactionIds(runningEcids::remove);
}
@@ -267,7 +267,7 @@ public class CompactionManager {
compactablesToCheck.poll(maxTimeBetweenChecks - passed,
TimeUnit.MILLISECONDS);
if (compactable != null) {
last = compactable;
- compact(compactable);
+ submitCompaction(compactable);
}
}
@@ -290,7 +290,10 @@ public class CompactionManager {
}
}
- private void compact(Compactable compactable) {
+ /**
+ * Get each configured service for the compactable tablet and submit for
compaction
+ */
+ private void submitCompaction(Compactable compactable) {
for (CompactionKind ctype : CompactionKind.values()) {
var csid = compactable.getConfiguredService(ctype);
var service = services.get(csid);
@@ -308,7 +311,7 @@ public class CompactionManager {
}
if (service != null) {
- service.compact(ctype, compactable, compactablesToCheck::add);
+ service.submitCompaction(ctype, compactable, compactablesToCheck::add);
}
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index ef19cbf..43f1363 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -223,15 +224,27 @@ public class CompactionService {
return true;
}
- public void compact(CompactionKind kind, Compactable compactable,
+ /**
+ * Get compaction plan for the provided compactable tablet and possibly
submit for compaction.
+ * Plans get added to the planning queue before calling the planningExecutor
to get the plan. If
+ * no files are selected, return. Otherwise, submit the compaction job.
+ */
+ public void submitCompaction(CompactionKind kind, Compactable compactable,
Consumer<Compactable> completionCallback) {
Objects.requireNonNull(compactable);
+ // add tablet to planning queue and use planningExecutor to get the plan
if (queuedForPlanning.get(kind).putIfAbsent(compactable.getExtent(),
compactable) == null) {
try {
planningExecutor.execute(() -> {
try {
- planCompaction(kind, compactable, completionCallback);
+ Optional<Compactable.Files> files = compactable.getFiles(myId,
kind);
+ if (files.isEmpty() || files.get().candidates.isEmpty()) {
+ log.trace("Compactable returned no files {} {}",
compactable.getExtent(), kind);
+ } else {
+ CompactionPlan plan = getCompactionPlan(kind, files.get(),
compactable);
+ submitCompactionJob(plan, files.get(), compactable,
completionCallback);
+ }
} finally {
queuedForPlanning.get(kind).remove(compactable.getExtent());
}
@@ -305,16 +318,9 @@ public class CompactionService {
}
}
- private void planCompaction(CompactionKind kind, Compactable compactable,
- Consumer<Compactable> completionCallback) {
- var files = compactable.getFiles(myId, kind);
-
- if (files.isEmpty() || files.get().candidates.isEmpty()) {
- log.trace("Compactable returned no files {} {} {}",
compactable.getExtent(), kind, files);
- return;
- }
-
- PlanningParameters params = new CpPlanParams(kind, compactable,
files.get());
+ private CompactionPlan getCompactionPlan(CompactionKind kind,
Compactable.Files files,
+ Compactable compactable) {
+ PlanningParameters params = new CpPlanParams(kind, compactable, files);
log.trace("Planning compactions {} {} {} {}", planner.getClass().getName(),
compactable.getExtent(), kind, files);
@@ -328,7 +334,11 @@ public class CompactionService {
throw e;
}
- plan = convertPlan(plan, kind, files.get().allFiles,
files.get().candidates);
+ return convertPlan(plan, kind, files.allFiles, files.candidates);
+ }
+
+ private void submitCompactionJob(CompactionPlan plan, Compactable.Files
files,
+ Compactable compactable, Consumer<Compactable> completionCallback) {
// log error if tablet is metadata and compaction is external
var execIds = plan.getJobs().stream().map(cj -> (CompactionExecutorIdImpl)
cj.getExecutor());
if (compactable.getExtent().isMeta() && execIds.anyMatch(ceid ->
ceid.isExternalId())) {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 5ab8fbf..e64d53a 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -680,7 +680,7 @@ public class CompactableImpl implements Compactable {
private void checkifChopComplete(Set<StoredTabletFile> allFiles) {
- boolean completed = false;
+ boolean completed;
synchronized (this) {
completed = fileMgr.finishChop(allFiles);
@@ -847,7 +847,7 @@ public class CompactableImpl implements Compactable {
}
Pair<Long,CompactionConfig> idAndCfg = null;
- if (extKind != null && extKind == CompactionKind.USER) {
+ if (extKind == CompactionKind.USER) {
try {
idAndCfg = tablet.getCompactionID();
if (!idAndCfg.getFirst().equals(cid)) {
@@ -870,7 +870,6 @@ public class CompactableImpl implements Compactable {
}
if (extKind != null) {
-
if (extKind == CompactionKind.USER) {
this.chelper = CompactableUtils.getHelper(extKind, tablet, cid,
idAndCfg.getSecond());
this.compactionConfig = idAndCfg.getSecond();
@@ -1012,7 +1011,7 @@ public class CompactableImpl implements Compactable {
}
class CompactionCheck {
- private Supplier<Boolean> memoizedCheck;
+ private final Supplier<Boolean> memoizedCheck;
public CompactionCheck(CompactionServiceId service, CompactionKind kind,
Long compactionId) {
this.memoizedCheck = Suppliers.memoizeWithExpiration(() -> {