This is an automated email from the ASF dual-hosted git repository.

brianloss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 74a0d5e  Improve SuspendedTabletsIT reliability (#2123)
74a0d5e is described below

commit 74a0d5e9276946c5266e5cc452370660871ace5f
Author: Brian Loss <brianl...@apache.org>
AuthorDate: Fri May 28 13:59:35 2021 -0400

    Improve SuspendedTabletsIT reliability (#2123)
    
    The previous attempt to fix SuspendedTabletsIT tried to use
    HostRegexTableLoadBalancer as a per-table balancer. Unfortunately,
    HostRegexTableLoadBalancer was not designed to be used that way; it is
    instead meant to replace TableLoadBalancer and use regular expressions
    to divide the hosts into pools. As a result, the balancer failed to
    initialize and wasn't actually doing any work.
    
    * Perform minor refactoring on HostRegexTableLoadBalancer so that it
      can be extended to include the port when performing regex checks.
    * Extend HostRegexTableLoadBalancer and configure it to use the port as
      well as host name for regex checks so that a single tserver on a host
      with multiple tservers running can be isolated and used for a table.
      In particular, this is used to send all metadata table tablets to a
      single tserver running in MiniAccumuloCluster.
    * Start MiniAccumuloCluster with a single tserver so that we can use
      that information to configure the extended HostRegexTableLoadBalancer
      to send all metadata tablets to that first tablet server. With one
      tablet server running, we can also grab the process reference to it
      for when we later kill tablet servers (to ensure we don't kill the
      tablet server hosting the metadata table). Once this information is
      collected, then MiniAccumuloCluster is re-configured for 3 tablet
      servers and the additional tablet servers are launched.
    * Fix the "Waiting on hosting and balance" condition to wait for BOTH
      all tablets to be hosted and those tablets to be spread across all
      tablet servers (except the one reserved for the metadata table).
    * Fix the "Waiting on suspended tablets" condition, which incorrectly
      used a conjunction where a disjunction was needed. As a result, the
      loop could exit before any tablets actually reached the suspended
      state.
    * Refactor TabletLocations so we can choose to scan either the root or
      metadata table. This allows us to retrieve tablet locations for the
      metadata table.
    
    Fixes #2121
---
 .../spi/balancer/HostRegexTableLoadBalancer.java   |   9 +-
 .../accumulo/test/manager/SuspendedTabletsIT.java  | 148 ++++++++++++++++++---
 2 files changed, 133 insertions(+), 24 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
index 46157cb..c21f8fc 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
@@ -200,7 +200,7 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer {
     LOG.debug("Performing pool recheck - regrouping tablet servers based on regular expressions");
     Map<String,SortedMap<TabletServerId,TServerStatus>> newPools = new HashMap<>();
     for (Entry<TabletServerId,TServerStatus> e : current.entrySet()) {
-      List<String> poolNames = getPoolNamesForHost(e.getKey().getHost());
+      List<String> poolNames = getPoolNamesForHost(e.getKey());
       for (String pool : poolNames) {
         SortedMap<TabletServerId,TServerStatus> np = newPools.get(pool);
         if (np == null) {
@@ -232,11 +232,12 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer {
   /**
    * Matches host against the regexes and returns the matching pool names
    *
-   * @param host
+   * @param tabletServerId
    *          tablet server host
    * @return pool names, will return default pool if host matches more no regex
    */
-  protected List<String> getPoolNamesForHost(String host) {
+  protected List<String> getPoolNamesForHost(TabletServerId tabletServerId) {
+    final String host = tabletServerId.getHost();
     String test = host;
     if (!hrtlbConf.get().isIpBasedRegex) {
       try {
@@ -401,7 +402,7 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer {
           for (Entry<TabletServerId,TServerStatus> e : current.entrySet()) {
             // pool names are the same as table names, except in the DEFAULT case.
             // If this table is assigned to a pool for this host, then move on.
-            List<String> hostPools = getPoolNamesForHost(e.getKey().getHost());
+            List<String> hostPools = getPoolNamesForHost(e.getKey());
             if (hostPools.contains(tablePoolName)) {
               continue;
             }
diff --git a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
index 770648e..8918702 100644
--- a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
@@ -21,10 +21,13 @@ package org.apache.accumulo.test.manager;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.net.UnknownHostException;
 import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,9 +43,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.ManagerClient;
@@ -52,9 +57,11 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletLocationState;
 import org.apache.accumulo.core.spi.balancer.HostRegexTableLoadBalancer;
+import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -64,6 +71,7 @@ import org.apache.accumulo.test.functional.ConfigurableMacBase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -81,6 +89,8 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
   public static final long SUSPEND_DURATION = 80;
   public static final int TABLETS = 30;
 
+  private ProcessReference metadataTserverProcess;
+
   @Override
   protected int defaultTimeoutSeconds() {
     return 5 * 60;
@@ -91,12 +101,54 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
     cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "s");
     cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, "5s");
     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
-    cfg.setNumTservers(TSERVERS);
+    // Start with 1 tserver, we'll increase that later
+    cfg.setNumTservers(1);
     // config custom balancer to keep all metadata on one server
-    cfg.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + MetadataTable.NAME, "*");
-    cfg.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_OOB_CHECK_KEY, "7s");
-    cfg.setProperty(Property.TABLE_LOAD_BALANCER.getKey(),
-        HostRegexTableLoadBalancer.class.getName());
+    cfg.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_OOB_CHECK_KEY, "1ms");
+    cfg.setProperty(Property.MANAGER_TABLET_BALANCER.getKey(),
+        HostAndPortRegexTableLoadBalancer.class.getName());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+      // Wait for all tablet servers to come online and then choose the first server in the list.
+      // Update the balancer configuration to assign all metadata tablets to that server (and
+      // everything else to other servers).
+      InstanceOperations iops = client.instanceOperations();
+      List<String> tservers = iops.getTabletServers();
+      while (tservers == null || tservers.size() < 1) {
+        Thread.sleep(1000L);
+        tservers = client.instanceOperations().getTabletServers();
+      }
+      HostAndPort metadataServer = HostAndPort.fromString(tservers.get(0));
+      log.info("Configuring balancer to assign all metadata tablets to {}", metadataServer);
+      iops.setProperty(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + MetadataTable.NAME,
+          metadataServer.toString());
+
+      // Wait for the balancer to assign all metadata tablets to the chosen server.
+      ClientContext ctx = (ClientContext) client;
+      TabletLocations tl = TabletLocations.retrieve(ctx, MetadataTable.NAME, RootTable.NAME);
+      while (tl.hosted.keySet().size() != 1 || !tl.hosted.containsKey(metadataServer)) {
+        log.info("Metadata tablets are not hosted on the correct server. Waiting for balancer...");
+        Thread.sleep(1000L);
+        tl = TabletLocations.retrieve(ctx, MetadataTable.NAME, RootTable.NAME);
+      }
+      log.info("Metadata tablets are now hosted on {}", metadataServer);
+    }
+
+    // Since we started only a single tablet server, we know it's the one hosting the
+    // metadata table. Save its process reference off so we can exclude it later when
+    // killing tablet servers.
+    Collection<ProcessReference> procs = getCluster().getProcesses().get(ServerType.TABLET_SERVER);
+    assertEquals("Expected a single tserver process", 1, procs.size());
+    metadataTserverProcess = procs.iterator().next();
+
+    // Update the number of tservers and start the new tservers.
+    getCluster().getConfig().setNumTservers(TSERVERS);
+    getCluster().start();
   }
 
   @Test
@@ -104,9 +156,14 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
     // Run the test body. When we get to the point where we need a tserver to go away, get rid of it
     // via crashing
     suspensionTestBody((ctx, locs, count) -> {
-      List<ProcessReference> procs =
-          new ArrayList<>(getCluster().getProcesses().get(ServerType.TABLET_SERVER));
-      Collections.shuffle(procs);
+      // Exclude the tablet server hosting the metadata table from the list and only
+      // kill tablet servers that are not hosting the metadata table.
+      List<ProcessReference> procs = getCluster().getProcesses().get(ServerType.TABLET_SERVER)
+          .stream().filter(p -> !metadataTserverProcess.equals(p)).collect(Collectors.toList());
+      Collections.shuffle(procs, RANDOM);
+      assertEquals("Not enough tservers exist", TSERVERS - 1, procs.size());
+      assertTrue("Attempting to kill more tservers (" + count + ") than exist in the cluster ("
+          + procs.size() + ")", procs.size() >= count);
 
       for (int i = 0; i < count; ++i) {
         ProcessReference pr = procs.get(i);
@@ -140,9 +197,11 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
       }
 
       // remove servers with metadata on them from the list of servers to be shutdown
+      assertEquals("Expecting a single tServer in metadataServerSet", 1, metadataServerSet.size());
       tserverSet.removeAll(metadataServerSet);
 
-      assertEquals("Expecting two tServers in shutdown-list", 2, tserverSet.size());
+      assertEquals("Expecting " + (TSERVERS - 1) + " tServers in shutdown-list", TSERVERS - 1,
+          tserverSet.size());
 
       List<TServerInstance> tserversList = new ArrayList<>(tserverSet);
       Collections.shuffle(tserversList, RANDOM);
@@ -210,28 +269,30 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
       // ... and balanced.
       ctx.instanceOperations().waitForBalance();
       do {
-        // Give at least another 15 seconds for migrations to finish up
-        Thread.sleep(15000);
+        // Keep checking until all tablets are hosted and spread out across the tablet servers
+        Thread.sleep(1000);
         ds = TabletLocations.retrieve(ctx, tableName);
-      } while (ds.hostedCount != TABLETS);
+      } while (ds.hostedCount != TABLETS || ds.hosted.keySet().size() != (TSERVERS - 1));
 
-      // Pray all of our tservers have at least 1 tablet.
-      assertEquals(TSERVERS, ds.hosted.keySet().size());
+      // Given the loop exit condition above, at this point we're sure that all tablets are hosted
+      // and some are hosted on each of the tablet servers other than the one reserved for hosting
+      // the metadata table.
+      assertEquals(TSERVERS - 1, ds.hosted.keySet().size());
 
       // Kill two tablet servers hosting our tablets. This should put tablets into suspended state,
       // and thus halt balancing.
 
       TabletLocations beforeDeathState = ds;
       log.info("Eliminating tablet servers");
-      serverStopper.eliminateTabletServers(ctx, beforeDeathState, 2);
+      serverStopper.eliminateTabletServers(ctx, beforeDeathState, TSERVERS - 1);
 
       // All tablets should be either hosted or suspended.
       log.info("Waiting on suspended tablets");
       do {
         Thread.sleep(1000);
         ds = TabletLocations.retrieve(ctx, tableName);
-      } while (ds.suspended.keySet().size() != 2
-          && (ds.suspendedCount + ds.hostedCount) != TABLETS);
+      } while (ds.suspended.keySet().size() != (TSERVERS - 1)
+          || (ds.suspendedCount + ds.hostedCount) != TABLETS);
 
       SetMultimap<HostAndPort,KeyExtent> deadTabletsByServer = ds.suspended;
 
@@ -285,6 +346,48 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
     THREAD_POOL.shutdownNow();
   }
 
+  /**
+   * A version of {@link HostRegexTableLoadBalancer} that includes the tablet server port in
+   * addition to the host name when checking regular expressions. This is useful for testing when
+   * multiple tablet servers are running on the same host and one wishes to make pools from the
+   * tablet servers on that host.
+   */
+  public static class HostAndPortRegexTableLoadBalancer extends HostRegexTableLoadBalancer {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(HostAndPortRegexTableLoadBalancer.class.getName());
+
+    @Override
+    protected List<String> getPoolNamesForHost(TabletServerId tabletServerId) {
+      final String host = tabletServerId.getHost();
+      String test = host;
+      if (!isIpBasedRegex()) {
+        try {
+          test = getNameFromIp(host);
+        } catch (UnknownHostException e1) {
+          LOG.error("Unable to determine host name for IP: " + host + ", setting to default pool",
+              e1);
+          return Collections.singletonList(DEFAULT_POOL);
+        }
+      }
+
+      // Add the port on the end
+      final String hostString = test + ":" + tabletServerId.getPort();
+      List<String> pools = getPoolNameToRegexPattern().entrySet().stream()
+          .filter(e -> e.getValue().matcher(hostString).matches()).map(Map.Entry::getKey)
+          .collect(Collectors.toList());
+      if (pools.isEmpty()) {
+        pools.add(DEFAULT_POOL);
+      }
+      return pools;
+    }
+
+    @Override
+    public long balance(BalanceParameters params) {
+      super.balance(params);
+      return 1000L; // Balance once per second during the test
+    }
+  }
+
   private static class TabletLocations {
     public final Map<KeyExtent,TabletLocationState> locationStates = new HashMap<>();
     public final SetMultimap<HostAndPort,KeyExtent> hosted = HashMultimap.create();
@@ -295,6 +398,11 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
 
     public static TabletLocations retrieve(final ClientContext ctx, final String tableName)
         throws Exception {
+      return retrieve(ctx, tableName, MetadataTable.NAME);
+    }
+
+    public static TabletLocations retrieve(final ClientContext ctx, final String tableName,
+        final String metaName) throws Exception {
       int sleepTime = 200;
       int remainingAttempts = 30;
 
@@ -302,7 +410,7 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
         try {
           FutureTask<TabletLocations> tlsFuture = new FutureTask<>(() -> {
             TabletLocations answer = new TabletLocations();
-            answer.scan(ctx, tableName);
+            answer.scan(ctx, tableName, metaName);
             return answer;
           });
           THREAD_POOL.submit(tlsFuture);
@@ -321,10 +429,10 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
       }
     }
 
-    private void scan(ClientContext ctx, String tableName) {
+    private void scan(ClientContext ctx, String tableName, String metaName) {
       Map<String,String> idMap = ctx.tableOperations().tableIdMap();
       String tableId = Objects.requireNonNull(idMap.get(tableName));
-      try (var scanner = new MetaDataTableScanner(ctx, new Range(), MetadataTable.NAME)) {
+      try (var scanner = new MetaDataTableScanner(ctx, new Range(), metaName)) {
         while (scanner.hasNext()) {
           TabletLocationState tls = scanner.next();
 

Reply via email to