This is an automated email from the ASF dual-hosted git repository. brianloss pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 74a0d5e Improve SuspendedTabletsIT reliability (#2123) 74a0d5e is described below commit 74a0d5e9276946c5266e5cc452370660871ace5f Author: Brian Loss <brianl...@apache.org> AuthorDate: Fri May 28 13:59:35 2021 -0400 Improve SuspendedTabletsIT reliability (#2123) The previous attempt to fix SuspendedTabletsIT tried to use HostRegexTableLoadBalancer as a per-table balancer. Unfortunately, HostRegexTableLoadBalancer was not designed to be used that way; instead, it was meant to replace TableLoadBalancer and use regular expressions to divide the hosts into pools. As a result, the balancer was failing to initialize and wasn't actually doing any work. * Perform minor refactoring on HostRegexTableLoadBalancer so that it can be extended to include the port when performing regex checks. * Extend HostRegexTableLoadBalancer and configure it to use the port as well as the host name for regex checks, so that a single tserver on a host running multiple tservers can be isolated and used for a table. In particular, this is used to send all metadata table tablets to a single tserver running in MiniAccumuloCluster. * Start MiniAccumuloCluster with a single tserver so that we can use that tserver's address to configure the extended HostRegexTableLoadBalancer to send all metadata tablets to that first tablet server. With only one tablet server running, we can also grab its process reference for use when we later kill tablet servers (to ensure we don't kill the tablet server hosting the metadata table). Once this information is collected, MiniAccumuloCluster is reconfigured for 3 tablet servers and the additional tablet servers are launched. 
* Fix the "Waiting on hosting and balance" conditions to wait for BOTH all tablets to be hosted and for those tablets to be hosted across all tablet servers (except the one reserved for the metadata table) * Fix the "Waiting on suspended tablets" condition which was incorrectly using a conjunction when a disjunction was needed. The result was the loop could exit without any tablets actually reaching the suspended state. * Refactor TabletLocations so we can choose to scan either the root or metadata table. This allows us to retrieve tablet locations for the metadata table. Fixes #2121 --- .../spi/balancer/HostRegexTableLoadBalancer.java | 9 +- .../accumulo/test/manager/SuspendedTabletsIT.java | 148 ++++++++++++++++++--- 2 files changed, 133 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java index 46157cb..c21f8fc 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java @@ -200,7 +200,7 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer { LOG.debug("Performing pool recheck - regrouping tablet servers based on regular expressions"); Map<String,SortedMap<TabletServerId,TServerStatus>> newPools = new HashMap<>(); for (Entry<TabletServerId,TServerStatus> e : current.entrySet()) { - List<String> poolNames = getPoolNamesForHost(e.getKey().getHost()); + List<String> poolNames = getPoolNamesForHost(e.getKey()); for (String pool : poolNames) { SortedMap<TabletServerId,TServerStatus> np = newPools.get(pool); if (np == null) { @@ -232,11 +232,12 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer { /** * Matches host against the regexes and returns the matching pool names * - * @param host + * @param tabletServerId * tablet server host 
* @return pool names, will return default pool if host matches more no regex */ - protected List<String> getPoolNamesForHost(String host) { + protected List<String> getPoolNamesForHost(TabletServerId tabletServerId) { + final String host = tabletServerId.getHost(); String test = host; if (!hrtlbConf.get().isIpBasedRegex) { try { @@ -401,7 +402,7 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer { for (Entry<TabletServerId,TServerStatus> e : current.entrySet()) { // pool names are the same as table names, except in the DEFAULT case. // If this table is assigned to a pool for this host, then move on. - List<String> hostPools = getPoolNamesForHost(e.getKey().getHost()); + List<String> hostPools = getPoolNamesForHost(e.getKey()); if (hostPools.contains(tablePoolName)) { continue; } diff --git a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java index 770648e..8918702 100644 --- a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java @@ -21,10 +21,13 @@ package org.apache.accumulo.test.manager; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.net.UnknownHostException; import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -40,9 +43,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import 
org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.ManagerClient; @@ -52,9 +57,11 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletLocationState; import org.apache.accumulo.core.spi.balancer.HostRegexTableLoadBalancer; +import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; @@ -64,6 +71,7 @@ import org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -81,6 +89,8 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { public static final long SUSPEND_DURATION = 80; public static final int TABLETS = 30; + private ProcessReference metadataTserverProcess; + @Override protected int defaultTimeoutSeconds() { return 5 * 60; @@ -91,12 +101,54 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "s"); cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, "5s"); cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); - cfg.setNumTservers(TSERVERS); + // Start with 1 tserver, we'll increase that later + cfg.setNumTservers(1); // config custom balancer to 
keep all metadata on one server - cfg.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + MetadataTable.NAME, "*"); - cfg.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_OOB_CHECK_KEY, "7s"); - cfg.setProperty(Property.TABLE_LOAD_BALANCER.getKey(), - HostRegexTableLoadBalancer.class.getName()); + cfg.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_OOB_CHECK_KEY, "1ms"); + cfg.setProperty(Property.MANAGER_TABLET_BALANCER.getKey(), + HostAndPortRegexTableLoadBalancer.class.getName()); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + // Wait for all tablet servers to come online and then choose the first server in the list. + // Update the balancer configuration to assign all metadata tablets to that server (and + // everything else to other servers). + InstanceOperations iops = client.instanceOperations(); + List<String> tservers = iops.getTabletServers(); + while (tservers == null || tservers.size() < 1) { + Thread.sleep(1000L); + tservers = client.instanceOperations().getTabletServers(); + } + HostAndPort metadataServer = HostAndPort.fromString(tservers.get(0)); + log.info("Configuring balancer to assign all metadata tablets to {}", metadataServer); + iops.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + MetadataTable.NAME, + metadataServer.toString()); + + // Wait for the balancer to assign all metadata tablets to the chosen server. + ClientContext ctx = (ClientContext) client; + TabletLocations tl = TabletLocations.retrieve(ctx, MetadataTable.NAME, RootTable.NAME); + while (tl.hosted.keySet().size() != 1 || !tl.hosted.containsKey(metadataServer)) { + log.info("Metadata tablets are not hosted on the correct server. 
Waiting for balancer..."); + Thread.sleep(1000L); + tl = TabletLocations.retrieve(ctx, MetadataTable.NAME, RootTable.NAME); + } + log.info("Metadata tablets are now hosted on {}", metadataServer); + } + + // Since we started only a single tablet server, we know it's the one hosting the + // metadata table. Save its process reference off so we can exclude it later when + // killing tablet servers. + Collection<ProcessReference> procs = getCluster().getProcesses().get(ServerType.TABLET_SERVER); + assertEquals("Expected a single tserver process", 1, procs.size()); + metadataTserverProcess = procs.iterator().next(); + + // Update the number of tservers and start the new tservers. + getCluster().getConfig().setNumTservers(TSERVERS); + getCluster().start(); } @Test @@ -104,9 +156,14 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { // Run the test body. When we get to the point where we need a tserver to go away, get rid of it // via crashing suspensionTestBody((ctx, locs, count) -> { - List<ProcessReference> procs = - new ArrayList<>(getCluster().getProcesses().get(ServerType.TABLET_SERVER)); - Collections.shuffle(procs); + // Exclude the tablet server hosting the metadata table from the list and only + // kill tablet servers that are not hosting the metadata table. 
+ List<ProcessReference> procs = getCluster().getProcesses().get(ServerType.TABLET_SERVER) + .stream().filter(p -> !metadataTserverProcess.equals(p)).collect(Collectors.toList()); + Collections.shuffle(procs, RANDOM); + assertEquals("Not enough tservers exist", TSERVERS - 1, procs.size()); + assertTrue("Attempting to kill more tservers (" + count + ") than exist in the cluster (" + + procs.size() + ")", procs.size() >= count); for (int i = 0; i < count; ++i) { ProcessReference pr = procs.get(i); @@ -140,9 +197,11 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { } // remove servers with metadata on them from the list of servers to be shutdown + assertEquals("Expecting a single tServer in metadataServerSet", 1, metadataServerSet.size()); tserverSet.removeAll(metadataServerSet); - assertEquals("Expecting two tServers in shutdown-list", 2, tserverSet.size()); + assertEquals("Expecting " + (TSERVERS - 1) + " tServers in shutdown-list", TSERVERS - 1, + tserverSet.size()); List<TServerInstance> tserversList = new ArrayList<>(tserverSet); Collections.shuffle(tserversList, RANDOM); @@ -210,28 +269,30 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { // ... and balanced. ctx.instanceOperations().waitForBalance(); do { - // Give at least another 15 seconds for migrations to finish up - Thread.sleep(15000); + // Keep checking until all tablets are hosted and spread out across the tablet servers + Thread.sleep(1000); ds = TabletLocations.retrieve(ctx, tableName); - } while (ds.hostedCount != TABLETS); + } while (ds.hostedCount != TABLETS || ds.hosted.keySet().size() != (TSERVERS - 1)); - // Pray all of our tservers have at least 1 tablet. - assertEquals(TSERVERS, ds.hosted.keySet().size()); + // Given the loop exit condition above, at this point we're sure that all tablets are hosted + // and some are hosted on each of the tablet servers other than the one reserved for hosting + // the metadata table. 
+ assertEquals(TSERVERS - 1, ds.hosted.keySet().size()); // Kill two tablet servers hosting our tablets. This should put tablets into suspended state, // and thus halt balancing. TabletLocations beforeDeathState = ds; log.info("Eliminating tablet servers"); - serverStopper.eliminateTabletServers(ctx, beforeDeathState, 2); + serverStopper.eliminateTabletServers(ctx, beforeDeathState, TSERVERS - 1); // All tablets should be either hosted or suspended. log.info("Waiting on suspended tablets"); do { Thread.sleep(1000); ds = TabletLocations.retrieve(ctx, tableName); - } while (ds.suspended.keySet().size() != 2 - && (ds.suspendedCount + ds.hostedCount) != TABLETS); + } while (ds.suspended.keySet().size() != (TSERVERS - 1) + || (ds.suspendedCount + ds.hostedCount) != TABLETS); SetMultimap<HostAndPort,KeyExtent> deadTabletsByServer = ds.suspended; @@ -285,6 +346,48 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { THREAD_POOL.shutdownNow(); } + /** + * A version of {@link HostRegexTableLoadBalancer} that includes the tablet server port in + * addition to the host name when checking regular expressions. This is useful for testing when + * multiple tablet servers are running on the same host and one wishes to make pools from the + * tablet servers on that host. 
+ */ + public static class HostAndPortRegexTableLoadBalancer extends HostRegexTableLoadBalancer { + private static final Logger LOG = + LoggerFactory.getLogger(HostAndPortRegexTableLoadBalancer.class.getName()); + + @Override + protected List<String> getPoolNamesForHost(TabletServerId tabletServerId) { + final String host = tabletServerId.getHost(); + String test = host; + if (!isIpBasedRegex()) { + try { + test = getNameFromIp(host); + } catch (UnknownHostException e1) { + LOG.error("Unable to determine host name for IP: " + host + ", setting to default pool", + e1); + return Collections.singletonList(DEFAULT_POOL); + } + } + + // Add the port on the end + final String hostString = test + ":" + tabletServerId.getPort(); + List<String> pools = getPoolNameToRegexPattern().entrySet().stream() + .filter(e -> e.getValue().matcher(hostString).matches()).map(Map.Entry::getKey) + .collect(Collectors.toList()); + if (pools.isEmpty()) { + pools.add(DEFAULT_POOL); + } + return pools; + } + + @Override + public long balance(BalanceParameters params) { + super.balance(params); + return 1000L; // Balance once per second during the test + } + } + private static class TabletLocations { public final Map<KeyExtent,TabletLocationState> locationStates = new HashMap<>(); public final SetMultimap<HostAndPort,KeyExtent> hosted = HashMultimap.create(); @@ -295,6 +398,11 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { public static TabletLocations retrieve(final ClientContext ctx, final String tableName) throws Exception { + return retrieve(ctx, tableName, MetadataTable.NAME); + } + + public static TabletLocations retrieve(final ClientContext ctx, final String tableName, + final String metaName) throws Exception { int sleepTime = 200; int remainingAttempts = 30; @@ -302,7 +410,7 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { try { FutureTask<TabletLocations> tlsFuture = new FutureTask<>(() -> { TabletLocations answer = new TabletLocations(); - 
answer.scan(ctx, tableName); + answer.scan(ctx, tableName, metaName); return answer; }); THREAD_POOL.submit(tlsFuture); @@ -321,10 +429,10 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { } } - private void scan(ClientContext ctx, String tableName) { + private void scan(ClientContext ctx, String tableName, String metaName) { Map<String,String> idMap = ctx.tableOperations().tableIdMap(); String tableId = Objects.requireNonNull(idMap.get(tableName)); - try (var scanner = new MetaDataTableScanner(ctx, new Range(), MetadataTable.NAME)) { + try (var scanner = new MetaDataTableScanner(ctx, new Range(), metaName)) { while (scanner.hasNext()) { TabletLocationState tls = scanner.next();