This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new 7551e7c  Add missing class, add Property and set it to lower value in 
IT to run faster
7551e7c is described below

commit 7551e7c09fff294eabe0ed3b085dd6df26a543db
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon May 10 14:17:52 2021 +0000

    Add missing class, add Property and set it to lower value in IT to run 
faster
---
 .../org/apache/accumulo/core/conf/Property.java    |  3 ++
 .../util/compaction/CompactionExecutorIdImpl.java  | 51 ++++++++++++++++++++++
 .../coordinator/CompactionCoordinator.java         | 13 +++---
 .../coordinator/CompactionCoordinatorTest.java     |  3 ++
 .../apache/accumulo/test/ExternalCompactionIT.java |  1 +
 5 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 130e10c..5dba145 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1062,6 +1062,9 @@ public enum Property {
   
COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL("coordinator.server.finalizer.check.interval",
       "60s", PropertyType.TIMEDURATION,
       "The interval at which to check for external compaction final state 
markers in the metadata table."),
+  COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL(
+      "coordinator.server.tserver.compaction.check.interval", "1m", 
PropertyType.TIMEDURATION,
+      "The interval at which to check the tservers for external compactions."),
   // deprecated properties grouped at the end to reference property that 
replaces them
   @Deprecated(since = "1.6.0")
   @ReplacedBy(property = INSTANCE_VOLUMES)
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionExecutorIdImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionExecutorIdImpl.java
new file mode 100644
index 0000000..fa205b4
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionExecutorIdImpl.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util.compaction;
+
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+
+import com.google.common.base.Preconditions;
+
+public class CompactionExecutorIdImpl extends CompactionExecutorId {
+
+  protected CompactionExecutorIdImpl(String canonical) {
+    super(canonical);
+  }
+
+  private static final long serialVersionUID = 1L;
+
+  public boolean isExernalId() {
+    return canonical().startsWith("e.");
+  }
+
+  public String getExernalName() {
+    Preconditions.checkState(isExernalId());
+    return canonical().substring("e.".length());
+  }
+
+  public static CompactionExecutorId internalId(CompactionServiceId csid, 
String executorName) {
+    return new CompactionExecutorIdImpl("i." + csid + "." + executorName);
+  }
+
+  public static CompactionExecutorId externalId(String executorName) {
+    return new CompactionExecutorIdImpl("e." + executorName);
+  }
+
+}
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index b7dffb1..309c70a 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -88,8 +88,7 @@ public class CompactionCoordinator extends AbstractServer
     LiveTServerSet.Listener {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CompactionCoordinator.class);
-  private static final long TIME_BETWEEN_CHECKS = 5000;
-  public static final long TSERVER_CHECK_INTERVAL = 60000;
+  private static final long TIME_BETWEEN_GC_CHECKS = 5000;
   private static final long FIFTEEN_MINUTES =
       TimeUnit.MILLISECONDS.convert(Duration.of(15, 
TimeUnit.MINUTES.toChronoUnit()));
 
@@ -155,7 +154,7 @@ public class CompactionCoordinator extends AbstractServer
 
   protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
     schedExecutor.scheduleWithFixedDelay(() -> 
gcLogger.logGCInfo(getConfiguration()), 0,
-        TIME_BETWEEN_CHECKS, TimeUnit.MILLISECONDS);
+        TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
   }
 
   private void startCompactionCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
@@ -365,7 +364,7 @@ public class CompactionCoordinator extends AbstractServer
     }
 
     tserverSet.startListeningForTabletServerChanges();
-    new DeadCompactionDetector(getContext(), compactionFinalizer, 
schedExecutor).start();
+    startDeadCompactionDetector();
 
     LOG.info("Starting loop to check tservers for compaction summaries");
     while (!shutdown) {
@@ -416,12 +415,16 @@ public class CompactionCoordinator extends AbstractServer
     LOG.info("Shutting down");
   }
 
+  protected void startDeadCompactionDetector() {
+    new DeadCompactionDetector(getContext(), compactionFinalizer, 
schedExecutor).start();
+  }
+
   protected long getMissingCompactorWarningTime() {
     return FIFTEEN_MINUTES;
   }
 
   protected long getTServerCheckInterval() {
-    return TSERVER_CHECK_INTERVAL;
+    return 
this.aconf.getTimeInMillis(Property.COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL);
   }
 
   protected TabletMetadata getMetadataEntryForExtent(KeyExtent extent) {
diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 172bed2..3dd3ecd 100644
--- 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -101,6 +101,9 @@ public class CompactionCoordinatorTest {
     }
 
     @Override
+    protected void startDeadCompactionDetector() {}
+
+    @Override
     protected long getTServerCheckInterval() {
       this.shutdown = true;
       return 0L;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index 8ae648d..141b6cc 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -145,6 +145,7 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
     
cfg.setProperty("tserver.compaction.major.service.cs2.planner.opts.executors",
         "[{'name':'all','externalQueue':'DCQ2'}]");
     
cfg.setProperty(Property.COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL.getKey(), 
"30s");
+    cfg.setProperty(Property.COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, 
"10s");
     // use raw local file system so walogs sync and flush will work
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }

Reply via email to