This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new b90ca95323 Adds constants for threadPool Names (#4768) b90ca95323 is described below commit b90ca953239ac6def31f9ae508b48e51b610bcf2 Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Sun Jul 28 18:16:48 2024 -0400 Adds constants for threadPool Names (#4768) - updates pool prefix so that pools report as accumulo.pool.... - change names that had names with spaces to use dot notation. - removed metrics from pools where no user provided properties are used - Create constants for thread pool names to help with metric names and ease of troubleshooting Adds constants for the following pools: * Bulk Import Threads * Compaction Service Planner * Tablet wal creator * Scan server tablet metadata cache * Manager upgrade metadata * Compaction Coordinator summary gatherer * Utility check pools --------- Co-authored-by: Ed Coleman <edcole...@apache.org> Co-authored-by: Ed <d...@etcoleman.com> Co-authored-by: Christopher Tubbs <ctubb...@apache.org> --- .../accumulo/core/clientImpl/ClientContext.java | 10 +-- .../core/clientImpl/ConditionalWriterImpl.java | 3 +- .../core/clientImpl/InstanceOperationsImpl.java | 3 +- .../core/clientImpl/TableOperationsImpl.java | 3 +- .../core/clientImpl/TabletServerBatchReader.java | 8 ++- .../core/clientImpl/TabletServerBatchWriter.java | 8 ++- .../accumulo/core/clientImpl/bulk/BulkImport.java | 12 ++-- .../accumulo/core/file/BloomFilterLayer.java | 3 +- .../util/compaction/ExternalCompactionUtil.java | 6 +- .../core/util/threads/ThreadPoolNames.java | 78 +++++++++++++++++++++ .../accumulo/core/util/threads/ThreadPools.java | 79 ++++++++++++++++------ .../core/file/rfile/MultiThreadedRFileTest.java | 2 +- .../threads/ThreadPoolExecutorBuilderTest.java | 14 ++-- .../accumulo/server/client/BulkImporter.java | 13 ++-- .../conf/store/impl/PropCacheCaffeineImpl.java | 4 +- .../server/conf/store/impl/PropStoreWatcher.java | 2 +- .../accumulo/server/problems/ProblemReports.java | 2 +- .../apache/accumulo/server/rpc/TServerUtils.java | 12 ++-- .../server/util/RemoveEntriesForMissingFiles.java | 6 +- .../server/util/VerifyTabletAssignments.java | 6 +- .../server/conf/store/impl/ReadyMonitorTest.java | 2 +- .../coordinator/CompactionCoordinator.java | 3 +- .../accumulo/coordinator/CompactionFinalizer.java | 6 +- .../manager/tableOps/bulkVer2/BulkImportMove.java | 4 +- .../tableOps/tableImport/MoveExportedFiles.java | 4 +- .../manager/upgrade/UpgradeCoordinator.java | 6 +- .../org/apache/accumulo/tserver/ScanServer.java | 5 +- .../tserver/TabletServerResourceManager.java | 60 +++++++++------- .../tserver/compactions/CompactionService.java | 6 +- .../compactions/InternalCompactionExecutor.java | 8 ++- .../org/apache/accumulo/tserver/log/LogSorter.java | 3 +- .../accumulo/tserver/log/TabletServerLogger.java | 5 +- .../accumulo/test/BalanceWithOfflineTableIT.java | 4 +- .../test/functional/BatchWriterFlushIT.java | 2 +- 34 files changed, 281 insertions(+), 111 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 7c6ba76549..204665d0fc 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -26,6 +26,8 @@ import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.CONDITIONAL_WRITER_CLEANUP_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCANNER_READ_AHEAD_POOL; import java.lang.Thread.UncaughtExceptionHandler; import java.net.URL; @@ -257,9 +259,9 @@ public class ClientContext implements AccumuloClient { submitScannerReadAheadTask(Callable<List<KeyValue>> c) { ensureOpen(); if (scannerReadaheadPool == null) { - scannerReadaheadPool = clientThreadPools.getPoolBuilder("Accumulo scanner read ahead thread") + scannerReadaheadPool = clientThreadPools.getPoolBuilder(SCANNER_READ_AHEAD_POOL) .numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(3L, SECONDS) - .withQueue(new SynchronousQueue<>()).enableThreadPoolMetrics().build(); + .withQueue(new SynchronousQueue<>()).build(); } return scannerReadaheadPool.submit(c); } @@ -267,8 +269,8 @@ public class ClientContext implements AccumuloClient { public synchronized void executeCleanupTask(Runnable r) { ensureOpen(); if (cleanupThreadPool == null) { - cleanupThreadPool = clientThreadPools.getPoolBuilder("Conditional Writer Cleanup Thread") - .numCoreThreads(1).withTimeOut(3L, SECONDS).enableThreadPoolMetrics().build(); + cleanupThreadPool = clientThreadPools.getPoolBuilder(CONDITIONAL_WRITER_CLEANUP_POOL) + .numCoreThreads(1).withTimeOut(3L, SECONDS).build(); } this.cleanupThreadPool.execute(r); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java index cb7675196c..ceaf6901a5 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java @@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.CONDITIONAL_WRITER_POOL; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -379,7 +380,7 @@ public class ConditionalWriterImpl implements ConditionalWriter { this.auths = config.getAuthorizations(); this.ve = new VisibilityEvaluator(config.getAuthorizations()); this.threadPool = context.threadPools().createScheduledExecutorService( - config.getMaxWriteThreads(), this.getClass().getSimpleName()); + config.getMaxWriteThreads(), CONDITIONAL_WRITER_POOL.poolName); this.locator = new SyncingTabletLocator(context, tableId); this.serverQueues = new HashMap<>(); this.tableId = tableId; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index 084d59ef11..8c73dab8e5 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -28,6 +28,7 @@ import static org.apache.accumulo.core.rpc.ThriftUtil.createClient; import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport; import static org.apache.accumulo.core.rpc.ThriftUtil.getClient; import static org.apache.accumulo.core.rpc.ThriftUtil.returnClient; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS_COMPACTIONS_FINDER_POOL; import java.util.ArrayList; import java.util.Collections; @@ -301,7 +302,7 @@ public class InstanceOperationsImpl implements InstanceOperations { List<String> tservers = getTabletServers(); int numThreads = Math.max(4, Math.min((tservers.size() + compactors.size()) / 10, 256)); - var executorService = context.threadPools().getPoolBuilder("getactivecompactions") + var executorService = context.threadPools().getPoolBuilder(INSTANCE_OPS_COMPACTIONS_FINDER_POOL) .numCoreThreads(numThreads).build(); try { List<Future<List<ActiveCompaction>>> futures = new ArrayList<>(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 129cb6a681..c3b51237b6 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -30,6 +30,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME; import static org.apache.accumulo.core.util.Validators.NEW_TABLE_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL; import java.io.BufferedReader; import java.io.FileNotFoundException; @@ -496,7 +497,7 @@ public class TableOperationsImpl extends TableOperationsHelper { AtomicReference<Exception> exception = new AtomicReference<>(null); ExecutorService executor = - context.threadPools().getPoolBuilder("addSplits").numCoreThreads(16).build(); + context.threadPools().getPoolBuilder(SPLIT_POOL).numCoreThreads(16).build(); try { executor.execute( new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits)); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java index 23f40e9be3..8b149da57f 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.clientImpl; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; import java.lang.ref.Cleaner.Cleanable; import java.util.ArrayList; @@ -71,9 +72,10 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan this.tableName = tableName; this.numThreads = numQueryThreads; - queryThreadPool = - context.threadPools().getPoolBuilder("batch scanner " + batchReaderInstance + "-") - .numCoreThreads(numQueryThreads).build(); + queryThreadPool = context.threadPools() + .getPoolBuilder( + ACCUMULO_POOL_PREFIX.poolName + ".client.batch.scanner." + batchReaderInstance) + .numCoreThreads(numQueryThreads).build(); // Call shutdown on this thread pool in case the caller does not call close(). cleanable = CleanerUtil.shutdownThreadPoolExecutor(queryThreadPool, closed, log); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index 980ba0408a..a20e3ba9c5 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -24,6 +24,8 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BATCH_WRITER_BIN_MUTATIONS_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BATCH_WRITER_SEND_POOL; import java.io.IOException; import java.lang.management.CompilationMXBean; @@ -672,11 +674,11 @@ public class TabletServerBatchWriter implements AutoCloseable { public MutationWriter(int numSendThreads) { serversMutations = new HashMap<>(); queued = new HashSet<>(); - sendThreadPool = context.threadPools().getPoolBuilder(this.getClass().getName()) + sendThreadPool = context.threadPools().getPoolBuilder(BATCH_WRITER_SEND_POOL) .numCoreThreads(numSendThreads).build(); locators = new HashMap<>(); - binningThreadPool = context.threadPools().getPoolBuilder("BinMutations").numCoreThreads(1) - .withQueue(new SynchronousQueue<>()).build(); + binningThreadPool = context.threadPools().getPoolBuilder(BATCH_WRITER_BIN_MUTATIONS_POOL) + .numCoreThreads(1).withQueue(new SynchronousQueue<>()).build(); binningThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index f13420d006..a85db74a86 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -24,6 +24,8 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.groupingBy; import static org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.pathToCacheId; import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BULK_IMPORT_CLIENT_BULK_THREADS_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BULK_IMPORT_CLIENT_LOAD_POOL; import java.io.FileNotFoundException; import java.io.IOException; @@ -482,12 +484,14 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti if (this.executor != null) { executor = this.executor; } else if (numThreads > 0) { - executor = service = context.threadPools().getPoolBuilder("BulkImportThread") - .numCoreThreads(numThreads).build(); + executor = service = context.threadPools().getPoolBuilder(BULK_IMPORT_CLIENT_LOAD_POOL) + .numCoreThreads(numThreads).enableThreadPoolMetrics().build(); } else { String threads = context.getConfiguration().get(ClientProperty.BULK_LOAD_THREADS.getKey()); - executor = service = context.threadPools().getPoolBuilder("BulkImportThread") - .numCoreThreads(ConfigurationTypeHelper.getNumThreads(threads)).build(); + executor = + service = context.threadPools().getPoolBuilder(BULK_IMPORT_CLIENT_BULK_THREADS_POOL) + .numCoreThreads(ConfigurationTypeHelper.getNumThreads(threads)) + .enableThreadPoolMetrics().build(); } try { diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java index 1620f809b0..0801371d48 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java +++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.file; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BLOOM_LOADER_POOL; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -80,7 +81,7 @@ public class BloomFilterLayer { } if (maxLoadThreads > 0) { - loadThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("bloom-loader") + loadThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder(BLOOM_LOADER_POOL) .numCoreThreads(0).numMaxThreads(maxLoadThreads).withTimeOut(60L, SECONDS).build(); } return loadThreadPool; diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index c2e5f81892..b3e4a99a7c 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -19,6 +19,8 @@ package org.apache.accumulo.core.util.compaction; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTIONS_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTION_IDS_POOL; import java.util.ArrayList; import java.util.Collection; @@ -224,7 +226,7 @@ public class ExternalCompactionUtil { public static List<RunningCompaction> getCompactionsRunningOnCompactors(ClientContext context) { final List<RunningCompactionFuture> rcFutures = new ArrayList<>(); final ExecutorService executor = ThreadPools.getServerThreadPools() - .getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build(); + .getPoolBuilder(COMPACTOR_RUNNING_COMPACTIONS_POOL).numCoreThreads(16).build(); getCompactorAddrs(context).forEach((q, hp) -> { hp.forEach(hostAndPort -> { rcFutures.add(new RunningCompactionFuture(q, hostAndPort, @@ -251,7 +253,7 @@ public class ExternalCompactionUtil { public static Collection<ExternalCompactionId> getCompactionIdsRunningOnCompactors(ClientContext context) { final ExecutorService executor = ThreadPools.getServerThreadPools() - .getPoolBuilder("CompactorRunningCompactions").numCoreThreads(16).build(); + .getPoolBuilder(COMPACTOR_RUNNING_COMPACTION_IDS_POOL).numCoreThreads(16).build(); List<Future<ExternalCompactionId>> futures = new ArrayList<>(); getCompactorAddrs(context).forEach((q, hp) -> { diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java new file mode 100644 index 0000000000..bdebd03b2d --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.util.threads; + +public enum ThreadPoolNames { + + ACCUMULO_POOL_PREFIX("accumulo.pool"), + BATCH_WRITER_SEND_POOL("accumulo.pool.batch.writer.send"), + BATCH_WRITER_BIN_MUTATIONS_POOL("accumulo.pool.batch.writer.bin.mutations"), + BLOOM_LOADER_POOL("accumulo.pool.bloom.loader"), + BULK_IMPORT_CLIENT_LOAD_POOL("accumulo.pool.bulk.import.client.bulk.load"), + BULK_IMPORT_CLIENT_BULK_THREADS_POOL("accumulo.pool.bulk.import.client.bulk.threads"), + BULK_IMPORT_DIR_MOVE_POOL("accumulo.pool.bulk.dir.move"), + COMPACTION_COORDINATOR_SUMMARY_POOL("accumulo.pool.compaction.summary.gatherer"), + COMPACTION_SERVICE_COMPACTION_PLANNER_POOL("accumulo.pool.compaction.service.compaction.planner"), + COMPACTOR_RUNNING_COMPACTIONS_POOL("accumulo.pool.compactor.running.compactions"), + COMPACTOR_RUNNING_COMPACTION_IDS_POOL("accumulo.pool.compactor.running.compaction.ids"), + CONDITIONAL_WRITER_POOL("accumulo.pool.conditional.writer"), + CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"), + COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"), + COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"), + GC_DELETE_POOL("accumulo.pool.gc.threads.delete"), + GENERAL_SERVER_POOL("accumulo.pool.general.server"), + GENERAL_SERVER_SIMPLETIMER_POOL("accumulo.pool.general.server.simpletimer"), + IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"), + INSTANCE_OPS_COMPACTIONS_FINDER_POOL("accumulo.pool.instance.ops.active.compactions.finder"), + MANAGER_BULK_IMPORT_POOL("accumulo.pool.manager.bulk.import"), + MANAGER_FATE_POOL("accumulo.pool.manager.fate"), + MANAGER_RENAME_POOL("accumulo.pool.manager.rename"), + MANAGER_STATUS_POOL("accumulo.pool.manager.status"), + MANAGER_UPGRADE_COORDINATOR_METADATA_POOL("accumulo.pool.manager.upgrade.metadata"), + METADATA_DEFAULT_SPLIT_POOL("accumulo.pool.metadata.tablet.default.splitter"), + METADATA_TABLET_MIGRATION_POOL("accumulo.pool.metadata.tablet.migration"), + METADATA_TABLET_ASSIGNMENT_POOL("accumulo.pool.metadata.tablet.assignment"), + REPLICATION_WORKER_POOL("accumulo.pool.replication.worker"), + SCAN_POOL("accumulo.pool.scan"), + SCAN_SERVER_TABLET_METADATA_CACHE_POOL("accumulo.pool.scan.server.tablet.metadata.cache"), + SCANNER_READ_AHEAD_POOL("accumulo.pool.client.context.scanner.read.ahead"), + SCHED_FUTURE_CHECKER_POOL("accumulo.pool.scheduled.future.checker"), + SPLIT_POOL("accumulo.pool.table.ops.add.splits"), + TABLET_ASSIGNMENT_POOL("accumulo.pool.tablet.assignment.pool"), + TSERVER_ASSIGNMENT_POOL("accumulo.pool.tserver.assignment"), + TSERVER_COMPACTION_MINOR_POOL("accumulo.pool.tserver.compaction.minor"), + TSERVER_MIGRATIONS_POOL("accumulo.pool.tserver.migrations"), + TSERVER_MINOR_COMPACTOR_POOL("accumulo.pool.tserver.minor.compactor"), + TSERVER_SUMMARY_FILE_RETRIEVER_POOL("accumulo.pool.tserver.summary.file.retriever.pool"), + TSERVER_SUMMARY_PARTITION_POOL("accumulo.pool.tserver.summary.partition"), + TSERVER_SUMMARY_REMOTE_POOL("accumulo.pool.tserver.summary.remote"), + TSERVER_SUMMARY_RETRIEVAL_POOL("accumulo.pool.tserver.summary.retrieval"), + TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"), + TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"), + TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"), + TSERVER_WORKQ_POOL("accumulo.pool.tserver.workq"), + UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"), + UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers"); + + public final String poolName; + + ThreadPoolNames(String poolName) { + this.poolName = poolName; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index ea26c563e4..b2b0bc02db 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -21,6 +21,23 @@ package org.apache.accumulo.core.util.threads; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_SIMPLETIMER_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_BULK_IMPORT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_RENAME_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.REPLICATION_WORKER_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_REMOTE_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_RETRIEVAL_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WORKQ_POOL; import java.lang.Thread.UncaughtExceptionHandler; import java.util.Iterator; @@ -83,7 +100,7 @@ public class ThreadPools { } private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL = - getServerThreadPools().getPoolBuilder("Scheduled Future Checker").numCoreThreads(1).build(); + getServerThreadPools().getPoolBuilder(SCHED_FUTURE_CHECKER_POOL).numCoreThreads(1).build(); private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS = new ConcurrentLinkedQueue<>(); @@ -272,31 +289,33 @@ public class ThreadPools { ThreadPoolExecutorBuilder builder; switch (p) { case GENERAL_SIMPLETIMER_THREADPOOL_SIZE: - return createScheduledExecutorService(conf.getCount(p), "SimpleTimer"); + return createScheduledExecutorService(conf.getCount(p), + GENERAL_SERVER_SIMPLETIMER_POOL.poolName); case GENERAL_THREADPOOL_SIZE: - return createScheduledExecutorService(conf.getCount(p), "GeneralExecutor", + return createScheduledExecutorService(conf.getCount(p), GENERAL_SERVER_POOL.poolName, emitThreadPoolMetrics); case MANAGER_BULK_THREADPOOL_SIZE: - builder = getPoolBuilder("bulk import").numCoreThreads(conf.getCount(p)).withTimeOut( - conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT), MILLISECONDS); + builder = + getPoolBuilder(MANAGER_BULK_IMPORT_POOL).numCoreThreads(conf.getCount(p)).withTimeOut( + conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT), MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case MANAGER_RENAME_THREADS: - builder = getPoolBuilder("bulk move").numCoreThreads(conf.getCount(p)); + builder = getPoolBuilder(MANAGER_RENAME_POOL).numCoreThreads(conf.getCount(p)); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case MANAGER_FATE_THREADPOOL_SIZE: - builder = getPoolBuilder("Repo Runner").numCoreThreads(conf.getCount(p)); + builder = getPoolBuilder(MANAGER_FATE_POOL).numCoreThreads(conf.getCount(p)); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case MANAGER_STATUS_THREAD_POOL_SIZE: - builder = getPoolBuilder("GatherTableInformation"); + builder = getPoolBuilder(MANAGER_STATUS_POOL); int threads = conf.getCount(p); if (threads == 0) { builder.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS) @@ -309,57 +328,57 @@ public class ThreadPools { } return builder.build(); case TSERV_WORKQ_THREADS: - builder = getPoolBuilder("distributed work queue").numCoreThreads(conf.getCount(p)); + builder = getPoolBuilder(TSERVER_WORKQ_POOL).numCoreThreads(conf.getCount(p)); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case TSERV_MINC_MAXCONCURRENT: - builder = getPoolBuilder("minor compactor").numCoreThreads(conf.getCount(p)).withTimeOut(0L, - MILLISECONDS); + builder = getPoolBuilder(TSERVER_MINOR_COMPACTOR_POOL).numCoreThreads(conf.getCount(p)) + .withTimeOut(0L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case TSERV_MIGRATE_MAXCONCURRENT: - builder = getPoolBuilder("tablet migration").numCoreThreads(conf.getCount(p)) + builder = getPoolBuilder(TSERVER_MIGRATIONS_POOL).numCoreThreads(conf.getCount(p)) .withTimeOut(0L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case TSERV_ASSIGNMENT_MAXCONCURRENT: - builder = getPoolBuilder("tablet assignment").numCoreThreads(conf.getCount(p)) + builder = getPoolBuilder(TSERVER_ASSIGNMENT_POOL).numCoreThreads(conf.getCount(p)) .withTimeOut(0L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case TSERV_SUMMARY_RETRIEVAL_THREADS: - builder = getPoolBuilder("summary file retriever").numCoreThreads(conf.getCount(p)) + builder = getPoolBuilder(TSERVER_SUMMARY_RETRIEVAL_POOL).numCoreThreads(conf.getCount(p)) .withTimeOut(60L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case TSERV_SUMMARY_REMOTE_THREADS: - builder = getPoolBuilder("summary remote").numCoreThreads(conf.getCount(p)).withTimeOut(60L, - MILLISECONDS); + builder = getPoolBuilder(TSERVER_SUMMARY_REMOTE_POOL).numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case TSERV_SUMMARY_PARTITION_THREADS: - builder = getPoolBuilder("summary partition").numCoreThreads(conf.getCount(p)) + builder = getPoolBuilder(TSERVER_SUMMARY_PARTITION_POOL).numCoreThreads(conf.getCount(p)) .withTimeOut(60L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case GC_DELETE_THREADS: - return getPoolBuilder("deleting").numCoreThreads(conf.getCount(p)).build(); + return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build(); case REPLICATION_WORKER_THREADS: - builder = getPoolBuilder("replication task").numCoreThreads(conf.getCount(p)); + builder = getPoolBuilder(REPLICATION_WORKER_POOL).numCoreThreads(conf.getCount(p)); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } @@ -370,8 +389,28 @@ public class ThreadPools { } } + /** + * Fet a fluent-style pool builder. + * + * @param pool the constant pool name + */ + public ThreadPoolExecutorBuilder getPoolBuilder(@NonNull final ThreadPoolNames pool) { + return new ThreadPoolExecutorBuilder(pool.poolName); + } + + /** + * Fet a fluent-style pool builder. + * + * @param name the pool name - the name trimed and prepended with the ACCUMULO_POOL_PREFIX so that + * pool names begin with a consistent prefix. + */ public ThreadPoolExecutorBuilder getPoolBuilder(@NonNull final String name) { - return new ThreadPoolExecutorBuilder(name); + String trimmed = name.trim(); + if (trimmed.startsWith(ACCUMULO_POOL_PREFIX.poolName)) { + return new ThreadPoolExecutorBuilder(trimmed); + } else { + return new ThreadPoolExecutorBuilder(ACCUMULO_POOL_PREFIX.poolName + trimmed); + } } public class ThreadPoolExecutorBuilder { diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java index ea0c4ceabe..331a050b60 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java @@ -231,7 +231,7 @@ public class MultiThreadedRFileTest { // now start up multiple RFile deepcopies int maxThreads = 10; - String name = "MultiThreadedRFileTestThread"; + String name = "test.rfile.multi.thread.pool"; ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().getPoolBuilder(name).numCoreThreads(maxThreads + 1) .numMaxThreads(maxThreads + 1).withTimeOut(5, MINUTES).build(); diff --git a/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java b/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java index 5146ccf5b2..1d6fae41cc 100644 --- a/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java +++ b/core/src/test/java/org/apache/accumulo/core/util/threads/ThreadPoolExecutorBuilderTest.java @@ -31,7 +31,7 @@ public class ThreadPoolExecutorBuilderTest { @Test public void builderDefaultsTest() { - var p = serverPool.getPoolBuilder("defaults").build(); + var p = serverPool.getPoolBuilder("defaults.pool").build(); assertEquals(0, p.getCorePoolSize()); assertEquals(1, p.getMaximumPoolSize()); assertEquals(3L, p.getKeepAliveTime(MINUTES)); @@ -40,38 +40,38 @@ public class ThreadPoolExecutorBuilderTest { @Test public void builderInvalidNumCoreTest() { assertThrows(IllegalArgumentException.class, - () -> serverPool.getPoolBuilder("test1").numCoreThreads(-1).build()); + () -> serverPool.getPoolBuilder("test1.pool").numCoreThreads(-1).build()); } @Test public void builderInvalidNumMaxThreadsTest() { // max threads must be > core threads assertThrows(IllegalArgumentException.class, - () -> serverPool.getPoolBuilder("test1").numCoreThreads(2).numMaxThreads(1).build()); + () -> serverPool.getPoolBuilder("test1.pool").numCoreThreads(2).numMaxThreads(1).build()); } @Test public void builderPoolCoreMaxTest() { - var p = serverPool.getPoolBuilder("test1").numCoreThreads(1).numMaxThreads(2).build(); + var p = serverPool.getPoolBuilder("test1.pool").numCoreThreads(1).numMaxThreads(2).build(); assertEquals(1, p.getCorePoolSize()); assertEquals(2, p.getMaximumPoolSize()); } @Test public void builderFixedPoolTest() { - var p = serverPool.getPoolBuilder("test1").numCoreThreads(2).build(); + var p = serverPool.getPoolBuilder("test1.pool").numCoreThreads(2).build(); assertEquals(2, p.getCorePoolSize()); assertEquals(2, p.getMaximumPoolSize()); } @Test public void buildeSetTimeoutTest() { - var p = serverPool.getPoolBuilder("test1").withTimeOut(0L, MILLISECONDS).build(); + var p = serverPool.getPoolBuilder("test1.pool").withTimeOut(0L, MILLISECONDS).build(); assertEquals(0, p.getCorePoolSize()); assertEquals(1, p.getMaximumPoolSize()); assertEquals(0L, p.getKeepAliveTime(MINUTES)); - var p2 = serverPool.getPoolBuilder("test1").withTimeOut(123L, MILLISECONDS).build(); + var p2 = serverPool.getPoolBuilder("test1.pool").withTimeOut(123L, MILLISECONDS).build(); assertEquals(0, p2.getCorePoolSize()); assertEquals(1, p2.getMaximumPoolSize()); assertEquals(123L, p2.getKeepAliveTime(MILLISECONDS)); diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java index 4de81d7759..679941fafd 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java +++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java @@ -134,8 +134,9 @@ public class BulkImporter { Collections.synchronizedSortedMap(new TreeMap<>()); timer.start(Timers.EXAMINE_MAP_FILES); - ExecutorService threadPool = ThreadPools.getServerThreadPools() - .getPoolBuilder("findOverlapping").numCoreThreads(numThreads).build(); + ExecutorService threadPool = + ThreadPools.getServerThreadPools().getPoolBuilder("bulk.import.find.overlapping") + .numCoreThreads(numThreads).enableThreadPoolMetrics().build(); for (Path path : paths) { final Path mapFile = path; @@ -362,8 +363,8 @@ public class BulkImporter { final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<>()); - ExecutorService threadPool = ThreadPools.getServerThreadPools().getPoolBuilder("estimateSizes") - .numCoreThreads(numThreads).build(); + ExecutorService threadPool = ThreadPools.getServerThreadPools() + .getPoolBuilder("bulk.import.size.estimate").numCoreThreads(numThreads).build(); for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) { if (entry.getValue().size() == 1) { @@ -552,8 +553,8 @@ public class BulkImporter { } }); - ExecutorService threadPool = ThreadPools.getServerThreadPools().getPoolBuilder("submit") - .numCoreThreads(numThreads).build(); + ExecutorService threadPool = ThreadPools.getServerThreadPools() + .getPoolBuilder("bulk.import.submit").numCoreThreads(numThreads).build(); for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer .entrySet()) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java index d55c9465b1..c50f896b99 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java @@ -45,8 +45,8 @@ public class PropCacheCaffeineImpl implements PropCache { public static final int EXPIRE_MIN = 60; private static final Logger log = LoggerFactory.getLogger(PropCacheCaffeineImpl.class); private static final Executor executor = - ThreadPools.getServerThreadPools().getPoolBuilder("caffeine-tasks").numCoreThreads(1) - .numMaxThreads(20).withTimeOut(60L, SECONDS).build(); + ThreadPools.getServerThreadPools().getPoolBuilder("caffeine.prop.cache.tasks") + .numCoreThreads(1).numMaxThreads(20).withTimeOut(60L, SECONDS).build(); private final LoadingCache<PropStoreKey<?>,VersionedProperties> cache; diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java index 952409a2bb..d4fdf455f6 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java @@ -58,7 +58,7 @@ public class PropStoreWatcher implements Watcher { private static final Logger log = LoggerFactory.getLogger(PropStoreWatcher.class); private static final ExecutorService executorService = ThreadPools.getServerThreadPools() - .getPoolBuilder("zoo_change_update").numCoreThreads(2).build(); + .getPoolBuilder("prop.store.zoo.change.update").numCoreThreads(2).build(); private final ReentrantReadWriteLock listenerLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.ReadLock listenerReadLock = listenerLock.readLock(); private final ReentrantReadWriteLock.WriteLock listenerWriteLock = listenerLock.writeLock(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java index 2c04fc9e49..6f8687fd64 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java +++ b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java @@ -68,7 +68,7 @@ public class ProblemReports implements Iterable<ProblemReport> { * is reporting lots of problems, but problem reports can not be processed */ private final ExecutorService reportExecutor = ThreadPools.getServerThreadPools() - .getPoolBuilder("acu-problem-reporter").numCoreThreads(0).numMaxThreads(1) + .getPoolBuilder("problem.reporter").numCoreThreads(0).numMaxThreads(1) .withTimeOut(60L, SECONDS).withQueue(new LinkedBlockingQueue<>(500)).build(); private final ServerContext context; diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 87bfb4c0c8..d4220ff6de 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -20,6 +20,7 @@ package org.apache.accumulo.server.rpc; import static com.google.common.base.Preconditions.checkArgument; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; import java.io.IOException; import java.net.InetAddress; @@ -310,20 +311,21 @@ public class TServerUtils { private static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, final int executorThreads, long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks) { - final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools() - .getPoolBuilder(serverName + "-ClientPool").numCoreThreads(executorThreads) - .withTimeOut(threadTimeOut, MILLISECONDS).enableThreadPoolMetrics().build(); + String poolName = ACCUMULO_POOL_PREFIX.poolName + serverName.toLowerCase() + ".client"; + final ThreadPoolExecutor pool = + ThreadPools.getServerThreadPools().getPoolBuilder(poolName).numCoreThreads(executorThreads) + .withTimeOut(threadTimeOut, MILLISECONDS).enableThreadPoolMetrics().build(); // periodically adjust the number of threads we need by checking how busy our threads are ThreadPools.watchCriticalFixedDelay(conf, timeBetweenThreadChecks, () -> { // there is a minor race condition between sampling the current state of the thread pool // and adjusting it however, this isn't really an issue, since it adjusts periodically if (pool.getCorePoolSize() <= pool.getActiveCount()) { int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2); - ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool"); + ThreadPools.resizePool(pool, () -> larger, poolName); } else { if (pool.getCorePoolSize() > pool.getActiveCount() + 3) { int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1); - ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool"); + ThreadPools.resizePool(pool, () -> smaller, poolName); } } }); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java index 18de10604c..f9c05cdba8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.server.util; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.UTILITY_CHECK_FILE_TASKS; + import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; @@ -121,8 +123,8 @@ public class RemoveEntriesForMissingFiles { Map<Path,Path> cache = new LRUMap<>(100000); Set<Path> processing = new HashSet<>(); - ExecutorService threadPool = ThreadPools.getServerThreadPools().getPoolBuilder("CheckFileTasks") - .numCoreThreads(16).build(); + ExecutorService threadPool = ThreadPools.getServerThreadPools() + .getPoolBuilder(UTILITY_CHECK_FILE_TASKS).numCoreThreads(16).build(); System.out.printf("Scanning : %s %s\n", tableName, range); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java index f1201830a3..e14092116b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.server.util; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.UTILITY_VERIFY_TABLET_ASSIGNMENTS; + import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -117,8 +119,8 @@ public class VerifyTabletAssignments { } } - ExecutorService tp = ThreadPools.getServerThreadPools().getPoolBuilder("CheckTabletServer") - .numCoreThreads(20).build(); + ExecutorService tp = ThreadPools.getServerThreadPools() + .getPoolBuilder(UTILITY_VERIFY_TABLET_ASSIGNMENTS).numCoreThreads(20).build(); for (final Entry<HostAndPort,List<KeyExtent>> entry : extentsPerServer.entrySet()) { Runnable r = () -> { diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java index e52b39a419..25a94c839c 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ReadyMonitorTest.java @@ -60,7 +60,7 @@ public class ReadyMonitorTest { // these tests wait for workers to signal ready using count down latch. // size pool so some threads are likely to wait on others to complete. int numPoolThreads = numWorkerThreads / 2; - workerPool = ThreadPools.getServerThreadPools().getPoolBuilder("readyMonitor-test-pool") + workerPool = ThreadPools.getServerThreadPools().getPoolBuilder("test.ready.monitor.pool") .numCoreThreads(numPoolThreads).build(); } diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 16452b0805..9f03235eae 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -20,6 +20,7 @@ package org.apache.accumulo.coordinator; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTION_COORDINATOR_SUMMARY_POOL; import java.net.UnknownHostException; import java.util.HashSet; @@ -352,7 +353,7 @@ public class CompactionCoordinator extends AbstractServer private void updateSummaries() { ExecutorService executor = ThreadPools.getServerThreadPools() - .getPoolBuilder("Compaction Summary Gatherer").numCoreThreads(10).build(); + .getPoolBuilder(COMPACTION_COORDINATOR_SUMMARY_POOL).numCoreThreads(10).build(); try { Set<String> queuesSeen = new ConcurrentSkipListSet<>(); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java index 45b6161bab..a341a58582 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java @@ -22,6 +22,8 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_FINALIZER_BACKGROUND_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_FINALIZER_NOTIFIER_POOL; import java.util.ArrayList; import java.util.Iterator; @@ -76,11 +78,11 @@ public class CompactionFinalizer { .getCount(Property.COMPACTION_COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS); this.ntfyExecutor = ThreadPools.getServerThreadPools() - .getPoolBuilder("Compaction Finalizer Notifier").numCoreThreads(3).numMaxThreads(max) + .getPoolBuilder(COORDINATOR_FINALIZER_NOTIFIER_POOL).numCoreThreads(3).numMaxThreads(max) .withTimeOut(1L, MINUTES).enableThreadPoolMetrics().build(); this.backgroundExecutor = - ThreadPools.getServerThreadPools().getPoolBuilder("Compaction Finalizer Background Task") + ThreadPools.getServerThreadPools().getPoolBuilder(COORDINATOR_FINALIZER_BACKGROUND_POOL) .numCoreThreads(1).enableThreadPoolMetrics().build(); backgroundExecutor.execute(() -> { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java index 5ace9ccb6f..72e4923f6d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.manager.tableOps.bulkVer2; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BULK_IMPORT_DIR_MOVE_POOL; + import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -117,7 +119,7 @@ class BulkImportMove extends ManagerRepo { oldToNewMap.put(originalPath, newPath); } try { - fs.bulkRename(oldToNewMap, workerCount, "bulkDir move", fmtTid); + fs.bulkRename(oldToNewMap, workerCount, BULK_IMPORT_DIR_MOVE_POOL.poolName, fmtTid); } catch (IOException ioe) { throw new AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java index c717527619..1bc30fc73c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.manager.tableOps.tableImport; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.IMPORT_TABLE_RENAME_POOL; + import java.io.IOException; import java.util.Arrays; import java.util.HashMap; @@ -112,7 +114,7 @@ class MoveExportedFiles extends ManagerRepo { } } try { - fs.bulkRename(oldToNewPaths, workerCount, "importtable rename", fmtTid); + fs.bulkRename(oldToNewPaths, workerCount, IMPORT_TABLE_RENAME_POOL.poolName, fmtTid); } catch (IOException ioe) { throw new AcceptableThriftTableOperationException(tableInfo.tableId.canonical(), null, TableOperation.IMPORT, TableOperationExceptionType.OTHER, ioe.getCause().getMessage()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index 11cb713b58..5445d6d3da 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -19,6 +19,7 @@ package org.apache.accumulo.manager.upgrade; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_UPGRADE_COORDINATOR_METADATA_POOL; import java.io.IOException; import java.util.Collections; @@ -194,8 +195,9 @@ public class UpgradeCoordinator { "Not currently in a suitable state to do metadata upgrade %s", status); if (currentVersion < AccumuloDataVersion.get()) { - return ThreadPools.getServerThreadPools().getPoolBuilder("UpgradeMetadataThreads") - .numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS) + return ThreadPools.getServerThreadPools() + .getPoolBuilder(MANAGER_UPGRADE_COORDINATOR_METADATA_POOL).numCoreThreads(0) + .numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS) .withQueue(new SynchronousQueue<>()).build().submit(() -> { try { for (int v = currentVersion; v < AccumuloDataVersion.get(); v++) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index ad9c72d0c0..18cb23b37b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -20,6 +20,7 @@ package org.apache.accumulo.tserver; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCAN_SERVER_TABLET_METADATA_CACHE_POOL; import java.io.IOException; import java.io.UncheckedIOException; @@ -259,8 +260,8 @@ public class ScanServer extends AbstractServer "Tablet metadata cache refresh percentage is '%s' but must be less than 1", cacheRefreshPercentage); - tmCacheExecutor = context.threadPools().getPoolBuilder("scanServerTmCache").numCoreThreads(8) - .enableThreadPoolMetrics().build(); + tmCacheExecutor = context.threadPools().getPoolBuilder(SCAN_SERVER_TABLET_METADATA_CACHE_POOL) + .numCoreThreads(8).enableThreadPoolMetrics().build(); var builder = Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS) .scheduler(Scheduler.systemScheduler()).executor(tmCacheExecutor).recordStats(); if (cacheRefreshPercentage > 0) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index c3ad1fd830..655b54fdbd 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -23,6 +23,17 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toUnmodifiableMap; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_DEFAULT_SPLIT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_REMOTE_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_TABLET_MIGRATION_POOL; import java.io.IOException; import java.util.ArrayList; @@ -130,13 +141,13 @@ public class TabletServerResourceManager { * pool executor * * @param maxThreads max threads - * @param name name of thread pool + * @param pool name of thread pool * @param tp executor */ - private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String name, + private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String pool, final ThreadPoolExecutor tp) { ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( - () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, SECONDS)); + () -> ThreadPools.resizePool(tp, maxThreads, pool), 1, 10, SECONDS)); } private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec, @@ -184,12 +195,14 @@ public class TabletServerResourceManager { } scanExecQueues.put(sec.name, queue); - - ThreadPoolExecutor es = ThreadPools.getServerThreadPools().getPoolBuilder("scan-" + sec.name) + ThreadPoolExecutor es = ThreadPools.getServerThreadPools() + .getPoolBuilder(ACCUMULO_POOL_PREFIX.poolName + ".scan." + sec.name) .numCoreThreads(sec.getCurrentMaxThreads()).numMaxThreads(sec.getCurrentMaxThreads()) .withTimeOut(0L, MILLISECONDS).withQueue(queue).atPriority(sec.priority) .enableThreadPoolMetrics(enableMetrics).build(); - modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, "scan-" + sec.name, es); + + modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, + ACCUMULO_POOL_PREFIX.poolName + ".scan." + sec.name, es); return es; } @@ -306,25 +319,24 @@ public class TabletServerResourceManager { Property.TSERV_MINC_MAXCONCURRENT, true); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT), - "minor compactor", minorCompactionThreadPool); + TSERVER_MINOR_COMPACTOR_POOL.poolName, minorCompactionThreadPool); - splitThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("splitter") - .numCoreThreads(0).numMaxThreads(1).withTimeOut(1, SECONDS) - .enableThreadPoolMetrics(enableMetrics).build(); + splitThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder(SPLIT_POOL) + .numCoreThreads(0).numMaxThreads(1).withTimeOut(1, SECONDS).build(); - defaultSplitThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("md splitter") - .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS) - .enableThreadPoolMetrics(enableMetrics).build(); + defaultSplitThreadPool = + ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_DEFAULT_SPLIT_POOL) + .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS).build(); - defaultMigrationPool = ThreadPools.getServerThreadPools() - .getPoolBuilder("metadata tablet migration").numCoreThreads(0).numMaxThreads(1) - .withTimeOut(60, SECONDS).enableThreadPoolMetrics(enableMetrics).build(); + defaultMigrationPool = + ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_TABLET_MIGRATION_POOL) + .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS).build(); migrationPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, Property.TSERV_MIGRATE_MAXCONCURRENT, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_MIGRATE_MAXCONCURRENT), - "tablet migration", migrationPool); + TSERVER_TABLET_MIGRATION_POOL.poolName, migrationPool); // not sure if concurrent assignments can run safely... even if they could there is probably no // benefit at startup because @@ -335,11 +347,11 @@ public class TabletServerResourceManager { Property.TSERV_ASSIGNMENT_MAXCONCURRENT, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT), - "tablet assignment", assignmentPool); + TABLET_ASSIGNMENT_POOL.poolName, assignmentPool); - assignMetaDataPool = ThreadPools.getServerThreadPools() - .getPoolBuilder("metadata tablet assignment").numCoreThreads(0).numMaxThreads(1) - .withTimeOut(60, SECONDS).enableThreadPoolMetrics(enableMetrics).build(); + assignMetaDataPool = + ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_TABLET_ASSIGNMENT_POOL) + .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS).build(); activeAssignments = new ConcurrentHashMap<>(); @@ -347,19 +359,19 @@ public class TabletServerResourceManager { Property.TSERV_SUMMARY_RETRIEVAL_THREADS, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_RETRIEVAL_THREADS), - "summary file retriever", summaryRetrievalPool); + TSERVER_SUMMARY_FILE_RETRIEVER_POOL.poolName, summaryRetrievalPool); summaryRemotePool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, Property.TSERV_SUMMARY_REMOTE_THREADS, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_REMOTE_THREADS), - "summary remote", summaryRemotePool); + TSERVER_SUMMARY_REMOTE_POOL.poolName, summaryRemotePool); summaryPartitionPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, Property.TSERV_SUMMARY_PARTITION_THREADS, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_PARTITION_THREADS), - "summary partition", summaryPartitionPool); + TSERVER_SUMMARY_PARTITION_POOL.poolName, summaryPartitionPool); boolean isScanServer = (tserver instanceof ScanServer); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java index 3054db17ea..85df30494a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java @@ -20,6 +20,7 @@ package org.apache.accumulo.tserver.compactions; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTION_SERVICE_COMPACTION_PLANNER_POOL; import java.util.Collection; import java.util.Collections; @@ -132,8 +133,9 @@ public class CompactionService { this.executors = Map.copyOf(tmpExecutors); - this.planningExecutor = ThreadPools.getServerThreadPools().getPoolBuilder("CompactionPlanner") - .numCoreThreads(1).numMaxThreads(1).withTimeOut(0L, MILLISECONDS).build(); + this.planningExecutor = ThreadPools.getServerThreadPools() + .getPoolBuilder(COMPACTION_SERVICE_COMPACTION_PLANNER_POOL).numCoreThreads(1) + .numMaxThreads(1).withTimeOut(0L, MILLISECONDS).build(); this.queuedForPlanning = new EnumMap<>(CompactionKind.class); for (CompactionKind kind : CompactionKind.values()) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java index 229f53ca77..d059de7c30 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java @@ -19,6 +19,7 @@ package org.apache.accumulo.tserver.compactions; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; import java.util.ArrayList; import java.util.Collections; @@ -173,7 +174,9 @@ public class InternalCompactionExecutor implements CompactionExecutor { queue = new PriorityBlockingQueue<>(100, comparator); - threadPool = ThreadPools.getServerThreadPools().getPoolBuilder("compaction." + ceid) + threadPool = ThreadPools.getServerThreadPools() + .getPoolBuilder( + ACCUMULO_POOL_PREFIX.poolName + ".compaction.service.internal.compaction." + ceid) .numCoreThreads(threads).numMaxThreads(threads).withTimeOut(60L, SECONDS).withQueue(queue) .build(); metricCloser = @@ -204,7 +207,8 @@ public class InternalCompactionExecutor implements CompactionExecutor { } public void setThreads(int numThreads) { - ThreadPools.resizePool(threadPool, () -> numThreads, "compaction." + ceid); + ThreadPools.resizePool(threadPool, () -> numThreads, + ACCUMULO_POOL_PREFIX.poolName + "accumulo.pool.compaction." + ceid); } @Override diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index 6fc396e4f4..32a249e7b6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@ -19,6 +19,7 @@ package org.apache.accumulo.tserver.log; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WAL_SORT_CONCURRENT_POOL; import java.io.DataInputStream; import java.io.EOFException; @@ -297,7 +298,7 @@ public class LogSorter { int threadPoolSize = this.conf.getCount(this.conf .resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, Property.TSERV_RECOVERY_MAX_CONCURRENT)); ThreadPoolExecutor threadPool = - ThreadPools.getServerThreadPools().getPoolBuilder(this.getClass().getName()) + ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_SORT_CONCURRENT_POOL) .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build(); new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, context).startProcessing(new LogProcessor(), threadPool); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 346b70166c..6757d276ee 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -19,6 +19,7 @@ package org.apache.accumulo.tserver.log; import static java.util.Collections.singletonList; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WAL_CREATOR_POOL; import java.io.IOException; import java.nio.channels.ClosedChannelException; @@ -262,8 +263,8 @@ public class TabletServerLogger { if (nextLogMaker != null) { return; } - nextLogMaker = ThreadPools.getServerThreadPools().getPoolBuilder("WALog creator") - .numCoreThreads(1).enableThreadPoolMetrics().build(); + nextLogMaker = ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_CREATOR_POOL) + .numCoreThreads(1).build(); nextLogMaker.execute(new Runnable() { @Override public void run() { diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java index 40c1487e58..c51cd0c5d5 100644 --- a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java @@ -78,8 +78,8 @@ public class BalanceWithOfflineTableIT extends ConfigurableMacBase { log.info("Waiting for balance"); - ExecutorService pool = ThreadPools.getServerThreadPools().getPoolBuilder("waitForBalance") - .numCoreThreads(1).build(); + ExecutorService pool = ThreadPools.getServerThreadPools() + .getPoolBuilder("test.wait.for.balance.pool").numCoreThreads(1).build(); Future<Boolean> wait = pool.submit(() -> { c.instanceOperations().waitForBalance(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java index 101c9fc65e..9ed38c5503 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java @@ -213,7 +213,7 @@ public class BatchWriterFlushIT extends AccumuloClusterHarness { } ThreadPoolExecutor threads = ThreadPools.getServerThreadPools() - .getPoolBuilder("ClientThreads").numCoreThreads(NUM_THREADS).build(); + .getPoolBuilder("batch.writer.client.flush").numCoreThreads(NUM_THREADS).build(); threads.allowCoreThreadTimeOut(false); threads.prestartAllCoreThreads();