keith-turner commented on code in PR #3167:
URL: https://github.com/apache/accumulo/pull/3167#discussion_r1083513557


##########
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java:
##########
@@ -319,39 +325,38 @@ public void run() {
       }
     }
 
+    summariesExecutor.shutdownNow();
     LOG.info("Shutting down");
   }
 
   private void updateSummaries() {
-    ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(10,
-        "Compaction Summary Gatherer", false);
-    try {
-      Set<String> queuesSeen = new ConcurrentSkipListSet<>();
 
-      tserverSet.getCurrentServers().forEach(tsi -> {
-        executor.execute(() -> updateSummaries(tsi, queuesSeen));
-      });
+    final ArrayList<Future<?>> tasks = new ArrayList<>();
+    Set<String> queuesSeen = new ConcurrentSkipListSet<>();
 
-      executor.shutdown();
+    tserverSet.getCurrentServers().forEach(tsi -> {
+      tasks.add(summariesExecutor.submit(() -> updateSummaries(tsi, queuesSeen)));
+    });
 
-      try {
-        while (!executor.awaitTermination(1, TimeUnit.MINUTES)) {}
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
+    // Wait for all tasks to complete
+    while (!tasks.isEmpty()) {
+      Iterator<Future<?>> iter = tasks.iterator();
+      while (iter.hasNext()) {
+        Future<?> f = iter.next();
+        if (f.isDone()) {

Review Comment:
   If there was an exception in the background task, is that something we even 
care about here in the foreground?



##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -950,11 +952,10 @@ private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation(
      Set<TServerInstance> currentServers, SortedMap<TabletServerId,TServerStatus> balancerMap) {
     final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
     int threads = getConfiguration().getCount(Property.MANAGER_STATUS_THREAD_POOL_SIZE);
-    ExecutorService tp = ThreadPools.getServerThreadPools()

Review Comment:
   It's unrelated to this PR, but I think this used to be a thread pool of 
unlimited size.  I need to look back and see what changed.  There is a comment 
a bit further down in the code about rate limiting because it's an unbounded 
thread pool.



##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -1012,16 +1013,19 @@ private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation(
             badServers.remove(server);
           }
         }
-      });
-    }
-    tp.shutdown();
-    try {
-      tp.awaitTermination(Math.max(10000, rpcTimeout / 3), TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      log.debug("Interrupted while fetching status");
+      }));
     }
 
-    tp.shutdownNow();
+    // Wait for all tasks to complete
+    while (!tasks.isEmpty()) {

Review Comment:
   This will also burn CPU while waiting for all futures to be done.
   
   The old code did not wait on background tasks for an unlimited amount of 
time.  It would wait a fixed amount of time and then call shutdownNow, which 
interrupts the background tasks.  With futures, that could be done by canceling 
any that are not complete within a given time.
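
   A minimal sketch of that idea, not the actual Manager code: the class name 
`BoundedWait`, the method `waitOrCancel`, and the 200 ms timeout in `main` are 
hypothetical and chosen only for illustration. Each `get()` call blocks until a 
shared deadline, so the waiting thread does not spin, and any future that is 
still running at the deadline is canceled with interruption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, for illustration only.
class BoundedWait {

  // Waits for each future until one shared deadline, then cancels the stragglers.
  // Returns the number of futures that were actually canceled.
  static int waitOrCancel(List<Future<?>> tasks, long maxWaitMillis) {
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
    int cancelled = 0;
    for (Future<?> f : tasks) {
      try {
        // get() blocks instead of polling; a zero timeout returns or throws immediately
        long remaining = Math.max(0, deadline - System.nanoTime());
        f.get(remaining, TimeUnit.NANOSECONDS);
      } catch (TimeoutException e) {
        // cancel(true) interrupts the worker thread running this task
        if (f.cancel(true)) {
          cancelled++;
        }
      } catch (ExecutionException e) {
        // the background task threw; a real caller would likely log e.getCause()
      } catch (InterruptedException e) {
        // restore the interrupt flag and stop waiting on this task
        Thread.currentThread().interrupt();
        if (f.cancel(true)) {
          cancelled++;
        }
      }
    }
    return cancelled;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future<?>> tasks = new ArrayList<>();
    tasks.add(pool.submit(() -> {})); // finishes immediately
    tasks.add(pool.submit(() -> {     // stands in for a slow RPC
      try {
        Thread.sleep(60_000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }));
    System.out.println("cancelled=" + waitOrCancel(tasks, 200));
    pool.shutdownNow();
  }
}
```

   Because the pool is long-lived in this PR, canceling individual futures 
plays the role that `shutdownNow()` played for the old per-call pool.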



##########
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java:
##########
@@ -319,39 +325,38 @@ public void run() {
       }
     }
 
+    summariesExecutor.shutdownNow();
     LOG.info("Shutting down");
   }
 
   private void updateSummaries() {
-    ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(10,
-        "Compaction Summary Gatherer", false);
-    try {
-      Set<String> queuesSeen = new ConcurrentSkipListSet<>();
 
-      tserverSet.getCurrentServers().forEach(tsi -> {
-        executor.execute(() -> updateSummaries(tsi, queuesSeen));
-      });
+    final ArrayList<Future<?>> tasks = new ArrayList<>();
+    Set<String> queuesSeen = new ConcurrentSkipListSet<>();
 
-      executor.shutdown();
+    tserverSet.getCurrentServers().forEach(tsi -> {
+      tasks.add(summariesExecutor.submit(() -> updateSummaries(tsi, queuesSeen)));
+    });
 
-      try {
-        while (!executor.awaitTermination(1, TimeUnit.MINUTES)) {}
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
+    // Wait for all tasks to complete
+    while (!tasks.isEmpty()) {

Review Comment:
   This loop will burn CPU.  Could add a sleep at the end of the loop or call 
get() instead of isDone() on the future because get() will block. Calling get() 
on the future is also useful if knowing about background exceptions is desired.
   
   
   I looked to see if Java had anything to wait for a bunch of futures to be 
done, found this: https://stackoverflow.com/a/51006865
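
   A rough sketch of the `get()` variant, assuming nothing about the 
coordinator beyond the list of futures: the class `WaitForAll` and the method 
`waitForAll` are hypothetical names. Each `get()` blocks until its task 
finishes, and an `ExecutionException` carries whatever the background task 
threw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, for illustration only.
class WaitForAll {

  // Blocks until every future has finished; returns the causes of any task failures.
  static List<Throwable> waitForAll(List<Future<?>> tasks) {
    List<Throwable> failures = new ArrayList<>();
    for (Future<?> f : tasks) {
      try {
        f.get(); // blocks, so there is no busy loop over isDone()
      } catch (ExecutionException e) {
        failures.add(e.getCause()); // the exception thrown inside the background task
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
        break;
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    Runnable ok = () -> {};
    Runnable failing = () -> {
      throw new IllegalStateException("boom");
    };
    List<Future<?>> tasks = new ArrayList<>();
    tasks.add(pool.submit(ok));
    tasks.add(pool.submit(failing));
    for (Throwable t : waitForAll(tasks)) {
      System.out.println("background failure: " + t);
    }
    pool.shutdown();
  }
}
```

   Whether the foreground should do anything with those failures beyond 
logging them is a separate question.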



##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -217,6 +217,8 @@ public class Manager extends AbstractServer
   private final AtomicBoolean managerInitialized = new AtomicBoolean(false);
   private final AtomicBoolean managerUpgrading = new AtomicBoolean(false);
 
+  private ExecutorService tp = null;

Review Comment:
   Should probably give this a better name since it's being promoted from a 
method variable to an instance variable.



##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -1251,6 +1258,15 @@ boolean canSuspendTablets() {
       throw new IllegalStateException("Exception stopping status thread", e);
     }
 
+    tp.shutdown();
+    try {
+      final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
+      tp.awaitTermination(Math.max(10000, rpcTimeout / 3), TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      log.debug("Interrupted while fetching status");
+    }

Review Comment:
   Could probably only call shutdownNow here (assuming this is in the manager 
shutdown code)
   
   ```suggestion
   ```


