This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 5d12597b4d Add LogSorter to Compactor and ScanServer (#4239)
5d12597b4d is described below

commit 5d12597b4dedb60524775d0411405bafc3ee86ce
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Feb 14 08:06:49 2024 -0500

    Add LogSorter to Compactor and ScanServer (#4239)
    
    Adds LogSorter to ScanServer and Compactor such that
    all available processes will participate when a failure
    occurs that leaves a Tablet with walogs. Modified LogSorter
    and DistributedWorkQueue so that the Compactor could call
    the LogSorter and have its tasks execute serially in the
    Compactor thread.
    
    Fixes #4232
---
 .../org/apache/accumulo/core/conf/Property.java    |   3 +
 .../MiniAccumuloClusterControl.java                |   4 +
 .../server/zookeeper/DistributedWorkQueue.java     |  97 ++++++---
 server/compactor/pom.xml                           |   4 +
 .../org/apache/accumulo/compactor/Compactor.java   |  11 +
 .../org/apache/accumulo/tserver/ScanServer.java    |  17 ++
 .../org/apache/accumulo/tserver/TabletServer.java  |  20 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |  25 ++-
 .../java/org/apache/accumulo/test/RecoveryIT.java  | 231 +++++++++++++++++++++
 9 files changed, 371 insertions(+), 41 deletions(-)

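For orientation before the per-file diffs: ScanServer and TabletServer gain the same property-guarded startup for log sorting. The sketch below is an illustration only; `context`, `getConfiguration()`, and `log` stand in for each server's own fields, and imports are elided.

    // Sketch: property-guarded LogSorter startup (see ScanServer/TabletServer diffs below)
    int threadPoolSize = getConfiguration().getCount(Property.SSERV_WAL_SORT_MAX_CONCURRENT);
    if (threadPoolSize > 0) {
      LogSorter logSorter = new LogSorter(context, getConfiguration());
      // Drain any existing sort work, then watch ZooKeeper for future work.
      logSorter.startWatchingForRecoveryLogs(threadPoolSize);
    } else {
      // A value of 0 opts this process out of log sorting entirely.
      log.info("Log sorting for tablet recovery is disabled");
    }
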
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 12f69d2ff9..22c2606c85 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -520,6 +520,9 @@ public enum Property {
   @Experimental
   SSERV_THREADCHECK("sserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION,
       "The time between adjustments of the thrift server thread pool.", "2.1.0"),
+  @Experimental
+  SSERV_WAL_SORT_MAX_CONCURRENT("sserver.wal.sort.concurrent.max", "2", PropertyType.COUNT,
+      "The maximum number of threads to use to sort logs during recovery.", "4.0.0"),
   // properties that are specific to tablet server behavior
   TSERV_PREFIX("tserver.", null, PropertyType.PREFIX,
       "Properties in this category affect the behavior of the tablet 
servers.", "1.3.5"),
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
index b3977dab00..21e831c3e0 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
@@ -519,4 +519,8 @@ public class MiniAccumuloClusterControl implements ClusterControl {
     stop(server, hostname);
   }
 
+  public List<Process> getTabletServers(String resourceGroup) {
+    return tabletServerProcesses.get(resourceGroup);
+  }
+
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
index cd667909d0..2263e02af4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +58,6 @@ public class DistributedWorkQueue {
 
  private static final Logger log = LoggerFactory.getLogger(DistributedWorkQueue.class);
 
-  private ThreadPoolExecutor threadPool;
   private ZooReaderWriter zoo;
   private String path;
   private ServerContext context;
@@ -65,12 +65,20 @@ public class DistributedWorkQueue {
 
   private AtomicInteger numTask = new AtomicInteger(0);
 
-  private void lookForWork(final Processor processor, List<String> children) {
+  /**
+   * Finds a child in {@code children} that is not currently being processed and adds a Runnable to
+   * the {@code executor} that invokes the {@code processor}. The Runnable will recursively call
+   * {@code lookForWork} after it invokes the {@code processor} such that it will continue to look
+   * for children that need work until that condition is exhausted. This method will return early if
+   * the number of currently running tasks is larger than {@code maxThreads}.
+   */
+  private void lookForWork(final Processor processor, final List<String> children,
+      final ExecutorService executor, final int maxThreads) {
     if (children.isEmpty()) {
       return;
     }
 
-    if (numTask.get() >= threadPool.getCorePoolSize()) {
+    if (numTask.get() >= maxThreads) {
       return;
     }
 
@@ -102,7 +110,7 @@ public class DistributedWorkQueue {
         }
 
         // Great... we got the lock, but maybe we're too busy
-        if (numTask.get() >= threadPool.getCorePoolSize()) {
+        if (numTask.get() >= maxThreads) {
           zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
           break;
         }
@@ -143,7 +151,7 @@ public class DistributedWorkQueue {
 
             try {
               // its important that this is called after numTask is decremented
-              lookForWork(processor, zoo.getChildren(path));
+              lookForWork(processor, zoo.getChildren(path), executor, maxThreads);
             } catch (KeeperException e) {
               log.error("Failed to look for work", e);
             } catch (InterruptedException e) {
@@ -153,7 +161,7 @@ public class DistributedWorkQueue {
         };
 
         numTask.incrementAndGet();
-        threadPool.execute(task);
+        executor.execute(task);
 
       }
     } catch (Exception t) {
@@ -186,40 +194,62 @@ public class DistributedWorkQueue {
     return context;
   }
 
-  public void startProcessing(final Processor processor, ThreadPoolExecutor executorService)
-      throws KeeperException, InterruptedException {
+  public long getCheckInterval() {
+    return this.timerPeriod;
+  }
 
-    threadPool = executorService;
+  /**
+   * Finds the children at the path passed in the constructor and calls {@code lookForWork} which
+   * will attempt to process all of the currently available work
+   */
+  public void processExistingWork(final Processor processor, ExecutorService executor,
+      final int maxThreads, boolean setWatch) throws KeeperException, InterruptedException {
 
     zoo.mkdirs(path);
     zoo.mkdirs(path + "/" + LOCKS_NODE);
 
-    List<String> children = zoo.getChildren(path, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        switch (event.getType()) {
-          case NodeChildrenChanged:
-            if (event.getPath().equals(path)) {
-              try {
-                lookForWork(processor, zoo.getChildren(path, this));
-              } catch (KeeperException e) {
-                log.error("Failed to look for work at path {}; {}", path, event, e);
-              } catch (InterruptedException e) {
-                log.info("Interrupted looking for work at path {}; {}", path, event, e);
+    List<String> children = null;
+    if (setWatch) {
+      children = zoo.getChildren(path, new Watcher() {
+        @Override
+        public void process(WatchedEvent event) {
+          switch (event.getType()) {
+            case NodeChildrenChanged:
+              if (event.getPath().equals(path)) {
+                try {
+                  lookForWork(processor, zoo.getChildren(path, this), executor, maxThreads);
+                } catch (KeeperException e) {
+                  log.error("Failed to look for work at path {}; {}", path, event, e);
+                } catch (InterruptedException e) {
+                  log.info("Interrupted looking for work at path {}; {}", path, event, e);
+                }
+              } else {
+                log.info("Unexpected path for NodeChildrenChanged event watching path {}; {}", path,
+                    event);
+              }
-            } else {
-              log.info("Unexpected path for NodeChildrenChanged event watching path {}; {}", path,
-                  event);
-            }
-            break;
-          default:
-            log.info("Unexpected event watching path {}; {}", path, event);
-            break;
+              break;
+            default:
+              log.info("Unexpected event watching path {}; {}", path, event);
+              break;
+          }
         }
-      }
-    });
+      });
+    } else {
+      children = zoo.getChildren(path);
+    }
+
+    lookForWork(processor, children, executor, maxThreads);
+
+  }
+
+  /**
+   * Calls {@code processExistingWork} to attempt to process all currently available work, then
+   * adds a background thread that looks for work in the future.
+   */
+  public void processExistingAndFuture(final Processor processor,
+      ThreadPoolExecutor executorService) throws KeeperException, InterruptedException {
 
-    lookForWork(processor, children);
+    processExistingWork(processor, executorService, executorService.getCorePoolSize(), true);
 
     // Add a little jitter to avoid all the tservers slamming zookeeper at once
     ThreadPools.watchCriticalScheduledTask(
@@ -228,7 +258,8 @@ public class DistributedWorkQueue {
           public void run() {
             log.debug("Looking for work in {}", path);
             try {
-              lookForWork(processor, zoo.getChildren(path));
+              lookForWork(processor, zoo.getChildren(path), executorService,
+                  executorService.getCorePoolSize());
             } catch (KeeperException e) {
               log.error("Failed to look for work", e);
             } catch (InterruptedException e) {
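In short, the old single startProcessing entry point becomes two. A rough usage sketch follows, where `path`, `conf`, `context`, `processor`, and `threadPool` are placeholders and imports are elided:

    DistributedWorkQueue queue = new DistributedWorkQueue(path, conf, context);

    // One-shot: drain whatever work exists right now. A Guava direct executor
    // runs each task inline on the calling thread, and maxThreads=1 keeps
    // processing serial; no ZooKeeper watch is set (setWatch=false).
    queue.processExistingWork(processor, MoreExecutors.newDirectExecutorService(), 1, false);

    // Long-lived: drain existing work on a pool, set a ZooKeeper watch, and
    // keep checking for future work on a timer.
    queue.processExistingAndFuture(processor, threadPool);

The one-shot form is what lets a Compactor participate in log sorting without dedicating threads to it.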
diff --git a/server/compactor/pom.xml b/server/compactor/pom.xml
index eaa30280f8..0f021dd497 100644
--- a/server/compactor/pom.xml
+++ b/server/compactor/pom.xml
@@ -55,6 +55,10 @@
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-start</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-tserver</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client-api</artifactId>
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 2d93e42068..6a234824db 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -109,6 +109,7 @@ import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
 import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.tserver.log.LogSorter;
 import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
@@ -635,6 +636,8 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
 
       final AtomicReference<Throwable> err = new AtomicReference<>();
       final AtomicLong timeSinceLastCompletion = new AtomicLong(0L);
+      final LogSorter logSorter = new LogSorter(getContext(), getConfiguration());
+      long nextSortLogsCheckTime = System.currentTimeMillis();
 
       while (!shutdown) {
 
@@ -649,6 +652,14 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
         err.set(null);
         JOB_HOLDER.reset();
 
+        if (System.currentTimeMillis() > nextSortLogsCheckTime) {
+          // Attempt to process all existing log sorting work serially in this thread.
+          // When no work remains, this call will return so that we can look for compaction
+          // work.
+          LOG.debug("Checking to see if any recovery logs need sorting");
+          nextSortLogsCheckTime = logSorter.sortLogsIfNeeded();
+        }
+
         TExternalCompactionJob job;
         try {
           job = getNextJob(getNextId());
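Distilled from the hunk above: the Compactor's poll loop now interleaves log sorting with job polling, and sortLogsIfNeeded() both drains the queue and reports when the next check is due. In this sketch the `shutdown` flag and the job-polling comment are simplifications of the surrounding Compactor code:

    final LogSorter logSorter = new LogSorter(getContext(), getConfiguration());
    long nextSortLogsCheckTime = System.currentTimeMillis();
    while (!shutdown) {
      if (System.currentTimeMillis() > nextSortLogsCheckTime) {
        // Runs serially in this thread; returns now + the work queue's check interval.
        nextSortLogsCheckTime = logSorter.sortLogsIfNeeded();
      }
      // ... then poll for the next compaction job as before ...
    }
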
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 2efd38b709..a5d74b3c07 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -103,6 +103,7 @@ import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.log.LogSorter;
 import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
 import org.apache.accumulo.tserver.session.MultiScanSession;
 import org.apache.accumulo.tserver.session.ScanSession;
@@ -390,6 +391,22 @@ public class ScanServer extends AbstractServer
 
     ServiceLock lock = announceExistence();
 
+    int threadPoolSize = getConfiguration().getCount(Property.SSERV_WAL_SORT_MAX_CONCURRENT);
+    if (threadPoolSize > 0) {
+      final LogSorter logSorter = new LogSorter(context, getConfiguration());
+      try {
+        // Attempt to process all existing log sorting work and start a background
+        // thread to look for log sorting work in the future
+        logSorter.startWatchingForRecoveryLogs(threadPoolSize);
+      } catch (Exception ex) {
+        log.error("Error starting LogSorter");
+        throw new RuntimeException(ex);
+      }
+    } else {
+      log.info(
+          "Log sorting for tablet recovery is disabled, SSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
+    }
+
     try {
       while (!serverStopRequested) {
         UtilWaitThread.sleep(1000);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 90244c3ba1..0de617455c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -602,12 +602,22 @@ public class TabletServer extends AbstractServer implements TabletHostingServer
       throw new RuntimeException(e);
     }
 
-    try {
-      logSorter.startWatchingForRecoveryLogs();
-    } catch (Exception ex) {
-      log.error("Error setting watches for recoveries");
-      throw new RuntimeException(ex);
+    int threadPoolSize =
+        getContext().getConfiguration().getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
+    if (threadPoolSize > 0) {
+      try {
+        // Attempt to process all existing log sorting work and start a background
+        // thread to look for log sorting work in the future
+        logSorter.startWatchingForRecoveryLogs(threadPoolSize);
+      } catch (Exception ex) {
+        log.error("Error starting LogSorter");
+        throw new RuntimeException(ex);
+      }
+    } else {
+      log.info(
+          "Log sorting for tablet recovery is disabled, TSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
     }
+
     final AccumuloConfiguration aconf = getConfiguration();
 
     final long onDemandUnloaderInterval =
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 33ce1989e4..dda829d0a8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -62,6 +62,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
 
 public class LogSorter {
 
@@ -290,12 +291,30 @@ public class LogSorter {
     }
   }
 
-  public void startWatchingForRecoveryLogs() throws KeeperException, InterruptedException {
-    int threadPoolSize = this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
+  /**
+   * Sort any logs that need sorting in the current thread.
+   *
+   * @return The time in millis when the next check can be done.
+   */
+  public long sortLogsIfNeeded() throws KeeperException, InterruptedException {
+    DistributedWorkQueue dwq = new DistributedWorkQueue(
+        context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, context);
+    dwq.processExistingWork(new LogProcessor(), MoreExecutors.newDirectExecutorService(), 1, false);
+    return System.currentTimeMillis() + dwq.getCheckInterval();
+  }
+
+  /**
+   * Sort any logs that need sorting in a ThreadPool using
+   * {@link Property#TSERV_WAL_SORT_MAX_CONCURRENT} threads. This method will start a background
+   * thread to look for log sorting work in the future that will be processed by the
+   * ThreadPoolExecutor.
+   */
+  public void startWatchingForRecoveryLogs(int threadPoolSize)
+      throws KeeperException, InterruptedException {
     ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools()
         .createFixedThreadPool(threadPoolSize, this.getClass().getName(), true);
     new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf,
-        context).startProcessing(new LogProcessor(), threadPool);
+        context).processExistingAndFuture(new LogProcessor(), threadPool);
   }
 
   public List<RecoveryStatus> getLogSorts() {
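A side note on why sortLogsIfNeeded() above is serial: Guava's direct executor runs each submitted task on the calling thread, so DistributedWorkQueue never spawns a worker. A tiny standalone illustration:

    import java.util.concurrent.ExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;

    ExecutorService direct = MoreExecutors.newDirectExecutorService();
    direct.execute(() -> System.out.println(Thread.currentThread().getName()));
    // Prints the caller's thread name; no new thread is created, so work
    // queue entries end up being processed one at a time in the caller's
    // thread (here, the Compactor's main loop).
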
diff --git a/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
new file mode 100644
index 0000000000..c8fb440eea
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.IntStream;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.spi.balancer.TableLoadBalancer;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterControl;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.server.manager.recovery.RecoveryPath;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@Tag(MINI_CLUSTER_ONLY)
+public class RecoveryIT extends AccumuloClusterHarness {
+
+  private static final String RESOURCE_GROUP = "RG1";
+
+  private volatile boolean disableTabletServerLogSorting = false;
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(3);
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
+    cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "5s");
+    cfg.getClusterServerConfiguration().addTabletServerResourceGroup(RESOURCE_GROUP, 3);
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+    if (disableTabletServerLogSorting) {
+      cfg.setProperty(Property.TSERV_WAL_SORT_MAX_CONCURRENT, "0");
+    }
+    // file system supports recovery
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Override
+  public void setupCluster() throws Exception {
+    // Do *NOT* startup the cluster here. We are doing it in the test
+    // method so that we can modify the properties for each test run
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"TSERVER", "SSERVER", "COMPACTOR"})
+  public void test(String serverForSorting) throws Exception {
+
+    // Determine whether or not we need to disable log sorting
+    // in the TabletServer. We want to do this when the serverForSorting
+    // parameter is SSERVER or COMPACTOR
+    switch (serverForSorting) {
+      case "TSERVER":
+        disableTabletServerLogSorting = false;
+        break;
+      case "SSERVER":
+      case "COMPACTOR":
+      default:
+        disableTabletServerLogSorting = true;
+    }
+
+    // Start the cluster
+    super.setupCluster();
+
+    // create a table
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      SortedSet<Text> splits = new TreeSet<>();
+      IntStream.range(97, 122).forEach(i -> splits.add(new Text(new String("" + i))));
+
+      NewTableConfiguration ntc = new NewTableConfiguration();
+      Map<String,String> tableProps = new HashMap<>();
+      tableProps.put(Property.TABLE_MAJC_RATIO.getKey(), "100");
+      tableProps.put(Property.TABLE_FILE_MAX.getKey(), "3");
+      tableProps.put(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY, RESOURCE_GROUP);
+      ntc.setProperties(tableProps);
+      ntc.withSplits(splits);
+      ntc.withInitialTabletAvailability(TabletAvailability.ONDEMAND);
+      c.tableOperations().create(tableName, ntc);
+
+      c.instanceOperations().waitForBalance();
+
+      TableId tid = TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+      ReadWriteIT.ingest(c, 1000, 1, 1, 0, "", tableName, 100);
+
+      // Confirm that there are no walog entries for this table
+      assertEquals(0, countWaLogEntries(c, tid));
+
+      MiniAccumuloClusterImpl mini = (MiniAccumuloClusterImpl) cluster;
+      MiniAccumuloClusterControl control = (MiniAccumuloClusterControl) cluster.getClusterControl();
+
+      // Stop any running Compactors and ScanServers
+      control.stopAllServers(ServerType.COMPACTOR);
+      control.stopAllServers(ServerType.SCAN_SERVER);
+
+      // Kill the TabletServer in resource group that is hosting the table
+      List<Process> procs = control.getTabletServers(RESOURCE_GROUP);
+      assertEquals(3, procs.size());
+      for (int i = 0; i < 3; i++) {
+        procs.get(i).destroyForcibly().waitFor();
+      }
+      control.getTabletServers(RESOURCE_GROUP).clear();
+
+      // The TabletGroupWatcher in the Manager will notice that the TabletServer is dead
+      // and will assign the TabletServer's walog to all of the tablets that were assigned
+      // to that server. Confirm that walog entries exist for this tablet
+      if (!serverForSorting.equals("TSERVER")) {
+        Wait.waitFor(() -> countWaLogEntries(c, tid) > 0, 60_000);
+      }
+
+      // Start the process that will do the log sorting
+      switch (serverForSorting) {
+        case "TSERVER":
+          // We don't need to re-start the TabletServers here, there is
+          // already a TabletServer running in the default group that
+          // is hosting the root and metadata tables. It should perform
+          // the log sorting.
+          break;
+        case "SSERVER":
+          mini.getConfig().getClusterServerConfiguration().setNumDefaultScanServers(1);
+          getClusterControl().startAllServers(ServerType.SCAN_SERVER);
+          break;
+        case "COMPACTOR":
+          mini.getConfig().getClusterServerConfiguration().setNumDefaultCompactors(1);
+          getClusterControl().startAllServers(ServerType.COMPACTOR);
+          break;
+        case "ALL":
+        default:
+          fail("Unhandled server type: " + serverForSorting);
+      }
+
+      // Confirm sorting completed
+      Wait.waitFor(() -> logSortingCompleted(c, tid) == true, 60_000);
+
+      // Start the tablet servers so that the Manager
+      // can assign the table and so that recovery can be completed.
+      getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+
+      c.instanceOperations().waitForBalance();
+
+      // When the tablet is hosted, the sorted walogs will be applied
+      // Confirm the 3 walog entries are gone for this tablet
+      assertEquals(0, countWaLogEntries(c, tid));
+
+      // Scan the data and make sure its there.
+      ReadWriteIT.verify(c, 1000, 1, 1, 0, "", tableName);
+
+    }
+  }
+
+  private long countWaLogEntries(AccumuloClient c, TableId tableId) throws Exception {
+    try (TabletsMetadata tabletsMetadata = ((ClientContext) c).getAmple().readTablets()
+        .forTable(tableId).fetch(TabletMetadata.ColumnType.LOGS).build()) {
+      return tabletsMetadata.stream().filter(tabletMetadata -> tabletMetadata.getLogs() != null)
+          .count();
+    }
+  }
+
+  private boolean logSortingCompleted(AccumuloClient c, TableId tableId) throws Exception {
+    try (TabletsMetadata tabletsMetadata = ((ClientContext) c).getAmple().readTablets()
+        .forTable(tableId).fetch(TabletMetadata.ColumnType.LOGS).build()) {
+      ServerContext ctx = getCluster().getServerContext();
+      for (TabletMetadata tm : tabletsMetadata) {
+        for (LogEntry walog : tm.getLogs()) {
+          String sortId = walog.getUniqueID().toString();
+          String filename = walog.getPath();
+          String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString();
+
+          if (ctx.getZooCache().get(ctx.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId)
+              != null
+              || !ctx.getVolumeManager().exists(SortedLogState.getFinishedMarkerPath(dest))) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+  }
+
+}
