ctubbsii commented on a change in pull request #2346:
URL: https://github.com/apache/accumulo/pull/2346#discussion_r745000366



##########
File path: 
core/src/test/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderTest.java
##########
@@ -34,7 +35,11 @@
 
   @Before
   public void setup() {
-    context = EasyMock.createMock(ClientContext.class);
+    context = EasyMock.createStrictMock(ClientContext.class);
+    AccumuloConfiguration conf = EasyMock.createStrictMock(AccumuloConfiguration.class);
+    EasyMock.expect(context.getConfiguration()).andReturn(conf).anyTimes();
+    EasyMock.expect(context.getThreadPools()).andReturn(new ClientThreadPools()).anyTimes();
+    EasyMock.replay(context);

Review comment:
       EasyMock methods are good candidates for using static imports, just 
because they make the tests slightly more readable without losing anything.

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
##########
@@ -716,6 +718,9 @@ public void close() {
       thriftTransportPool.shutdown();
     }
     singletonReservation.close();
+    if (threadPools != null) {
+      threadPools.close();
+    }

Review comment:
       `threadPools` is `final` and set explicitly to a new object in the 
constructor, so it can never be null.
   
   ```suggestion
       threadPools.close();
   ```

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -703,7 +726,7 @@ void queueMutations(final MutationSet mutationsToSend) {
             log.trace("{} - binning {} mutations", Thread.currentThread().getName(),
                 mutationsToSend.size());
             addMutations(mutationsToSend);
-          } catch (Exception e) {
+          } catch (Throwable e) {
             updateUnknownErrors("Error processing mutation set", e);

Review comment:
       Catching Throwable here seems counterproductive to the goal of this 
change, which, as I understand it, is to give control to the calling code that 
provides the thread pool when an error occurs. We can't do that if we swallow 
the errors instead of letting users specify their own uncaught exception 
handler on their thread pool.
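
To illustrate the concern, here is a minimal standalone sketch using only `java.util.concurrent`. It is not Accumulo code; `HandlerDemo` and `run` are hypothetical names. It assumes the caller supplies a pool whose threads carry their own uncaught exception handler, and shows that the handler only sees an Error if the task lets it propagate.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; none of these names come from the PR.
class HandlerDemo {

  // Runs one task that throws an Error on a pool whose threads use a
  // caller-supplied uncaught exception handler. Returns what the handler
  // saw, or null if the handler was not invoked within the timeout.
  static Throwable run(boolean swallow) {
    AtomicReference<Throwable> seen = new AtomicReference<>();
    CountDownLatch handled = new CountDownLatch(1);
    ThreadFactory factory = r -> {
      Thread t = new Thread(r, "demo-worker");
      t.setUncaughtExceptionHandler((thread, e) -> {
        seen.set(e);
        handled.countDown();
      });
      return t;
    };
    ExecutorService pool = Executors.newFixedThreadPool(1, factory);
    try {
      // execute() (unlike submit()) lets the throwable escape the worker thread.
      pool.execute(() -> {
        try {
          throw new AssertionError("simulated Error in task");
        } catch (Throwable t) {
          if (!swallow) {
            throw t; // propagate so the thread's handler is invoked
          }
          // swallowed: the caller's handler never hears about it
        }
      });
      try {
        handled.await(500, TimeUnit.MILLISECONDS);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    } finally {
      pool.shutdownNow();
    }
    return seen.get();
  }

  public static void main(String[] args) {
    System.out.println("rethrown:  " + run(false));
    System.out.println("swallowed: " + run(true));
  }
}
```

With `swallow` set to true the handler is never called, which is the situation a blanket `catch (Throwable e)` creates for a user-provided pool.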

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
##########
@@ -801,6 +831,8 @@ public Result write(ConditionalMutation mutation) {
   @Override
   public void close() {
     threadPool.shutdownNow();
+    threadPoolCleanable.clean(); // deregister the cleaner, will not call shutdownNow() because
+                                 // closed is now true

Review comment:
       The comment does not appear to be correct. The cleaner does not skip 
shutdownNow because closed is now true; it skips it because it was passed a 
NOOP runnable. It is not passed any kind of Closeable that would let it detect 
whether closed is true. It does nothing solely because it was given a NOOP 
runnable.

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -513,10 +522,17 @@ private synchronized void updateServerErrors(String server, Exception e) {
     log.error("Server side error on {}", server, e);
   }
 
-  private synchronized void updateUnknownErrors(String msg, Exception t) {
+  private synchronized void updateUnknownErrors(String msg, Throwable t) {
     somethingFailed = true;
     unknownErrors++;
-    this.lastUnknownError = t;
+    // Multiple errors may occur between the time checkForFailures() is called
+    // by the client. Be sure to return an Error if one (or more) occurred.
+    // Set lastUnknownError if it's null, to an Error, or to an Exception if it's not already an
+    // Error
+    if (this.lastUnknownError == null
+        || !(t instanceof Exception && this.lastUnknownError instanceof Error)) {

Review comment:
       Is this trying to store the most severe of the problems (Error 
preferred, then Exception, then null)?
   It's probably still a good idea to keep the earlier problems by attaching 
them with `addSuppressed`.
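
A rough sketch of what that could look like, assuming the replacement rule stays the same as the condition in the diff above (keep the current one only when it is an Error and the new one is an Exception). `SeverityMerge` and `merge` are hypothetical names, not part of the PR:

```java
// Hypothetical helper; a sketch, not the PR's implementation.
class SeverityMerge {

  // Chooses which throwable to keep, mirroring the diff's condition, and
  // attaches the one that is not kept as a suppressed throwable so that
  // no earlier problem is lost.
  static Throwable merge(Throwable current, Throwable incoming) {
    if (current == null) {
      return incoming;
    }
    if (incoming == null || incoming == current) {
      return current; // nothing new to record; also avoids self-suppression
    }
    boolean keepCurrent = incoming instanceof Exception && current instanceof Error;
    Throwable keep = keepCurrent ? current : incoming;
    Throwable other = keepCurrent ? incoming : current;
    keep.addSuppressed(other);
    return keep;
  }

  public static void main(String[] args) {
    Throwable last = null;
    last = merge(last, new IllegalStateException("first failure"));
    last = merge(last, new OutOfMemoryError("simulated"));
    last = merge(last, new RuntimeException("later failure"));
    last.printStackTrace(); // the Error, with the others listed as suppressed
  }
}
```

In `updateUnknownErrors` this would reduce to something like `this.lastUnknownError = merge(this.lastUnknownError, t);`, again only as an illustration.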

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientThreadPools.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+
+public class ClientThreadPools {
+
+  public static class ThreadPoolConfig {

Review comment:
       I like this class being consolidated, for the most part, but it's not 
clear how consolidating it here gives users greater control over the thread 
pool lifecycles and error handling, which I thought was part of the goal. I 
don't see it exposed in the public API anywhere.

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -625,19 +641,26 @@ public void run() {
 
     private static final int MUTATION_BATCH_SIZE = 1 << 17;
     private final ThreadPoolExecutor sendThreadPool;
+    private final Cleanable sendThreadPoolCleanable;
     private final ThreadPoolExecutor binningThreadPool;
+    private final Cleanable binningThreadPoolCleanable;
     private final Map<String,TabletServerMutations<Mutation>> serversMutations;
     private final Set<String> queued;
     private final Map<TableId,TabletLocator> locators;
 
     public MutationWriter(int numSendThreads) {
       serversMutations = new HashMap<>();
       queued = new HashSet<>();
-      sendThreadPool =
-          ThreadPools.createFixedThreadPool(numSendThreads, this.getClass().getName(), false);
+      sendThreadPool = context.getThreadPools().newThreadPool(ThreadPoolType.BATCH_WRITER_SEND_POOL,
+          new ThreadPoolConfig(context.getConfiguration(), numSendThreads));
+      sendThreadPoolCleanable =
+          CleanerUtil.shutdownThreadPoolExecutor(sendThreadPool, () -> {}, log);

Review comment:
       This seems to register a cleaner that does nothing to attempt to shut 
down the thread pool. It seems odd to have a cleaner that doesn't do any 
cleaning and just runs a NOOP (`() -> {}`).
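
For comparison, a small sketch using `java.lang.ref.Cleaner` directly as a stand-in (I am not reproducing `CleanerUtil` here, and `NoopCleanerDemo` is a hypothetical name). It registers either a NOOP or a real shutdown action and then calls `clean()` explicitly:

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; stands in for the cleaner registration in the PR.
class NoopCleanerDemo {
  private static final Cleaner CLEANER = Cleaner.create();

  // Registers an action for a throwaway owner object, triggers it with
  // clean(), and reports whether the pool was shut down as a result.
  static boolean shutdownAfterClean(boolean noop) {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    Object owner = new Object();
    // The action must not reference the owner, or the owner never becomes unreachable.
    Runnable action = noop ? () -> {} : pool::shutdownNow;
    Cleaner.Cleanable cleanable = CLEANER.register(owner, action);
    cleanable.clean(); // runs the action at most once and deregisters it
    boolean wasShutDown = pool.isShutdown();
    pool.shutdownNow(); // tidy up regardless of which action ran
    return wasShutDown;
  }

  public static void main(String[] args) {
    System.out.println("noop action shut the pool down: " + shutdownAfterClean(true));
    System.out.println("real action shut the pool down: " + shutdownAfterClean(false));
  }
}
```

With the NOOP action, the pool is still running after `clean()`, so the cleaner provides no safety net if the owner is garbage collected without `close()` being called.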

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientThreadPools.java
##########
@@ -0,0 +1,210 @@
+package org.apache.accumulo.core.clientImpl;
+
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+
+public class ClientThreadPools {
+
+  public static class ThreadPoolConfig {
+
+    public static final ThreadPoolConfig EMPTY_CONFIG =
+        new ThreadPoolConfig(Optional.empty(), Optional.empty(), Optional.empty());
+
+    private final Optional<Iterable<Entry<String,String>>> configuration;
+    private final Optional<Integer> numThreads;
+    private final Optional<String> threadName;
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration) {
+      this(Optional.of(configuration), Optional.empty(), Optional.empty());
+    }
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration, int numThreads) {
+      this(Optional.of(configuration), Optional.of(numThreads), Optional.empty());
+    }
+
+    public ThreadPoolConfig(Iterable<Entry<String,String>> configuration, int numThreads,
+        String threadName) {
+      this(Optional.of(configuration), Optional.of(numThreads), Optional.of(threadName));
+    }
+
+    private ThreadPoolConfig(Optional<Iterable<Entry<String,String>>> configuration,
+        Optional<Integer> numThreads, Optional<String> threadName) {
+      this.configuration = configuration;
+      this.numThreads = numThreads;
+      this.threadName = threadName;
+    }
+
+    public Optional<Iterable<Entry<String,String>>> getConfiguration() {
+      return configuration;
+    }
+
+    public Optional<Integer> getNumThreads() {
+      return numThreads;
+    }
+
+    public Optional<String> getThreadName() {
+      return threadName;
+    }
+  }
+
+  public static enum ThreadPoolType {
+    /**
+     * ThreadPoolExecutor that runs bulk import tasks
+     */
+    BULK_IMPORT_POOL,
+    /**
+     * ThreadPoolExecutor that runs tasks to contact Compactors to get running compaction
+     * information
+     */
+    ACTIVE_EXTERNAL_COMPACTION_POOL,
+    /**
+     * ThreadPoolExecutor used for fetching data from the TabletServers
+     */
+    SCANNER_READ_AHEAD_POOL,
+    /**
+     * ThreadPoolExecutor used for adding splits to a table
+     */
+    ADD_SPLITS_THREAD_POOL,
+    /**
+     * ThreadPoolExecutor used for fetching data from the TabletServers
+     */
+    BATCH_SCANNER_READ_AHEAD_POOL,
+    /**
+     * ThreadPoolExecutor that runs the tasks of binning mutations
+     */
+    BATCH_WRITER_BINNING_POOL,
+    /**
+     * ThreadPoolExecutor that runs the tasks of sending mutations to TabletServers
+     */
+    BATCH_WRITER_SEND_POOL,
+    /**
+     * ThreadPoolExecutor that runs clean up tasks when close is called on the ConditionalWriter
+     */
+    CONDITIONAL_WRITER_CLEANUP_TASK_POOL,
+    /**
+     * ThreadPoolExecutor responsible for loading bloom filters
+     */
+    BLOOM_FILTER_LAYER_LOADER_POOL
+  }
+
+  public static enum ScheduledThreadPoolType {
+    /**
+     * shared scheduled executor for trivial tasks
+     */
+    SHARED_GENERAL_SCHEDULED_TASK_POOL,
+    /**
+     * ScheduledThreadPoolExecutor that runs tasks for the BatchWriter to meet the users latency
+     * goals.
+     */
+    BATCH_WRITER_LATENCY_TASK_POOL,
+    /**
+     * ScheduledThreadPoolExecutor that periodically runs tasks to handle failed write mutations and
+     * send mutations to TabletServers
+     */
+    CONDITIONAL_WRITER_RETRY_POOL
+  }
+
+  private ScheduledThreadPoolExecutor sharedScheduledThreadPool = null;
+
+  public ThreadPoolExecutor newThreadPool(ThreadPoolType usage, ThreadPoolConfig config) {
+    switch (usage) {
+      case BULK_IMPORT_POOL:
+        Objects.requireNonNull(config.getNumThreads().get(), "Number of threads must be set");

Review comment:
       Some of these lines might be shorter and read more easily if you 
statically import `requireNonNull`.
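
Something along these lines, shown on a hypothetical `PoolSettings` holder rather than the real `ThreadPoolConfig`:

```java
import static java.util.Objects.requireNonNull;

// Hypothetical holder class, used only to show the static import at a call site.
class PoolSettings {
  private final Integer numThreads;

  PoolSettings(Integer numThreads) {
    // Same behavior as Objects.requireNonNull(...), just shorter to read.
    this.numThreads = requireNonNull(numThreads, "Number of threads must be set");
  }

  int numThreads() {
    return numThreads;
  }

  public static void main(String[] args) {
    System.out.println(new PoolSettings(4).numThreads());
  }
}
```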

##########
File path: 
core/src/main/java/org/apache/accumulo/core/util/threads/AccumuloUncaughtExceptionHandler.java
##########
@@ -44,7 +46,10 @@ public void uncaughtException(Thread t, Throwable e) {
         // If e == OutOfMemoryError, then it's probably that another Error might be
         // thrown when trying to print to System.err.
       } finally {
-        Runtime.getRuntime().halt(-1);
+        Mode m = SingletonManager.getMode();
+        if (m != null && m.equals(Mode.SERVER)) {

Review comment:
       The mode should never be null. Also, you should use `==` for comparing 
enums, not `.equals`
   
   ```suggestion
           if (SingletonManager.getMode() == Mode.SERVER) {
   ```
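
The reason `==` is the safer habit for enums, as a tiny sketch with a stand-in `Mode` enum (hypothetical, not the one from `SingletonManager`): the comparison is null-safe, so no separate null guard is needed even if a null ever did show up.

```java
// Hypothetical demo; Mode here is a local stand-in enum.
class EnumCompareDemo {
  enum Mode { CLIENT, SERVER }

  static boolean isServer(Mode mode) {
    // == never throws; a null mode simply compares as not equal.
    return mode == Mode.SERVER;
  }

  public static void main(String[] args) {
    System.out.println(isServer(Mode.SERVER));
    System.out.println(isServer(Mode.CLIENT));
    System.out.println(isServer(null));
  }
}
```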

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -386,15 +386,12 @@ public void run() {
           fatalException = new TableDeletedException(tableId.canonical());
       } catch (SampleNotPresentException e) {
         fatalException = e;
-      } catch (Exception t) {
+      } catch (Throwable t) {
         if (queryThreadPool.isShutdown())
           log.debug("Caught exception, but queryThreadPool is shutdown", t);
         else
           log.warn("Caught exception, but queryThreadPool is not shutdown", t);
         fatalException = t;
-      } catch (Throwable t) {
-        fatalException = t;
-        throw t; // let uncaught exception handler deal with the Error

Review comment:
       I like the simplicity of this earlier solution. If the user is providing 
their own thread pools, why wouldn't they be able to provide their own uncaught 
exception handler to receive these and handle them on their own, if they wish?

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
##########
@@ -124,20 +133,26 @@ public boolean hasNext() {
     throw new NoSuchElementException();
   }
 
+  void closeThriftScanner() {
+    synchronized (scanState) {
+      // this is synchronized so its mutually exclusive with readBatch()
+      try {
+        closed = true;
+        ThriftScanner.close(scanState);
+      } catch (Exception e) {
+        LoggerFactory.getLogger(ScannerIterator.class).debug("Exception when closing scan session",
+            e);
+      }
+    }
+  }
+
   void close() {
     // run actual close operation in the background so this does not block.
     readaheadPool.execute(() -> {
-      synchronized (scanState) {
-        // this is synchronized so its mutually exclusive with readBatch()
-        try {
-          closed = true;
-          ThriftScanner.close(scanState);
-        } catch (Exception e) {
-          LoggerFactory.getLogger(ScannerIterator.class)
-              .debug("Exception when closing scan session", e);
-        }
-      }
+      closeThriftScanner();
     });
+    readaheadPoolCleanable.clean();
+    this.poolCloser.execute(() -> readaheadPool.shutdownNow());

Review comment:
       Why is a separate thread pool being used to shut down this thread pool 
instead of the cleaner doing that?

##########
File path: 
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientThreadPools.java
##########
@@ -0,0 +1,210 @@
+package org.apache.accumulo.core.clientImpl;
+
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+
+public class ClientThreadPools {
+
+  public static class ThreadPoolConfig {
+
+    public static final ThreadPoolConfig EMPTY_CONFIG =
+        new ThreadPoolConfig(Optional.empty(), Optional.empty(), Optional.empty());
+
+    private final Optional<Iterable<Entry<String,String>>> configuration;

Review comment:
       Because this is now an internal-only class, it might be more readable 
to use the internal class `AccumuloConfiguration` instead of the `Iterable` 
interface type.



