dlmarion commented on code in PR #6168:
URL: https://github.com/apache/accumulo/pull/6168#discussion_r2883805858
##########
server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java:
##########
@@ -285,7 +285,7 @@ public void runServer() throws Exception {
verificationThread.interrupt();
verificationThread.join();
}
- log.info(getClass().getSimpleName() + " process shut down.");
+ log.info(getClass().getSimpleName() + " process shut down. ");
Review Comment:
Can probably back this change out.
##########
core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java:
##########
@@ -238,6 +242,11 @@ public ServiceLockPath createManagerPath() {
return new ServiceLockPath(Constants.ZMANAGER_LOCK);
}
+ public ServiceLockPath createManagerWorkerPath(ResourceGroupId resourceGroup,
Review Comment:
This will allow Managers to be started in resource groups other than the
default. Is that part of this change?
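If the intent is to keep Managers in the default resource group for now, one option is a guard where the worker lock path is built. Below is a minimal, self-contained sketch. `DEFAULT_GROUP`, the `/managers/<group>/<host:port>` layout and the helper itself are hypothetical stand-ins, not the real `ServiceLockPaths` or `ResourceGroupId` API:

```java
import java.util.Objects;

// Hypothetical stand-in for a worker lock-path builder; not Accumulo's ServiceLockPaths API.
final class ManagerWorkerPathSketch {

  // Assumed name of the default resource group (illustrative only).
  static final String DEFAULT_GROUP = "default";

  // Builds an illustrative lock path and rejects any non-default resource group, so Managers
  // cannot be started in other groups until that is deliberately supported.
  static String createManagerWorkerPath(String resourceGroup, String hostPort) {
    Objects.requireNonNull(resourceGroup, "resourceGroup");
    Objects.requireNonNull(hostPort, "hostPort");
    if (!DEFAULT_GROUP.equals(resourceGroup)) {
      throw new IllegalArgumentException("Managers may only run in the " + DEFAULT_GROUP
          + " resource group, not " + resourceGroup);
    }
    return "/managers/" + resourceGroup + "/" + hostPort;
  }

  public static void main(String[] args) {
    // prints /managers/default/host1:9999
    System.out.println(createManagerWorkerPath("default", "host1:9999"));
    try {
      createManagerWorkerPath("rg1", "host1:9999");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```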
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -1391,15 +1451,60 @@ private long remaining(long deadline) {
return Math.max(1, deadline - System.currentTimeMillis());
}
+ private void getManagerLock() throws KeeperException, InterruptedException {
+ log.info("trying to get assistant manager lock");
+
+ final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter();
+ try {
+
+ var advertiseAddress = getAdvertiseAddress();
+
+ final ServiceLockPaths.ServiceLockPath zLockPath = getContext().getServerPaths()
+ .createManagerWorkerPath(getResourceGroup(), advertiseAddress);
+ ServiceLockSupport.createNonHaServiceLockPath(Type.MANAGER, zoo, zLockPath);
+
+ var serverLockUUID = UUID.randomUUID();
+ managerLock = new ServiceLock(getContext().getZooSession(), zLockPath, serverLockUUID);
+ ServiceLock.LockWatcher lw = new ServiceLockSupport.ServiceLockWatcher(Type.MANAGER,
+ () -> getShutdownComplete().get(),
+ (type) -> getContext().getLowMemoryDetector().logGCInfo(getContext().getConfiguration()));
+
+ for (int i = 0; i < 120 / 5; i++) {
+ zoo.putPersistentData(zLockPath.toString(), new byte[0], ZooUtil.NodeExistsPolicy.SKIP);
+
+ ServiceLockData.ServiceDescriptors descriptors = new ServiceLockData.ServiceDescriptors();
+ for (ServiceLockData.ThriftService svc : new ServiceLockData.ThriftService[] {
+ ThriftService.MANAGER}) {
Review Comment:
I wonder if this should be advertising the Fate service, not the Manager
service.
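To illustrate why the advertised service matters: if a client such as `FateWorkerThriftClient` picks servers by the services listed in their lock data, an assistant lock that advertises only the Manager service would not be found by a lookup for a Fate service. The self-contained sketch below models that. The `Service` enum, its `FATE_WORKER` value and `Advertisement` are hypothetical and are not `ServiceLockData`:

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical model of the services advertised in a lock node; not ServiceLockData.
final class AdvertisedServiceSketch {

  // Illustrative names; FATE_WORKER stands in for a Fate-specific Thrift service.
  enum Service {
    MANAGER, FATE_WORKER
  }

  // One server's advertisement: its address plus the services it claims to serve.
  static final class Advertisement {
    final String hostPort;
    final Set<Service> services;

    Advertisement(String hostPort, Set<Service> services) {
      this.hostPort = hostPort;
      this.services = Set.copyOf(services);
    }
  }

  // Returns the sorted addresses of servers advertising the wanted service; in this model
  // that is how a client looking for Fate workers would choose its candidates.
  static List<String> serversOffering(List<Advertisement> ads, Service wanted) {
    return ads.stream()
        .filter(ad -> ad.services.contains(wanted))
        .map(ad -> ad.hostPort)
        .sorted()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Advertisement> ads = List.of(
        // assistant lock advertising only the Manager service (as in the current diff)
        new Advertisement("m1:9999", EnumSet.of(Service.MANAGER)),
        // assistant lock advertising a Fate-specific service (the alternative)
        new Advertisement("m2:9999", EnumSet.of(Service.FATE_WORKER)));
    System.out.println(serversOffering(ads, Service.FATE_WORKER)); // prints [m2:9999]
  }
}
```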
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -958,6 +975,26 @@ public void run() {
throw new IllegalStateException("Unable to start server on host " + getBindAddress(), e);
}
+ tserverSet.startListeningForTabletServerChanges(this);
+
+ MetricsInfo metricsInfo = getContext().getMetricsInfo();
+
+ try {
+ // Acquire the lock that all managers get before the primary lock, this allows non primary
+ // manager processes to work on stuff.
+ getManagerLock();
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException("Unable to get manager lock ", e);
+ }
+
+ fateWorker.setLock(managerLock);
+
+ metricsInfo
+ .addMetricsProducers(fateWorker.getMetricsProducers().toArray(new MetricsProducer[0]));
+
+ metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(),
Review Comment:
Something to address later: how can we add a tag to the metrics to denote
whether this Manager is the primary or not?
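One thing to keep in mind: the tags passed to `MetricsInfo.init(...)` are presumably fixed for the life of the process. A Manager can become primary after startup, so a static tag could go stale. An alternative is a gauge that is re-evaluated on every read. With Micrometer, that would be a `Gauge` registered against a function of some state. Below is a stdlib-only sketch of the idea. `PrimaryRoleMetricSketch` and its methods are hypothetical:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.DoubleSupplier;

// Hypothetical sketch of a role metric; not Accumulo's MetricsInfo or MetricsProducer API.
final class PrimaryRoleMetricSketch {

  // Set once this process acquires the primary manager lock.
  private final AtomicBoolean primary = new AtomicBoolean(false);

  // Gauge-style reading evaluated each time it is sampled: 1.0 while primary, else 0.0.
  // Because it reads current state, it stays correct if the role changes after init.
  DoubleSupplier isPrimaryGauge() {
    return () -> primary.get() ? 1.0 : 0.0;
  }

  // Would be called from wherever the primary lock is acquired.
  void setPrimary(boolean isPrimary) {
    primary.set(isPrimary);
  }

  public static void main(String[] args) {
    PrimaryRoleMetricSketch role = new PrimaryRoleMetricSketch();
    DoubleSupplier gauge = role.isPrimaryGauge();
    System.out.println(gauge.getAsDouble()); // prints 0.0
    role.setPrimary(true);
    System.out.println(gauge.getAsDouble()); // prints 1.0
  }
}
```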
##########
core/src/main/java/org/apache/accumulo/core/rpc/clients/FateWorkerThriftClient.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.rpc.clients;
+
+import org.apache.accumulo.core.manager.thrift.FateWorkerService;
+
+/**
+ * Client side object that can be used to interact with operatoins that are supported by any manager
Review Comment:
```suggestion
* Client side object that can be used to interact with fate operations, which are supported by any manager
```
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
Review Comment:
If we add a port search property for the Manager, instead of passing null,
then we can run multiple managers on the same host. This should be useful for
ITs.
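A port search along these lines would try a range of ports and keep the first one that binds. Below is a minimal stdlib sketch. `bindFirstFree`, the search range and the backlog of 50 are assumptions for illustration, not an existing Accumulo property or helper:

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Optional;

// Hypothetical port-search helper; not an existing Accumulo property or utility.
final class PortSearchSketch {

  // Tries startPort, startPort + 1, ... (at most maxTries ports) and returns the first socket
  // that binds. Returning the bound socket, rather than just a port number, avoids a race
  // where another process grabs the port between the check and the real bind.
  static Optional<ServerSocket> bindFirstFree(InetAddress addr, int startPort, int maxTries) {
    for (int i = 0; i < maxTries; i++) {
      int port = startPort + i;
      if (port > 65535) {
        break; // ran off the end of the valid port range
      }
      try {
        return Optional.of(new ServerSocket(port, 50, addr));
      } catch (IOException e) {
        // port already in use (or not bindable); try the next one
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) throws IOException {
    InetAddress loopback = InetAddress.getLoopbackAddress();
    // Simulate a first manager already holding a port.
    try (ServerSocket first = new ServerSocket(0, 50, loopback)) {
      int taken = first.getLocalPort();
      Optional<ServerSocket> second = bindFirstFree(loopback, taken, 20);
      System.out.println("first manager port: " + taken);
      System.out.println("second manager port: "
          + second.map(ServerSocket::getLocalPort).map(String::valueOf).orElse("none"));
      if (second.isPresent()) {
        second.get().close();
      }
    }
  }
}
```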
##########
test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static java.util.stream.Collectors.toSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.ServerOpts;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FatePartition;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.TraceRepo;
+import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.lock.ServiceLockPaths;
+import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
+import org.apache.accumulo.core.metadata.SystemTables;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.FateEnv;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.fate.FastFate;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.common.net.HostAndPort;
+
+/**
+ * {@link ComprehensiveMultiManagerIT} runs multiple managers with lots of Accumulo APIs, however
+ * that does not actually verify that fate operations actually run on multiple managers. This test
+ * runs a smaller set of Accumulo API operations and does the following.
+ *
+ * <ul>
+ * <li>Starts new manager processes and verifies fate operations start running on them</li>
+ * <li>Kills assistant/non-primary manager processes and verifies the system recovers</li>
+ * <li>Kills primary manager process and verifies the system recovers</li>
+ * <li>Verifies that Accumulo API calls are not impacted by managers starting/stopping</li>
+ * </ul>
+ *
+ */
+public class MultipleManagerIT extends ConfigurableMacBase {
+
+ // A manager that will quickly clean up fate reservations held by dead managers
+ public static class FastFateCleanupManager extends Manager {
+ protected FastFateCleanupManager(ServerOpts opts, String[] args) throws IOException {
+ super(opts, ServerContext::new, args);
+ }
+
+ @Override
+ protected Fate<FateEnv> createFateInstance(FateEnv env, FateStore<FateEnv> store,
+ ServerContext context) {
+ LoggerFactory.getLogger(FastFateCleanupManager.class)
+ .info("Creating Fast fate cleanup manager for {}", store.type());
+ return new FastFate<>(env, store, true, TraceRepo::toLogString, getConfiguration());
+ }
+
+ public static void main(String[] args) throws Exception {
+ try (FastFateCleanupManager manager = new FastFateCleanupManager(new ServerOpts(), args)) {
+ manager.runServer();
+ }
+ }
+ }
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ // FOLLOW_ON add a way to start multiple managers to mini
+ cfg.getClusterServerConfiguration().setNumDefaultCompactors(8);
+ // Set this lower so that locks timeout faster
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+ cfg.setServerClass(ServerType.MANAGER, r -> FastFateCleanupManager.class);
+ super.configure(cfg, hadoopCoreSite);
+ }
+
+ @Test
+ public void testFate() throws Exception {
+
+ List<Process> managerWorkers = new ArrayList<>();
+ var executor = Executors.newCachedThreadPool();
+
+ // Start a lot of background threads that should cause fate operations to run.
+ try (var client = Accumulo.newClient().from(getClientProperties()).build()) {
+ // Create a table in order to wait for the single manager to become the primary manager
+ client.tableOperations().create("waitTable");
+
+ // start more manager processes, should be assigned fate work
+ managerWorkers.add(exec(FastFateCleanupManager.class));
Review Comment:
Are these Managers actually running without the port search property?
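For context on the question: if every Manager binds the same fixed port, only the first listener on a host can bind it. The others would fail to start their server, presumably surfacing as the `Unable to start server on host` exception above. Below is a stdlib sketch that demonstrates the bind conflict. `FixedPortConflictSketch` is illustrative only:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Illustration of two listeners on one host trying to share a fixed port.
final class FixedPortConflictSketch {

  // Binds a first listener on an OS-chosen port, then tries to bind a second listener to
  // that same address and port. Returns true only if the second bind succeeds.
  static boolean secondBindSucceeds() {
    InetAddress loopback = InetAddress.getLoopbackAddress();
    try (ServerSocket first = new ServerSocket(0, 50, loopback)) {
      int port = first.getLocalPort();
      try (ServerSocket second = new ServerSocket(port, 50, loopback)) {
        return true;
      } catch (BindException e) {
        return false; // "Address already in use"
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("second bind succeeded: " + secondBindSucceeds());
  }
}
```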
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -958,6 +975,26 @@ public void run() {
throw new IllegalStateException("Unable to start server on host " + getBindAddress(), e);
}
+ tserverSet.startListeningForTabletServerChanges(this);
+
+ MetricsInfo metricsInfo = getContext().getMetricsInfo();
+
+ try {
+ // Acquire the lock that all managers get before the primary lock, this allows non primary
+ // manager processes to work on stuff.
+ getManagerLock();
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException("Unable to get manager lock ", e);
+ }
+
+ fateWorker.setLock(managerLock);
+
+ metricsInfo
+ .addMetricsProducers(fateWorker.getMetricsProducers().toArray(new MetricsProducer[0]));
+
+ metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(),
+ getAdvertiseAddress(), getResourceGroup()));
+
Review Comment:
Do we want to wait here for some minimum set of Managers before proceeding,
like we do for TabletServers? I wonder if it might reduce some churn in the
FateManager at startup. I already have the code for this in #3262, in Manager at
line 1057.
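Below is a generic shape for such a wait, sketched with the standard library only. `waitForManagers`, the polling interval and the choice to let the caller proceed on timeout are assumptions here, not the code from #3262:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical startup wait; not the implementation referenced in #3262.
final class MinManagersWaitSketch {

  // Polls liveManagers until it reports at least minManagers or maxWait elapses.
  // Returns true if the minimum was reached and false on timeout or interrupt, leaving the
  // caller free to decide whether to proceed anyway.
  static boolean waitForManagers(IntSupplier liveManagers, int minManagers, Duration maxWait,
      Duration pollInterval) {
    long deadline = System.nanoTime() + maxWait.toNanos();
    while (liveManagers.getAsInt() < minManagers) {
      long remainingMillis = (deadline - System.nanoTime()) / 1_000_000;
      if (remainingMillis <= 0) {
        return false;
      }
      try {
        Thread.sleep(Math.min(pollInterval.toMillis(), remainingMillis));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Simulated registry where one more manager shows up on each poll.
    AtomicInteger seen = new AtomicInteger();
    boolean ready = waitForManagers(seen::incrementAndGet, 3, Duration.ofSeconds(5),
        Duration.ofMillis(10));
    System.out.println("ready=" + ready + " after " + seen.get() + " polls");
    // prints ready=true after 3 polls
  }
}
```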
--
This is an automated message from the Apache Git Service.