This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 4296d2a  Modify announceExistence to use ephemeral node and no lock
4296d2a is described below

commit 4296d2ab70a75ae50420c2f2c7e5a850142fd6cd
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed May 5 13:50:06 2021 +0000

    Modify announceExistence to use ephemeral node and no lock
---
 .../util/compaction/ExternalCompactionUtil.java    | 10 +---
 .../server/compaction/RetryableThriftFunction.java |  3 ++
 .../org/apache/accumulo/compactor/Compactor.java   | 61 +---------------------
 .../apache/accumulo/test/ExternalCompactionIT.java |  2 +-
 4 files changed, 7 insertions(+), 69 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 56114ef..974864b 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -100,14 +100,8 @@ public class ExternalCompactionUtil {
         try {
           List<String> compactors = zooReader.getChildren(compactorQueuesPath + "/" + queue);
           for (String compactor : compactors) {
-            // compactor is the address, we are checking to see if there is a child node which
-            // represents the compactor's lock as a check that it's alive.
-            List<String> children =
-                zooReader.getChildren(compactorQueuesPath + "/" + queue + "/" + compactor);
-            if (!children.isEmpty()) {
-              LOG.debug("Found live compactor {} ", compactor);
-              compactAddrs.add(HostAndPort.fromString(compactor));
-            }
+            LOG.debug("Found live compactor: {}", compactor);
+            compactAddrs.add(HostAndPort.fromString(compactor));
           }
         } catch (NoNodeException e) {
           LOG.trace("Ignoring node that went missing", e);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftFunction.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftFunction.java
index e87f020..a57904b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftFunction.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftFunction.java
@@ -22,5 +22,8 @@ import org.apache.thrift.TException;

 @FunctionalInterface
 public interface RetryableThriftFunction<T> {
+  // Note: Do not use the return type Void and return null from the function,
+  // it will retry forever. If your function does not need to return anything,
+  // just use String as the return type and return "".
   T execute() throws TException;
 }
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index ad8a6f4..23bff9f 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -19,7 +19,6 @@
 package org.apache.accumulo.compactor;

 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;

 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -64,19 +63,12 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.trace.thrift.TInfo;
-import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.ServerServices;
-import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.accumulo.fate.zookeeper.ServiceLock;
-import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
-import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerOpts;
@@ -123,7 +115,6 @@ public class Compactor extends AbstractServer
   protected static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder();

   private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
-  private final UUID compactorId = UUID.randomUUID();
   private final AccumuloConfiguration aconf;
   private final String queueName;
   private final AtomicReference<CompactionCoordinator.Client> coordinatorClient =
@@ -132,7 +123,6 @@ public class Compactor extends AbstractServer
       new AtomicReference<>();

   private SecurityOperation security;
-  private ServiceLock compactorLock;
   private ServerAddress compactorAddress = null;

   // Exposed for tests
@@ -249,8 +239,7 @@ public class Compactor extends AbstractServer

     try {
       zoo.mkdirs(compactorQueuePath);
-      // CBUG may be able to put just an ephemeral node here w/ no lock
-      zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);
+      zoo.putEphemeralData(zPath, new byte[] {});
     } catch (KeeperException e) {
       if (e.code() == KeeperException.Code.NOAUTH) {
         LOG.error("Failed to write to ZooKeeper. Ensure that"
@@ -258,44 +247,6 @@ public class Compactor extends AbstractServer
       }
       throw e;
     }
-
-    compactorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
-        ServiceLock.path(zPath), compactorId);
-    LockWatcher lw = new LockWatcher() {
-      @Override
-      public void lostLock(final LockLossReason reason) {
-        Halt.halt(1, () -> {
-          LOG.error("Compactor lost lock (reason = {}), exiting.", reason);
-          gcLogger.logGCInfo(getConfiguration());
-        });
-      }
-
-      @Override
-      public void unableToMonitorLockNode(final Exception e) {
-        Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, exiting.", e));
-      }
-    };
-
-    try {
-      byte[] lockContent =
-          new ServerServices(hostPort, Service.COMPACTOR_CLIENT).toString().getBytes(UTF_8);
-      for (int i = 0; i < 25; i++) {
-        zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP);
-
-        if (compactorLock.tryLock(lw, lockContent)) {
-          LOG.debug("Obtained Compactor lock {}", compactorLock.getLockPath());
-          return;
-        }
-        LOG.info("Waiting for Compactor lock");
-        sleepUninterruptibly(5, TimeUnit.SECONDS);
-      }
-      String msg = "Too many retries, exiting.";
-      LOG.info(msg);
-      throw new RuntimeException(msg);
-    } catch (Exception e) {
-      LOG.info("Could not obtain tablet server lock, exiting.", e);
-      throw new RuntimeException(e);
-    }
   }

   /**
@@ -387,9 +338,6 @@ public class Compactor extends AbstractServer
               coordinatorClient.get().updateCompactionStatus(TraceUtil.traceInfo(),
                   getContext().rpcCreds(), job.getExternalCompactionId(), state, message,
                   System.currentTimeMillis());
-              // Note: the return type was changed from Void to String just to make this work. When
-              // type was
-              // Void and returned null, it would retry forever.
               return "";
             } finally {
               ThriftUtil.returnClient(coordinatorClient.getAndSet(null));
@@ -798,13 +746,6 @@ public class Compactor extends AbstractServer

       gcLogger.logGCInfo(getConfiguration());
       LOG.info("stop requested. exiting ... ");
-      try {
-        if (null != compactorLock) {
-          compactorLock.unlock();
-        }
-      } catch (Exception e) {
-        LOG.warn("Failed to release compactor lock", e);
-      }
     }
   }

diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index 4aa1ae0..4cc8b85 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -823,7 +823,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
         getCluster().getServerContext().getAmple().getExternalCompactionFinalStates();
     while (fs.count() != 0) {
       LOG.info("Waiting for compaction completed marker to disappear");
-      UtilWaitThread.sleep(100);
+      UtilWaitThread.sleep(1000);
       fs = getCluster().getServerContext().getAmple().getExternalCompactionFinalStates();
     }
     try (final AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {