This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 4296d2a  Modify announceExistence to use ephemeral node and no lock
4296d2a is described below

commit 4296d2ab70a75ae50420c2f2c7e5a850142fd6cd
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed May 5 13:50:06 2021 +0000

    Modify announceExistence to use ephemeral node and no lock
---
 .../util/compaction/ExternalCompactionUtil.java    | 10 +---
 .../server/compaction/RetryableThriftFunction.java |  3 ++
 .../org/apache/accumulo/compactor/Compactor.java   | 61 +---------------------
 .../apache/accumulo/test/ExternalCompactionIT.java |  2 +-
 4 files changed, 7 insertions(+), 69 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 56114ef..974864b 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -100,14 +100,8 @@ public class ExternalCompactionUtil {
         try {
           List<String> compactors = zooReader.getChildren(compactorQueuesPath + "/" + queue);
           for (String compactor : compactors) {
-            // compactor is the address, we are checking to see if there is a child node which
-            // represents the compactor's lock as a check that it's alive.
-            List<String> children =
-                zooReader.getChildren(compactorQueuesPath + "/" + queue + "/" + compactor);
-            if (!children.isEmpty()) {
-              LOG.debug("Found live compactor {} ", compactor);
-              compactAddrs.add(HostAndPort.fromString(compactor));
-            }
+            LOG.debug("Found live compactor: {}", compactor);
+            compactAddrs.add(HostAndPort.fromString(compactor));
           }
         } catch (NoNodeException e) {
           LOG.trace("Ignoring node that went missing", e);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftFunction.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftFunction.java
index e87f020..a57904b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftFunction.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftFunction.java
@@ -22,5 +22,8 @@ import org.apache.thrift.TException;
 
 @FunctionalInterface
 public interface RetryableThriftFunction<T> {
+  // Note: Do not use the return type Void and return null from the function,
+  // it will retry forever. If your function does not need to return anything,
+  // just use String as the return type and return "".
   T execute() throws TException;
 }
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index ad8a6f4..23bff9f 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -19,7 +19,6 @@
 package org.apache.accumulo.compactor;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -64,19 +63,12 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.trace.thrift.TInfo;
-import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.ServerServices;
-import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.accumulo.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
-import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerOpts;
@@ -123,7 +115,6 @@ public class Compactor extends AbstractServer
   protected static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder();
 
   private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
-  private final UUID compactorId = UUID.randomUUID();
   private final AccumuloConfiguration aconf;
   private final String queueName;
   private final AtomicReference<CompactionCoordinator.Client> coordinatorClient =
@@ -132,7 +123,6 @@ public class Compactor extends AbstractServer
       new AtomicReference<>();
 
   private SecurityOperation security;
-  private ServiceLock compactorLock;
   private ServerAddress compactorAddress = null;
 
   // Exposed for tests
@@ -249,8 +239,7 @@ public class Compactor extends AbstractServer
 
     try {
       zoo.mkdirs(compactorQueuePath);
-      // CBUG may be able to put just an ephemeral node here w/ no lock
-      zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);
+      zoo.putEphemeralData(zPath, new byte[] {});
     } catch (KeeperException e) {
       if (e.code() == KeeperException.Code.NOAUTH) {
         LOG.error("Failed to write to ZooKeeper. Ensure that"
@@ -258,44 +247,6 @@ public class Compactor extends AbstractServer
       }
       throw e;
     }
-
-    compactorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
-        ServiceLock.path(zPath), compactorId);
-    LockWatcher lw = new LockWatcher() {
-      @Override
-      public void lostLock(final LockLossReason reason) {
-        Halt.halt(1, () -> {
-          LOG.error("Compactor lost lock (reason = {}), exiting.", reason);
-          gcLogger.logGCInfo(getConfiguration());
-        });
-      }
-
-      @Override
-      public void unableToMonitorLockNode(final Exception e) {
-        Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, exiting.", e));
-      }
-    };
-
-    try {
-      byte[] lockContent =
-          new ServerServices(hostPort, Service.COMPACTOR_CLIENT).toString().getBytes(UTF_8);
-      for (int i = 0; i < 25; i++) {
-        zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP);
-
-        if (compactorLock.tryLock(lw, lockContent)) {
-          LOG.debug("Obtained Compactor lock {}", compactorLock.getLockPath());
-          return;
-        }
-        LOG.info("Waiting for Compactor lock");
-        sleepUninterruptibly(5, TimeUnit.SECONDS);
-      }
-      String msg = "Too many retries, exiting.";
-      LOG.info(msg);
-      throw new RuntimeException(msg);
-    } catch (Exception e) {
-      LOG.info("Could not obtain tablet server lock, exiting.", e);
-      throw new RuntimeException(e);
-    }
   }
 
   /**
@@ -387,9 +338,6 @@ public class Compactor extends AbstractServer
               coordinatorClient.get().updateCompactionStatus(TraceUtil.traceInfo(),
                   getContext().rpcCreds(), job.getExternalCompactionId(), state, message,
                   System.currentTimeMillis());
-              // Note: the return type was changed from Void to String just to make this work. When
-              // type was
-              // Void and returned null, it would retry forever.
               return "";
             } finally {
               ThriftUtil.returnClient(coordinatorClient.getAndSet(null));
@@ -798,13 +746,6 @@ public class Compactor extends AbstractServer
 
       gcLogger.logGCInfo(getConfiguration());
       LOG.info("stop requested. exiting ... ");
-      try {
-        if (null != compactorLock) {
-          compactorLock.unlock();
-        }
-      } catch (Exception e) {
-        LOG.warn("Failed to release compactor lock", e);
-      }
     }
 
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index 4aa1ae0..4cc8b85 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -823,7 +823,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
         getCluster().getServerContext().getAmple().getExternalCompactionFinalStates();
     while (fs.count() != 0) {
       LOG.info("Waiting for compaction completed marker to disappear");
-      UtilWaitThread.sleep(100);
+      UtilWaitThread.sleep(1000);
       fs = getCluster().getServerContext().getAmple().getExternalCompactionFinalStates();
     }
     try (final AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {

Reply via email to