This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new d56c817421 IGNITE-21486 Pass node name to NamedThreadFactory where possible (#3181) d56c817421 is described below commit d56c8174219b0579fd0d8904928b207e8bb1475d Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Thu Feb 8 10:01:15 2024 +0400 IGNITE-21486 Pass node name to NamedThreadFactory where possible (#3181) --- .../apache/ignite/internal/catalog/ClockWaiter.java | 4 ++-- .../management/ClusterManagementGroupManager.java | 7 +++++-- .../internal/deployunit/DeploymentManagerImpl.java | 2 +- .../internal/deployunit/FileDeployerService.java | 11 +++++++++-- .../metastore/NodeStatusWatchListener.java | 7 +++++-- .../ignite/deployment/FileDeployerServiceTest.java | 2 +- .../compute/loader/JobContextManagerTest.java | 2 +- ...IgniteDistributionZoneManagerNodeRestartTest.java | 2 +- .../internal/raft/server/impl/JraftServerImpl.java | 4 ++-- .../raft/storage/impl/DefaultLogStorageFactory.java | 13 ++++++++++++- .../impl/VolatileLogStorageFactoryCreator.java | 4 ++-- .../raft/storage/logit/LogitLogStorageFactory.java | 4 ++-- .../raft/jraft/storage/impl/LogStorageBenchmark.java | 2 +- .../jraft/storage/logit/LogitLogStorageTest.java | 2 +- .../configuration/generator/DefaultsGenerator.java | 2 +- .../ItDistributedConfigurationPropertiesTest.java | 2 +- .../ItDistributedConfigurationStorageTest.java | 2 +- .../internal/runner/app/ItIgniteNodeRestartTest.java | 2 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 7 ++++--- .../storage/DistributedConfigurationStorage.java | 6 ++++-- .../storage/LocalFileConfigurationStorage.java | 20 +++++++++++++++++--- .../storage/DistributedConfigurationStorageTest.java | 2 +- .../sql/engine/prepare/PrepareServiceImpl.java | 2 +- .../rebalance/ItRebalanceDistributedTest.java | 2 +- .../ignite/internal/table/distributed/gc/MvGc.java | 3 +-- .../snapshot/outgoing/OutgoingSnapshotsManager.java | 16 +++++++++++++++- 26 files changed, 93 insertions(+), 39 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java index fbc7f8ec47..25bdf80bb4 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java @@ -88,7 +88,7 @@ public class ClockWaiter implements IgniteComponent { 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), - new NamedThreadFactory(nodeName + "-clock-waiter-future-executor", LOG) + NamedThreadFactory.create(nodeName, "clock-waiter-future-executor", LOG) ); } @@ -96,7 +96,7 @@ public class ClockWaiter implements IgniteComponent { public CompletableFuture<Void> start() { clock.addUpdateListener(updateListener); - scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter-scheduler", LOG)); + scheduler = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.create(nodeName, "clock-waiter-scheduler", LOG)); return nullCompletedFuture(); } diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java index 424c0fc578..372ce984fd 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java @@ -108,8 +108,7 @@ public class ClusterManagementGroupManager implements IgniteComponent { private final CmgMessagesFactory msgFactory = new CmgMessagesFactory(); /** Delayed executor. */ - private final ScheduledExecutorService scheduledExecutor = - Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("cmg-manager", LOG)); + private final ScheduledExecutorService scheduledExecutor; private final ClusterService clusterService; @@ -152,6 +151,10 @@ public class ClusterManagementGroupManager implements IgniteComponent { this.configuration = configuration; this.localStateStorage = new LocalStateStorage(vault); this.nodeAttributes = nodeAttributes; + + scheduledExecutor = Executors.newSingleThreadScheduledExecutor( + NamedThreadFactory.create(clusterService.nodeName(), "cmg-manager", LOG) + ); } /** diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java index c7d0599f4e..bc899fdf83 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java @@ -152,7 +152,7 @@ public class DeploymentManagerImpl implements IgniteDeployment { this.workDir = workDir; this.nodeName = nodeName; tracker = new DownloadTracker(); - deployer = new FileDeployerService(); + deployer = new FileDeployerService(nodeName); deploymentUnitAccessor = new DeploymentUnitAccessorImpl(deployer); undeployer = new DeploymentUnitAcquiredWaiter( nodeName, diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java index f2b74ec7c0..93e241b2bd 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java @@ -56,8 +56,15 @@ public class FileDeployerService { private Path unitsFolder; - private final ExecutorService executor = Executors.newFixedThreadPool( - DEPLOYMENT_EXECUTOR_SIZE, new NamedThreadFactory("deployment", LOG)); + private final ExecutorService executor; + + /** Constructor. */ + public FileDeployerService(String nodeName) { + executor = Executors.newFixedThreadPool( + DEPLOYMENT_EXECUTOR_SIZE, + NamedThreadFactory.create(nodeName, "deployment", LOG) + ); + } public void initUnitsFolder(Path unitsFolder) { this.unitsFolder = unitsFolder; diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java index 5856fb2568..8ba91b0860 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/NodeStatusWatchListener.java @@ -45,8 +45,7 @@ public class NodeStatusWatchListener implements WatchListener { private final NodeEventCallback callback; - private final ExecutorService executor = Executors.newFixedThreadPool( - 4, new NamedThreadFactory("NodeStatusWatchListener-pool", LOG)); + private final ExecutorService executor; /** * Constructor. @@ -59,6 +58,10 @@ public class NodeStatusWatchListener implements WatchListener { this.deploymentUnitStore = deploymentUnitStore; this.nodeName = nodeName; this.callback = callback; + + executor = Executors.newFixedThreadPool( + 4, NamedThreadFactory.create(nodeName, "NodeStatusWatchListener-pool", LOG) + ); } @Override diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java index deb6dfd379..a8af0a2726 100644 --- a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java +++ b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/FileDeployerServiceTest.java @@ -47,7 +47,7 @@ import org.junit.jupiter.api.extension.ExtendWith; */ @ExtendWith(WorkDirectoryExtension.class) public class FileDeployerServiceTest { - private final FileDeployerService service = new FileDeployerService(); + private final FileDeployerService service = new FileDeployerService("test"); @WorkDirectory private Path workDir; diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobContextManagerTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobContextManagerTest.java index f23ef7a8c4..f7476aae65 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobContextManagerTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobContextManagerTest.java @@ -74,7 +74,7 @@ class JobContextManagerTest extends BaseIgniteAbstractTest { @BeforeEach void setUp() { - FileDeployerService deployerService = new FileDeployerService(); + FileDeployerService deployerService = new FileDeployerService("test"); deployerService.initUnitsFolder(unitsDir); classLoaderManager = new JobContextManager( diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java index 8bd7824592..8bc669ef5b 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java @@ -217,7 +217,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe Consumer<LongFunction<CompletableFuture<?>>> revisionUpdater = (LongFunction<CompletableFuture<?>> function) -> metastore.registerRevisionUpdateListener(function::apply); - var cfgStorage = new DistributedConfigurationStorage(metastore); + var cfgStorage = new DistributedConfigurationStorage("test", metastore); ConfigurationTreeGenerator distributedConfigurationGenerator = new ConfigurationTreeGenerator( modules.distributed().rootKeys(), diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index e7de7f86da..1107264049 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -173,8 +173,8 @@ public class JraftServerImpl implements RaftServer { this.dataPath = dataPath; this.nodeManager = new NodeManager(); this.logStorageFactory = IgniteSystemProperties.getBoolean(LOGIT_STORAGE_ENABLED_PROPERTY, false) - ? new LogitLogStorageFactory(dataPath.resolve("log"), getLogOptions()) - : new DefaultLogStorageFactory(dataPath.resolve("log")); + ? new LogitLogStorageFactory(service.nodeName(), dataPath.resolve("log"), getLogOptions()) + : new DefaultLogStorageFactory(service.nodeName(), dataPath.resolve("log")); this.opts = opts; this.raftGroupEventsClientListener = raftGroupEventsClientListener; diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java index 7558b2aafb..2346404d18 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java @@ -36,6 +36,7 @@ import org.apache.ignite.raft.jraft.option.RaftOptions; import org.apache.ignite.raft.jraft.storage.LogStorage; import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper; import org.apache.ignite.raft.jraft.util.Platform; +import org.jetbrains.annotations.TestOnly; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; @@ -84,11 +85,21 @@ public class DefaultLogStorageFactory implements LogStorageFactory { * * @param path Path to the storage. */ + @TestOnly public DefaultLogStorageFactory(Path path) { + this("test", path); + } + + /** + * Constructor. + * + * @param path Path to the storage. + */ + public DefaultLogStorageFactory(String nodeName, Path path) { this.path = path; executorService = Executors.newSingleThreadExecutor( - new NamedThreadFactory("raft-shared-log-storage-pool", LOG) + NamedThreadFactory.create(nodeName, "raft-shared-log-storage-pool", LOG) ); } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactoryCreator.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactoryCreator.java index b8ea9dd41a..140e7d58b4 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactoryCreator.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactoryCreator.java @@ -78,12 +78,12 @@ public class VolatileLogStorageFactoryCreator implements LogStorageFactoryCreato * * @param spillOutPath Path at which to put spill-out data. */ - public VolatileLogStorageFactoryCreator(Path spillOutPath) { + public VolatileLogStorageFactoryCreator(String nodeName, Path spillOutPath) { this.spillOutPath = Objects.requireNonNull(spillOutPath); executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() * 2, - new NamedThreadFactory("raft-volatile-log-rocksdb-spillout-pool", LOG) + NamedThreadFactory.create(nodeName, "raft-volatile-log-rocksdb-spillout-pool", LOG) ); } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java index a38628d8dc..3537a55f2e 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java @@ -56,11 +56,11 @@ public class LogitLogStorageFactory implements LogStorageFactory { * @param baseLogStoragesPath Location of all log storages, created by this factory. * @param storeOptions Logit log storage options. */ - public LogitLogStorageFactory(Path baseLogStoragesPath, StoreOptions storeOptions) { + public LogitLogStorageFactory(String nodeName, Path baseLogStoragesPath, StoreOptions storeOptions) { this.baseLogStoragesPath = baseLogStoragesPath; this.storeOptions = storeOptions; checkpointExecutor = Executors.newSingleThreadScheduledExecutor( - new NamedThreadFactory("logit-checkpoint-executor", LOG) + NamedThreadFactory.create(nodeName, "logit-checkpoint-executor", LOG) ); checkVmOptions(); diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogStorageBenchmark.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogStorageBenchmark.java index c53dc213d1..caf044ccca 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogStorageBenchmark.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogStorageBenchmark.java @@ -150,7 +150,7 @@ public class LogStorageBenchmark { int totalLogs = 100 * 1024; // LogStorageFactory logStorageFactory = new DefaultLogStorageFactory(testPath); - LogStorageFactory logStorageFactory = new LogitLogStorageFactory(testPath, new StoreOptions()); + LogStorageFactory logStorageFactory = new LogitLogStorageFactory("test", testPath, new StoreOptions()); logStorageFactory.start(); try (AutoCloseable factory = logStorageFactory::close) { diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java index d256c25ba5..6a202a0909 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java @@ -47,7 +47,7 @@ public class LogitLogStorageTest extends BaseLogStorageTest { @BeforeEach @Override public void setup() throws Exception { - logStorageFactory = new LogitLogStorageFactory(path, testStoreOptions()); + logStorageFactory = new LogitLogStorageFactory("test", path, testStoreOptions()); logStorageFactory.start(); super.setup(); diff --git a/modules/runner/src/defaultsGenerator/java/org/apache/ignite/internal/configuration/generator/DefaultsGenerator.java b/modules/runner/src/defaultsGenerator/java/org/apache/ignite/internal/configuration/generator/DefaultsGenerator.java index 41de553fd8..a5173a29bc 100644 --- a/modules/runner/src/defaultsGenerator/java/org/apache/ignite/internal/configuration/generator/DefaultsGenerator.java +++ b/modules/runner/src/defaultsGenerator/java/org/apache/ignite/internal/configuration/generator/DefaultsGenerator.java @@ -87,7 +87,7 @@ public class DefaultsGenerator { modules.local().polymorphicSchemaExtensions() ); - ConfigurationStorage storage = new LocalFileConfigurationStorage(configPath, localConfigurationGenerator); + ConfigurationStorage storage = new LocalFileConfigurationStorage("defaultGen", configPath, localConfigurationGenerator); ConfigurationValidator configurationValidator = ConfigurationValidatorImpl.withDefaultValidators(localConfigurationGenerator, modules.local().validators()); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java index 1c0a676139..a58a241bdd 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java @@ -198,7 +198,7 @@ public class ItDistributedConfigurationPropertiesTest extends BaseIgniteAbstract deployWatchesFut = metaStorageManager.deployWatches(); // create a custom storage implementation that is able to "lose" some storage updates - var distributedCfgStorage = new DistributedConfigurationStorage(metaStorageManager) { + var distributedCfgStorage = new DistributedConfigurationStorage("test", metaStorageManager) { /** {@inheritDoc} */ @Override public synchronized void registerConfigurationListener(ConfigurationStorageListener listener) { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java index afa1443ad4..8e2258f211 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java @@ -170,7 +170,7 @@ public class ItDistributedConfigurationStorageTest extends BaseIgniteAbstractTes deployWatchesFut = metaStorageManager.deployWatches(); - cfgStorage = new DistributedConfigurationStorage(metaStorageManager); + cfgStorage = new DistributedConfigurationStorage("test", metaStorageManager); } /** diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 67d5071877..13a685d976 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -392,7 +392,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { } }; - var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr); + var cfgStorage = new DistributedConfigurationStorage("test", metaStorageMgr); ConfigurationTreeGenerator distributedConfigurationGenerator = new ConfigurationTreeGenerator( modules.distributed().rootKeys(), diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 4da06b0c76..2f76950b2f 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -380,6 +380,7 @@ public class IgniteImpl implements Ignite { ); LocalFileConfigurationStorage localFileConfigurationStorage = new LocalFileConfigurationStorage( + name, configPath, localConfigurationGenerator ); @@ -513,7 +514,7 @@ public class IgniteImpl implements Ignite { topologyAwareRaftGroupServiceFactory ); - this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr); + this.cfgStorage = new DistributedConfigurationStorage(name, metaStorageMgr); clusterCfgMgr = new ConfigurationManager( modules.distributed().rootKeys(), @@ -577,9 +578,9 @@ public class IgniteImpl implements Ignite { ); dataStorageMgr = new DataStorageManager(applyThreadAssertionsIfNeeded(storageEngines)); - volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(workDir.resolve("volatile-log-spillout")); + volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(name, workDir.resolve("volatile-log-spillout")); - outgoingSnapshotsManager = new OutgoingSnapshotsManager(clusterSvc.messagingService()); + outgoingSnapshotsManager = new OutgoingSnapshotsManager(name, clusterSvc.messagingService()); SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry.getConfiguration( SchemaSynchronizationConfiguration.KEY diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java index b5c6bba418..c2f18f6271 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java @@ -90,7 +90,7 @@ public class DistributedConfigurationStorage implements ConfigurationStorage { */ private volatile long changeId; - private final ExecutorService threadPool = Executors.newFixedThreadPool(4, new NamedThreadFactory("dst-cfg", LOG)); + private final ExecutorService threadPool; private final InFlightFutures futureTracker = new InFlightFutures(); @@ -99,8 +99,10 @@ public class DistributedConfigurationStorage implements ConfigurationStorage { * * @param metaStorageMgr Meta storage manager. */ - public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) { + public DistributedConfigurationStorage(String nodeName, MetaStorageManager metaStorageMgr) { this.metaStorageMgr = metaStorageMgr; + + threadPool = Executors.newFixedThreadPool(4, NamedThreadFactory.create(nodeName, "dst-cfg", LOG)); } @Override diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java index 98d3ab6aaf..f021322c04 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalFileConfigurationStorage.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteUtils; +import org.jetbrains.annotations.TestOnly; /** * Implementation of {@link ConfigurationStorage} based on local file configuration storage. @@ -88,9 +89,7 @@ public class LocalFileConfigurationStorage implements ConfigurationStorage { private final AtomicReference<ConfigurationStorageListener> lsnrRef = new AtomicReference<>(); /** Thread pool for configuration updates notifications. */ - private final ExecutorService notificationsThreadPool = Executors.newFixedThreadPool( - 2, new NamedThreadFactory("cfg-file", LOG) - ); + private final ExecutorService notificationsThreadPool; /** Tracks all running futures. */ private final InFlightFutures futureTracker = new InFlightFutures(); @@ -104,11 +103,26 @@ public class LocalFileConfigurationStorage implements ConfigurationStorage { * @param configPath Path to node bootstrap configuration file. * @param generator Configuration tree generator. */ + @TestOnly public LocalFileConfigurationStorage(Path configPath, ConfigurationTreeGenerator generator) { + this("test", configPath, generator); + } + + /** + * Constructor. + * + * @param configPath Path to node bootstrap configuration file. + * @param generator Configuration tree generator. + */ + public LocalFileConfigurationStorage(String nodeName, Path configPath, ConfigurationTreeGenerator generator) { this.configPath = configPath; this.generator = generator; this.tempConfigPath = configPath.resolveSibling(configPath.getFileName() + ".tmp"); + notificationsThreadPool = Executors.newFixedThreadPool( + 2, NamedThreadFactory.create(nodeName, "cfg-file", LOG) + ); + checkAndRestoreConfigFile(); } diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java index d2f48e6ce4..5476266656 100644 --- a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java +++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorageTest.java @@ -67,7 +67,7 @@ public class DistributedConfigurationStorageTest extends ConfigurationStorageTes /** {@inheritDoc} */ @Override public ConfigurationStorage getStorage() { - return new DistributedConfigurationStorage(metaStorageManager); + return new DistributedConfigurationStorage("test", metaStorageManager); } /** diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java index 6c5a90428b..0909ad6ff0 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java @@ -179,7 +179,7 @@ public class PrepareServiceImpl implements PrepareService { THREAD_TIMEOUT_MS, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), - new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, "sql-planning-pool"), LOG) + NamedThreadFactory.create(nodeName, "sql-planning-pool", LOG) ); planningPool.allowCoreThreadTimeOut(true); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 093355bd88..9842e78334 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -1049,7 +1049,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { new TestLocalRwTxCounter() ); - cfgStorage = new DistributedConfigurationStorage(metaStorageManager); + cfgStorage = new DistributedConfigurationStorage("test", metaStorageManager); clusterCfgGenerator = new ConfigurationTreeGenerator(GcConfiguration.KEY); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java index 6ad4b2905a..685663ea64 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.table.distributed.gc; -import static org.apache.ignite.internal.thread.NamedThreadFactory.threadPrefix; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; @@ -96,7 +95,7 @@ public class MvGc implements ManuallyCloseable { 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), - new NamedThreadFactory(threadPrefix(nodeName, "mv-gc"), LOG) + NamedThreadFactory.create(nodeName, "mv-gc", LOG) ); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java index 814cc87c3d..6214a06b9c 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteUtils; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * Outgoing snapshots manager. Manages a collection of all outgoing snapshots, currently present on the Ignite node. @@ -56,6 +57,8 @@ public class OutgoingSnapshotsManager implements PartitionsSnapshots, IgniteComp */ private static final IgniteLogger LOG = Loggers.forClass(OutgoingSnapshotsManager.class); + private final String nodeName; + /** * Messaging service. */ @@ -74,7 +77,18 @@ public class OutgoingSnapshotsManager implements PartitionsSnapshots, IgniteComp * * @param messagingService Messaging service. */ + @TestOnly public OutgoingSnapshotsManager(MessagingService messagingService) { + this("test", messagingService); + } + + /** + * Constructor. + * + * @param messagingService Messaging service. + */ + public OutgoingSnapshotsManager(String nodeName, MessagingService messagingService) { + this.nodeName = nodeName; this.messagingService = messagingService; } @@ -88,7 +102,7 @@ public class OutgoingSnapshotsManager implements PartitionsSnapshots, IgniteComp @Override public CompletableFuture<Void> start() { executor = new ThreadPoolExecutor(0, 4, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), new NamedThreadFactory("outgoing-snapshots", LOG) + new LinkedBlockingQueue<>(), NamedThreadFactory.create(nodeName, "outgoing-snapshots", LOG) ); messagingService.addMessageHandler(TableMessageGroup.class, this::handleMessage);