This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 54568701e8 IGNITE-20477 Introduce async components start (#2997) 54568701e8 is described below commit 54568701e89a86e69ed61f667e0659c72335c1d3 Author: Mirza Aliev <alievmi...@gmail.com> AuthorDate: Fri Jan 12 11:11:24 2024 +0400 IGNITE-20477 Introduce async components start (#2997) --- .../internal/catalog/CatalogManagerImpl.java | 4 ++- .../ignite/internal/catalog/ClockWaiter.java | 4 ++- .../ignite/internal/catalog/storage/UpdateLog.java | 4 ++- .../internal/catalog/storage/UpdateLogImpl.java | 5 +++- .../ignite/internal/catalog/CatalogTestUtils.java | 20 ++++++++++---- .../ignite/client/handler/ClientHandlerModule.java | 5 +++- .../client/handler/DummyAuthenticationManager.java | 7 +++-- .../ignite/client/TestClientHandlerModule.java | 5 +++- .../client/fakes/FakeIgniteQueryProcessor.java | 6 ++-- .../apache/ignite/client/fakes/FakeTxManager.java | 3 +- .../management/ClusterManagementGroupManager.java | 4 ++- .../raft/RocksDbClusterStateStorage.java | 5 +++- .../management/raft/TestClusterStateStorage.java | 5 +++- .../internal/deployunit/DeploymentManagerImpl.java | 5 +++- .../internal/compute/ComputeComponentImpl.java | 5 +++- .../compute/util/DummyIgniteDeployment.java | 5 ++-- .../configuration/ConfigurationManager.java | 7 ++++- .../configuration/ConfigurationRegistry.java | 4 ++- .../internal/components/LongJvmPauseDetector.java | 10 +++++-- .../ignite/internal/manager/IgniteComponent.java | 10 ++++++- .../distributionzones/DistributionZoneManager.java | 32 +++++----------------- .../network/file/FileTransferServiceImpl.java | 4 ++- .../internal/index/IndexBuildingManager.java | 5 +++- .../apache/ignite/internal/index/IndexManager.java | 5 +++- .../metastorage/impl/MetaStorageManagerImpl.java | 4 ++- .../impl/MetaStorageManagerRecoveryTest.java | 4 ++- .../ignite/internal/metrics/MetricManager.java | 4 ++- .../ignite/network/NettyBootstrapFactory.java | 7 ++++- .../scalecube/ScaleCubeClusterServiceFactory.java | 6 +++- .../ignite/utils/ClusterServiceTestUtils.java | 6 +++- .../placementdriver/PlacementDriverManager.java | 4 ++- .../java/org/apache/ignite/internal/raft/Loza.java | 4 +-- .../internal/raft/server/impl/JraftServerImpl.java | 6 +++- .../impl/VolatileLogStorageFactoryCreator.java | 6 +++- .../ignite/internal/replicator/ReplicaManager.java | 5 +++- .../apache/ignite/internal/rest/RestComponent.java | 6 ++-- .../org/apache/ignite/internal/app/IgniteImpl.java | 8 +++--- .../ignite/internal/app/LifecycleManager.java | 22 ++++++++++++++- .../ignite/internal/app/ThreadPoolsManager.java | 6 +++- .../ignite/internal/schema/SchemaManager.java | 5 +++- .../authentication/AuthenticationManagerImpl.java | 4 ++- .../ignite/internal/sql/api/IgniteSqlImpl.java | 7 ++++- .../internal/sql/engine/SqlQueryProcessor.java | 4 ++- .../engine/framework/ClusterServiceFactory.java | 4 +-- .../internal/sql/engine/util/QueryCheckerTest.java | 3 +- .../internal/storage/DataStorageManager.java | 7 ++++- .../internal/systemview/SystemViewManagerImpl.java | 5 +++- .../internal/table/distributed/TableManager.java | 4 ++- .../outgoing/OutgoingSnapshotsManager.java | 5 +++- .../ignite/internal/tx/impl/TxManagerImpl.java | 4 ++- .../apache/ignite/internal/vault/VaultManager.java | 5 +++- 51 files changed, 233 insertions(+), 91 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java index f00dbc1086..aec906d317 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java @@ -133,7 +133,7 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata } @Override - public void start() { + public CompletableFuture<Void> start() { int objectIdGen = 0; // TODO: IGNITE-19082 Move default schema objects initialization to cluster init procedure. @@ -166,6 +166,8 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata updateLog.registerUpdateHandler(new OnUpdateHandlerImpl()); updateLog.start(); + + return nullCompletedFuture(); } @Override diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java index cbb8ad84f2..fbc7f8ec47 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java @@ -93,10 +93,12 @@ public class ClockWaiter implements IgniteComponent { } @Override - public void start() { + public CompletableFuture<Void> start() { clock.addUpdateListener(updateListener); scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter-scheduler", LOG)); + + return nullCompletedFuture(); } @Override diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java index 9969780b9d..8185111838 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java @@ -54,9 +54,11 @@ public interface UpdateLog extends IgniteComponent { * <p>Log replay is a part of a component start up process, thus the handler must * be registered prior to start is invoked, otherwise exception will be thrown. * + * @return Completable future. * @throws IgniteInternalException If no handler has been registered. */ - @Override void start() throws IgniteInternalException; + @Override + CompletableFuture<Void> start() throws IgniteInternalException; /** An interface describing a handler that will receive notification when a new update is added to the log. */ @FunctionalInterface diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java index 794be6a30c..0f490a6620 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java @@ -27,6 +27,7 @@ import static org.apache.ignite.internal.metastorage.dsl.Operations.put; import static org.apache.ignite.internal.metastorage.dsl.Statements.iif; import static org.apache.ignite.internal.util.ByteUtils.fromBytes; import static org.apache.ignite.internal.util.ByteUtils.intToBytes; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import java.util.ArrayList; import java.util.Collection; @@ -79,7 +80,7 @@ public class UpdateLogImpl implements UpdateLog { } @Override - public void start() { + public CompletableFuture<Void> start() { if (!busyLock.enterBusy()) { throw new IgniteException(Common.NODE_STOPPING_ERR, new NodeStoppingException()); } @@ -103,6 +104,8 @@ public class UpdateLogImpl implements UpdateLog { } finally { busyLock.leaveBusy(); } + + return nullCompletedFuture(); } @Override diff --git a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java index 95e8b5f10f..4c4be4d062 100644 --- a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java +++ b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.catalog; +import static java.util.concurrent.CompletableFuture.allOf; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -71,7 +73,7 @@ public class CatalogTestUtils { return new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter) { @Override - public void start() { + public CompletableFuture<Void> start() { vault.start(); metastore.start(); clockWaiter.start(); @@ -79,6 +81,8 @@ public class CatalogTestUtils { super.start(); assertThat(metastore.deployWatches(), willCompleteSuccessfully()); + + return nullCompletedFuture(); } @Override @@ -115,10 +119,12 @@ public class CatalogTestUtils { return new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter) { @Override - public void start() { + public CompletableFuture<Void> start() { clockWaiter.start(); super.start(); + + return nullCompletedFuture(); } @Override @@ -155,10 +161,10 @@ public class CatalogTestUtils { return new CatalogManagerImpl(new TestUpdateLog(clock), clockWaiter) { @Override - public void start() { - clockWaiter.start(); + public CompletableFuture<Void> start() { + CompletableFuture<Void> fut = clockWaiter.start(); - super.start(); + return allOf(fut, super.start()); } @Override @@ -299,13 +305,15 @@ public class CatalogTestUtils { } @Override - public void start() throws IgniteInternalException { + public CompletableFuture<Void> start() throws IgniteInternalException { if (onUpdateHandler == null) { throw new IgniteInternalException( Common.INTERNAL_ERR, "Handler must be registered prior to component start" ); } + + return nullCompletedFuture(); } @Override diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java index 04ccbdc430..7a894cfc0a 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java @@ -17,6 +17,7 @@ package org.apache.ignite.client.handler; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import io.netty.bootstrap.ServerBootstrap; @@ -198,7 +199,7 @@ public class ClientHandlerModule implements IgniteComponent { /** {@inheritDoc} */ @Override - public void start() { + public CompletableFuture<Void> start() { if (channel != null) { throw new IgniteInternalException(INTERNAL_ERR, "ClientHandlerModule is already started."); } @@ -217,6 +218,8 @@ public class ClientHandlerModule implements IgniteComponent { } catch (InterruptedException e) { throw new IgniteInternalException(INTERNAL_ERR, e); } + + return nullCompletedFuture(); } /** {@inheritDoc} */ diff --git a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/DummyAuthenticationManager.java b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/DummyAuthenticationManager.java index 4dabcbe216..7453c221b8 100644 --- a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/DummyAuthenticationManager.java +++ b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/DummyAuthenticationManager.java @@ -17,6 +17,9 @@ package org.apache.ignite.client.handler; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.event.EventListener; import org.apache.ignite.internal.security.authentication.AuthenticationManager; import org.apache.ignite.internal.security.authentication.AuthenticationRequest; @@ -39,8 +42,8 @@ public class DummyAuthenticationManager implements AuthenticationManager { } @Override - public void start() { - + public CompletableFuture<Void> start() { + return nullCompletedFuture(); } @Override diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java index bd40a832ee..c438797282 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java +++ b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java @@ -17,6 +17,7 @@ package org.apache.ignite.client; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.mockito.Mockito.mock; import io.netty.bootstrap.ServerBootstrap; @@ -150,7 +151,7 @@ public class TestClientHandlerModule implements IgniteComponent { /** {@inheritDoc} */ @Override - public void start() { + public CompletableFuture<Void> start() { if (channel != null) { throw new IgniteException("ClientHandlerModule is already started."); } @@ -160,6 +161,8 @@ public class TestClientHandlerModule implements IgniteComponent { } catch (InterruptedException e) { throw new IgniteException(e); } + + return nullCompletedFuture(); } /** {@inheritDoc} */ diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java index e6bb5b0d77..528ddabd3e 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java @@ -17,6 +17,8 @@ package org.apache.ignite.client.fakes; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.sql.engine.AsyncSqlCursor; import org.apache.ignite.internal.sql.engine.InternalSqlRow; @@ -60,8 +62,8 @@ public class FakeIgniteQueryProcessor implements QueryProcessor { } @Override - public void start() { - + public CompletableFuture<Void> start() { + return nullCompletedFuture(); } @Override diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java index 91d425830b..8f980a46ad 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -50,8 +50,9 @@ public class FakeTxManager implements TxManager { } @Override - public void start() { + public CompletableFuture<Void> start() { // No-op. + return nullCompletedFuture(); } @Override diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java index 88f71ac792..778feb0c9a 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java @@ -201,7 +201,7 @@ public class ClusterManagementGroupManager implements IgniteComponent { } @Override - public void start() { + public CompletableFuture<Void> start() { synchronized (raftServiceLock) { raftService = recoverLocalState(); } @@ -224,6 +224,8 @@ public class ClusterManagementGroupManager implements IgniteComponent { } }) ); + + return nullCompletedFuture(); } /** diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java index 4631b0c814..fe3c40a344 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.cluster.management.raft; import static org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import java.nio.file.Path; import java.util.Arrays; @@ -72,7 +73,7 @@ public class RocksDbClusterStateStorage implements ClusterStateStorage { } @Override - public void start() { + public CompletableFuture<Void> start() { options = new Options().setCreateIfMissing(true); try { @@ -84,6 +85,8 @@ public class RocksDbClusterStateStorage implements ClusterStateStorage { } catch (RocksDBException e) { throw new IgniteInternalException("Failed to start the storage", e); } + + return nullCompletedFuture(); } @Override diff --git a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java index 106263eeac..2463e761af 100644 --- a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java +++ b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.cluster.management.raft; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.startsWith; import java.io.ObjectInputStream; @@ -53,8 +54,10 @@ public class TestClusterStateStorage implements ClusterStateStorage { private volatile boolean isStarted = false; @Override - public void start() { + public CompletableFuture<Void> start() { isStarted = true; + + return nullCompletedFuture(); } @Override diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java index fcfbf0d908..adf1590b40 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java @@ -25,6 +25,7 @@ import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING; import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING; import static org.apache.ignite.internal.deployunit.UnitContent.toDeploymentUnit; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture; import java.nio.file.Path; @@ -395,13 +396,15 @@ public class DeploymentManagerImpl implements IgniteDeployment { } @Override - public void start() { + public CompletableFuture<Void> start() { deployer.initUnitsFolder(workDir.resolve(configuration.deploymentLocation().value())); deploymentUnitStore.registerNodeStatusListener(nodeStatusWatchListener); deploymentUnitStore.registerClusterStatusListener(clusterStatusWatchListener); messaging.subscribe(); failover.registerTopologyChangeCallback(nodeStatusCallback, clusterEventCallback); undeployer.start(UNDEPLOYER_DELAY.getSeconds(), TimeUnit.SECONDS); + + return nullCompletedFuture(); } @Override diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java index 20c98c71e5..db297e6ff9 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.compute; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.compute.ClassLoaderExceptionsMapper.mapClassLoaderExceptions; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; import java.util.List; @@ -149,9 +150,11 @@ public class ComputeComponentImpl implements ComputeComponent { /** {@inheritDoc} */ @Override - public void start() { + public CompletableFuture<Void> start() { executor.start(); messaging.start(this::start, this::getJobStatus); + + return nullCompletedFuture(); } /** {@inheritDoc} */ diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/util/DummyIgniteDeployment.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/util/DummyIgniteDeployment.java index c60d219f20..e628c9a1e3 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/util/DummyIgniteDeployment.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/util/DummyIgniteDeployment.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.compute.util; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import java.io.File; import java.nio.file.Files; @@ -117,8 +118,8 @@ public class DummyIgniteDeployment implements IgniteDeployment { } @Override - public void start() { - + public CompletableFuture<Void> start() { + return nullCompletedFuture(); } @Override diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationManager.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationManager.java index 43860ae27c..f7e98e454f 100644 --- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationManager.java +++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationManager.java @@ -17,7 +17,10 @@ package org.apache.ignite.internal.configuration; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + import java.util.Collection; +import java.util.concurrent.CompletableFuture; import org.apache.ignite.configuration.RootKey; import org.apache.ignite.internal.configuration.storage.ConfigurationStorage; import org.apache.ignite.internal.configuration.validation.ConfigurationValidator; @@ -55,8 +58,10 @@ public class ConfigurationManager implements IgniteComponent { /** {@inheritDoc} */ @Override - public void start() { + public CompletableFuture<Void> start() { registry.start(); + + return nullCompletedFuture(); } /** {@inheritDoc} */ diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java index eaae04c0e0..9d2d4f44ab 100644 --- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java +++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java @@ -94,8 +94,10 @@ public class ConfigurationRegistry implements IgniteComponent { /** {@inheritDoc} */ @Override - public void start() { + public CompletableFuture<Void> start() { changer.start(); + + return nullCompletedFuture(); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/components/LongJvmPauseDetector.java b/modules/core/src/main/java/org/apache/ignite/internal/components/LongJvmPauseDetector.java index 6875cd2bb7..d153e1ef95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/components/LongJvmPauseDetector.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/components/LongJvmPauseDetector.java @@ -19,9 +19,11 @@ package org.apache.ignite.internal.components; import static org.apache.ignite.internal.lang.IgniteSystemProperties.getBoolean; import static org.apache.ignite.internal.lang.IgniteSystemProperties.getInteger; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.logger.IgniteLogger; @@ -90,11 +92,11 @@ public class LongJvmPauseDetector implements IgniteComponent { /** {@inheritDoc} */ @Override - public void start() { + public CompletableFuture<Void> start() { if (DISABLED) { log.debug("JVM Pause Detector is disabled"); - return; + return nullCompletedFuture(); } final Thread worker = new Thread(NamedThreadFactory.threadPrefix(nodeName, "jvm-pause-detector-worker")) { @@ -151,13 +153,15 @@ public class LongJvmPauseDetector implements IgniteComponent { if (!workerRef.compareAndSet(null, worker)) { log.debug("{} already started", LongJvmPauseDetector.class.getSimpleName()); - return; + return nullCompletedFuture(); } worker.setDaemon(true); worker.start(); log.debug("{} was successfully started", LongJvmPauseDetector.class.getSimpleName()); + + return nullCompletedFuture(); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/IgniteComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/IgniteComponent.java index 7b4b3ecc35..2f16d9ccff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/manager/IgniteComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/IgniteComponent.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.manager; +import java.util.concurrent.CompletableFuture; + /** * Common interface for ignite components that provides entry points for component lifecycle flow. */ @@ -24,8 +26,14 @@ public interface IgniteComponent { /** * Starts the component. Depending on component flow both configuration properties listeners, meta storage watch registration, starting * thread pools and threads goes here. + * + * <p>All actions in the component startup is divided into two categories: sync actions, + * that can be executed synchronously in order for the component to be usable by other components during their startup, + * and async actions, that are wrapped in a CompletableFuture and returned from the start method. + * + * @return Future that will be completed when the asynchronous part of the start is processed. */ - void start(); + CompletableFuture<Void> start(); /** * Triggers running before node stop logic. It's guaranteed that during beforeNodeStop all components beneath given one are still diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java index 8bb5dadf80..e6845ff14c 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java @@ -21,7 +21,6 @@ import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableSet; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.failedFuture; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; @@ -65,6 +64,7 @@ import static org.apache.ignite.internal.util.ByteUtils.toBytes; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; +import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; @@ -120,8 +120,6 @@ import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.thread.StripedScheduledThreadPoolExecutor; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.vault.VaultManager; -import org.apache.ignite.lang.ErrorGroups.Common; -import org.apache.ignite.lang.IgniteException; import org.jetbrains.annotations.TestOnly; /** @@ -131,12 +129,6 @@ public class DistributionZoneManager implements IgniteComponent { /** The logger. */ private static final IgniteLogger LOG = Loggers.forClass(DistributionZoneManager.class); - /** - * Timeout for async operations to be performed in the manager start. - * TODO: temporary solution, must be removed in https://issues.apache.org/jira/browse/IGNITE-20477 - */ - private static final long START_TIMEOUT = 10_000L; - /** Meta Storage manager. */ private final MetaStorageManager metaStorageManager; @@ -254,8 +246,8 @@ public class DistributionZoneManager implements IgniteComponent { } @Override - public void start() { - inBusyLock(busyLock, () -> { + public CompletableFuture<Void> start() { + return inBusyLockAsync(busyLock, () -> { registerCatalogEventListenersOnStartManagerBusy(); logicalTopologyService.addEventListener(topologyEventListener); @@ -271,20 +263,10 @@ public class DistributionZoneManager implements IgniteComponent { restoreGlobalStateFromLocalMetastorage(recoveryRevision); - List<CompletableFuture<Void>> futures = new ArrayList<>(); - - futures.add(createOrRestoreZonesStates(recoveryRevision)); - - futures.add(restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision)); - - try { - // TODO: return this futures to start method https://issues.apache.org/jira/browse/IGNITE-20477 - allOf(futures.toArray(CompletableFuture[]::new)).get(START_TIMEOUT, MILLISECONDS); - } catch (Exception e) { - throw new IgniteException(Common.COMPONENT_NOT_STARTED_ERR, e); - } - - rebalanceEngine.start(); + return allOf( + createOrRestoreZonesStates(recoveryRevision), + restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision) + ).thenRun(rebalanceEngine::start); }); } diff --git a/modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java b/modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java index e845514cfd..1e62271743 100644 --- a/modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java +++ b/modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java @@ -218,7 +218,7 @@ public class FileTransferServiceImpl implements FileTransferService { } @Override - public void start() { + public CompletableFuture<Void> start() { topologyService.addEventHandler(new TopologyEventHandler() { @Override public void onDisappeared(ClusterNode member) { @@ -240,6 +240,8 @@ public class FileTransferServiceImpl implements FileTransferService { LOG.error("Unexpected message received: {}", message); } }); + + return nullCompletedFuture(); } @Override diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java index e501d45c55..3dc717ac55 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildingManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.index; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; @@ -70,7 +71,7 @@ public class IndexBuildingManager implements IgniteComponent { } @Override - public void start() { + public CompletableFuture<Void> start() { inBusyLock(busyLock, () -> { CompletableFuture<Long> recoveryFinishedFuture = metaStorageManager.recoveryFinishedFuture(); @@ -80,6 +81,8 @@ public class IndexBuildingManager implements IgniteComponent { indexAvailabilityController.recover(recoveryRevision); }); + + return nullCompletedFuture(); } @Override diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java index 8bb672fc4e..fd2c1082c9 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.stream.Collectors.toMap; import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; @@ -125,7 +126,7 @@ public class IndexManager implements IgniteComponent { } @Override - public void start() { + public CompletableFuture<Void> start() { LOG.debug("Index manager is about to start"); startIndexes(); @@ -139,6 +140,8 @@ public class IndexManager implements IgniteComponent { }); LOG.info("Index manager started"); + + return nullCompletedFuture(); } @Override diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index 0cde9d2beb..8a5475a2f7 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -355,7 +355,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager { } @Override - public void start() { + public CompletableFuture<Void> start() { storage.start(); appliedRevision = readRevisionFromVault(); @@ -383,6 +383,8 @@ public class MetaStorageManagerImpl implements MetaStorageManager { metaStorageSvcFut.complete(service); } }); + + return nullCompletedFuture(); } private long readRevisionFromVault() { diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java index 6b03ff01c6..09a4c5fed7 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -138,7 +139,8 @@ public class MetaStorageManagerRecoveryTest extends BaseIgniteAbstractTest { } @Override - public void start() { + public CompletableFuture<Void> start() { + return nullCompletedFuture(); } }; } diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManager.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManager.java index 8df0eb3b93..9bc652b991 100644 --- a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManager.java +++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManager.java @@ -92,8 +92,10 @@ public class MetricManager implements IgniteComponent { } /** {@inheritDoc} */ - @Override public void start() { + @Override public CompletableFuture<Void> start() { start(loadExporters()); + + return nullCompletedFuture(); } /** diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java index 9d086cd419..2bcd765dea 100644 --- a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java +++ b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java @@ -17,12 +17,15 @@ package org.apache.ignite.network; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.network.configuration.InboundView; @@ -131,10 +134,12 @@ public class NettyBootstrapFactory implements IgniteComponent { /** {@inheritDoc} */ @Override - public void start() { + public CompletableFuture<Void> start() { bossGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-srv-accept"); workerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-srv-worker"); clientWorkerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-client"); + + return nullCompletedFuture(); } /** diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java index 30b9a23f35..70f5966d35 100644 --- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java +++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java @@ -18,6 +18,8 @@ package org.apache.ignite.network.scalecube; import static io.scalecube.cluster.membership.MembershipEvent.createAdded; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import io.scalecube.cluster.ClusterConfig; import io.scalecube.cluster.ClusterImpl; @@ -116,7 +118,7 @@ public class ScaleCubeClusterServiceFactory { private volatile CompletableFuture<Void> shutdownFuture; @Override - public void start() { + public CompletableFuture<Void> start() { var serializationService = new SerializationService(serializationRegistry, userObjectSerialization); UUID launchId = UUID.randomUUID(); @@ -178,6 +180,8 @@ public class ScaleCubeClusterServiceFactory { topologyService.onMembershipEvent(localMembershipEvent); this.cluster = cluster; + + return nullCompletedFuture(); } @Override diff --git a/modules/network/src/testFixtures/java/org/apache/ignite/utils/ClusterServiceTestUtils.java b/modules/network/src/testFixtures/java/org/apache/ignite/utils/ClusterServiceTestUtils.java index 56ada364ec..d7745826fa 100644 --- a/modules/network/src/testFixtures/java/org/apache/ignite/utils/ClusterServiceTestUtils.java +++ b/modules/network/src/testFixtures/java/org/apache/ignite/utils/ClusterServiceTestUtils.java @@ -20,9 +20,11 @@ package org.apache.ignite.utils; import static java.util.stream.Collectors.toUnmodifiableList; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.stream.IntStream; import org.apache.ignite.configuration.annotation.ConfigurationType; import org.apache.ignite.internal.configuration.ConfigurationManager; @@ -154,7 +156,7 @@ public class ClusterServiceTestUtils { } @Override - public void start() { + public CompletableFuture<Void> start() { nodeConfigurationMgr.start(); NetworkConfiguration configuration = nodeConfigurationMgr.configurationRegistry() @@ -174,6 +176,8 @@ public class ClusterServiceTestUtils { bootstrapFactory.start(); clusterSvc.start(); + + return nullCompletedFuture(); } @Override diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java index 842c74f57c..9335fcb64d 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java @@ -134,7 +134,7 @@ public class PlacementDriverManager implements IgniteComponent { } @Override - public void start() { + public CompletableFuture<Void> start() { inBusyLock(busyLock, () -> { placementDriverNodesNamesProvider.get() .thenCompose(placementDriverNodes -> { @@ -169,6 +169,8 @@ public class PlacementDriverManager implements IgniteComponent { recoverInternalComponentsBusy(); }); + + return nullCompletedFuture(); } @Override diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java index 50a073b805..c077956c30 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java @@ -174,14 +174,14 @@ public class Loza implements RaftManager { /** {@inheritDoc} */ @Override - public void start() { + public CompletableFuture<Void> start() { RaftView raftConfig = raftConfiguration.value(); opts.setRpcInstallSnapshotTimeout(raftConfig.rpcInstallSnapshotTimeout()); opts.getRaftOptions().setSync(raftConfig.fsync()); - raftServer.start(); + return raftServer.start(); } /** {@inheritDoc} */ diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index 9e83cbaf96..2fc40a27c3 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.raft.server.impl; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toUnmodifiableList; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import java.io.File; import java.io.IOException; @@ -32,6 +33,7 @@ import java.util.Collections; import java.util.List; import java.util.Queue; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -234,7 +236,7 @@ public class JraftServerImpl implements RaftServer { /** {@inheritDoc} */ @Override - public void start() { + public CompletableFuture<Void> start() { assert opts.isSharedPools() : "RAFT server is supposed to run in shared pools mode"; // Pre-create all pools in shared mode. @@ -328,6 +330,8 @@ public class JraftServerImpl implements RaftServer { logStorageFactory.start(); rpcServer.init(null); + + return nullCompletedFuture(); } /** {@inheritDoc} */ diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactoryCreator.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactoryCreator.java index a9af40e13f..b8ea9dd41a 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactoryCreator.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactoryCreator.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.raft.storage.impl; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY; import java.io.IOException; @@ -25,6 +26,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.ignite.internal.lang.IgniteInternalException; @@ -86,7 +88,7 @@ public class VolatileLogStorageFactoryCreator implements LogStorageFactoryCreato } @Override - public void start() { + public CompletableFuture<Void> start() { try { Files.createDirectories(spillOutPath); } catch (IOException e) { @@ -120,6 +122,8 @@ public class VolatileLogStorageFactoryCreator implements LogStorageFactoryCreato } catch (Exception e) { throw new RuntimeException(e); } + + return nullCompletedFuture(); } private void wipeOutDb() { diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index 2b045b162f..25ba114e41 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -22,6 +22,7 @@ import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED; import static org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; @@ -611,7 +612,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc /** {@inheritDoc} */ @Override - public void start() { + public CompletableFuture<Void> start() { clusterNetSvc.messagingService().addMessageHandler(ReplicaMessageGroup.class, handler); clusterNetSvc.messagingService().addMessageHandler(PlacementDriverMessageGroup.class, placementDriverMessageHandler); messageGroupsToHandle.forEach(mg -> clusterNetSvc.messagingService().addMessageHandler(mg, handler)); @@ -631,6 +632,8 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc }); localNodeId = clusterNetSvc.topologyService().localMember().id(); + + return nullCompletedFuture(); } /** {@inheritDoc} */ diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/RestComponent.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/RestComponent.java index 7365a2a61f..48a18f38f1 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/RestComponent.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/RestComponent.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.rest; import static io.micronaut.context.env.Environment.BARE_METAL; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import io.micronaut.context.ApplicationContext; import io.micronaut.http.server.exceptions.ServerStartupException; @@ -31,6 +32,7 @@ import java.net.UnknownHostException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.logger.IgniteLogger; @@ -99,7 +101,7 @@ public class RestComponent implements IgniteComponent { /** {@inheritDoc} */ @Override - public void start() { + public CompletableFuture<Void> start() { RestView restConfigurationView = restConfiguration.value(); RestSslView sslConfigurationView = restConfigurationView.ssl(); @@ -107,7 +109,7 @@ public class RestComponent implements IgniteComponent { boolean dualProtocol = restConfiguration.dualProtocol().value(); if (startServer(restConfigurationView.port(), sslConfigurationView.port(), sslEnabled, dualProtocol)) { - return; + return nullCompletedFuture(); } String msg = "Cannot start REST endpoint." diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 59c1f69d82..0033e7a0ae 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -816,7 +816,7 @@ public class IgniteImpl implements Ignite { return cmgMgr.joinFuture() //Disable REST component during initialization. - .thenAcceptAsync(unused -> restComponent.disable()) + .thenAcceptAsync(unused -> restComponent.disable(), startupExecutor) .thenComposeAsync(unused -> { LOG.info("Join complete, starting MetaStorage"); @@ -867,7 +867,7 @@ public class IgniteImpl implements Ignite { .thenComposeAsync(v -> { LOG.info("Components started, performing recovery"); - return recoverComponentsStateOnStart(startupExecutor); + return recoverComponentsStateOnStart(startupExecutor, lifecycleManager.allComponentsStartFuture()); }, startupExecutor) .thenComposeAsync(v -> clusterCfgMgr.configurationRegistry().onDefaultsPersisted(), startupExecutor) // Signal that local recovery is complete and the node is ready to join the cluster. @@ -1098,11 +1098,11 @@ public class IgniteImpl implements Ignite { * Recovers components state on start by invoking configuration listeners ({@link #notifyConfigurationListeners()} and deploying watches * after that. */ - private CompletableFuture<?> recoverComponentsStateOnStart(ExecutorService startupExecutor) { + private CompletableFuture<?> recoverComponentsStateOnStart(ExecutorService startupExecutor, CompletableFuture<Void> startFuture) { CompletableFuture<Void> startupConfigurationUpdate = notifyConfigurationListeners(); CompletableFuture<Void> startupRevisionUpdate = metaStorageMgr.notifyRevisionUpdateListenerOnStart(); - return CompletableFuture.allOf(startupConfigurationUpdate, startupRevisionUpdate) + return CompletableFuture.allOf(startupConfigurationUpdate, startupRevisionUpdate, startFuture) .thenComposeAsync(t -> { // Deploy all registered watches because all components are ready and have registered their listeners. return metaStorageMgr.deployWatches(); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java index 209da0a53b..b68846d3ac 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java @@ -17,8 +17,11 @@ package org.apache.ignite.internal.app; +import static java.util.concurrent.CompletableFuture.allOf; + import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; @@ -47,6 +50,8 @@ class LifecycleManager implements StateProvider { */ private final List<IgniteComponent> startedComponents = new ArrayList<>(); + private final List<CompletableFuture<Void>> allComponentsStartFuture = new ArrayList<>(); + LifecycleManager(String nodeName) { this.nodeName = nodeName; } @@ -71,7 +76,7 @@ class LifecycleManager implements StateProvider { synchronized (this) { startedComponents.add(component); - component.start(); + allComponentsStartFuture.add(component.start()); } } @@ -105,6 +110,21 @@ class LifecycleManager implements StateProvider { } } + /** + * Represents future that will be completed when all components start futures will be completed. + * Note that it is designed that this method is called only once. + * + * @return Future that will be completed when all components start futures will be completed. + */ + synchronized CompletableFuture<Void> allComponentsStartFuture() { + return allOf(allComponentsStartFuture.toArray(CompletableFuture[]::new)) + .whenComplete((v, e) -> { + synchronized (this) { + allComponentsStartFuture.clear(); + } + }); + } + /** * Stops all started components and transfers the node into the {@link State#STOPPING} state. */ diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java index ee4b952a90..76c05f15a2 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.app; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -48,8 +51,9 @@ public class ThreadPoolsManager implements IgniteComponent { } @Override - public void start() { + public CompletableFuture<Void> start() { // No-op. + return nullCompletedFuture(); } @Override diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java index 10164d7ccf..31e462038b 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; @@ -81,11 +82,13 @@ public class SchemaManager implements IgniteComponent { } @Override - public void start() { + public CompletableFuture<Void> start() { catalogService.listen(CatalogEvent.TABLE_CREATE, this::onTableCreated); catalogService.listen(CatalogEvent.TABLE_ALTER, this::onTableAltered); registerExistingTables(); + + return nullCompletedFuture(); } private void registerExistingTables() { diff --git a/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java b/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java index 90ccdc3c4a..958f927a28 100644 --- a/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java +++ b/modules/security/src/main/java/org/apache/ignite/internal/security/authentication/AuthenticationManagerImpl.java @@ -121,7 +121,7 @@ public class AuthenticationManagerImpl } @Override - public void start() { + public CompletableFuture<Void> start() { securityConfiguration.listen(securityConfigurationListener); securityConfiguration.enabled().listen(securityEnabledDisabledEventFactory); securityConfiguration.authentication().providers().listenElements(providerEventFactory); @@ -130,6 +130,8 @@ public class AuthenticationManagerImpl BasicAuthenticationProviderConfiguration basicAuthenticationProviderConfiguration = (BasicAuthenticationProviderConfiguration) securityConfiguration.authentication().providers().get(basicAuthenticationProviderName); basicAuthenticationProviderConfiguration.users().listenElements(userEventFactory); + + return nullCompletedFuture(); } @Override diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java index b475d491ee..e04d4e72af 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java @@ -17,8 +17,11 @@ package org.apache.ignite.internal.sql.api; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + import java.util.HashMap; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; @@ -107,7 +110,7 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { } @Override - public void start() { + public CompletableFuture<Void> start() { executor.scheduleWithFixedDelay( () -> { for (SessionImpl session : sessions.values()) { @@ -120,6 +123,8 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { SESSION_EXPIRE_CHECK_PERIOD, TimeUnit.MILLISECONDS ); + + return nullCompletedFuture(); } @Override diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index d0c3c46214..c660152014 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -254,7 +254,7 @@ public class SqlQueryProcessor implements QueryProcessor { /** {@inheritDoc} */ @Override - public synchronized void start() { + public synchronized CompletableFuture<Void> start() { var nodeName = clusterSrvc.topologyService().localMember().name(); taskExecutor = registerService(new QueryTaskExecutorImpl(nodeName)); @@ -350,6 +350,8 @@ public class SqlQueryProcessor implements QueryProcessor { this.executionSrvc = executionSrvc; services.forEach(LifecycleAware::start); + + return nullCompletedFuture(); } // need to be refactored after TODO: https://issues.apache.org/jira/browse/IGNITE-20925 diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java index 8759218f2d..1a486111ba 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java @@ -106,8 +106,8 @@ public class ClusterServiceFactory { /** {@inheritDoc} */ @Override - public void start() { - + public CompletableFuture<Void> start() { + return nullCompletedFuture(); } }; } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java index 7f499cfd87..caed3ae0bd 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java @@ -315,8 +315,9 @@ public class QueryCheckerTest extends BaseIgniteAbstractTest { } @Override - public void start() { + public CompletableFuture<Void> start() { // NO-OP + return nullCompletedFuture(); } @Override diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataStorageManager.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataStorageManager.java index d10f5df177..9576cfd893 100644 --- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataStorageManager.java +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataStorageManager.java @@ -17,8 +17,11 @@ package org.apache.ignite.internal.storage; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import org.apache.ignite.configuration.annotation.Value; import org.apache.ignite.internal.configuration.tree.ConfigurationSource; @@ -48,8 +51,10 @@ public class DataStorageManager implements IgniteComponent { } @Override - public void start() throws StorageException { + public CompletableFuture<Void> start() throws StorageException { engines.values().forEach(StorageEngine::start); + + return nullCompletedFuture(); } @Override diff --git a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java index d0711f6aab..67620906bb 100644 --- a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java +++ b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.systemview; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.systemview.utils.SystemViewUtils.tupleSchemaForView; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import java.util.ArrayList; @@ -98,7 +99,7 @@ public class SystemViewManagerImpl implements SystemViewManager, NodeAttributesP } @Override - public void start() { + public CompletableFuture<Void> start() { inBusyLock(busyLock, () -> { if (!startGuard.compareAndSet(false, true)) { throw new IllegalStateException("System view manager cannot be started twice"); @@ -128,6 +129,8 @@ public class SystemViewManagerImpl implements SystemViewManager, NodeAttributesP nodeAttributes.put(NODE_ATTRIBUTES_KEY, String.join(NODE_ATTRIBUTES_LIST_SEPARATOR, views.keySet())); }); + + return nullCompletedFuture(); } @Override diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 6f5d338c5e..ca770df651 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -501,7 +501,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } @Override - public void start() { + public CompletableFuture<Void> start() { inBusyLock(busyLock, () -> { mvGc.start(); @@ -537,6 +537,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { partitionReplicatorNodeRecovery.start(); }); + + return nullCompletedFuture(); } private void processAssignmentsOnRecovery(long recoveryRevision) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java index 3a7d46f5e7..ad2bed9879 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing; import static java.util.Collections.unmodifiableList; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import java.util.ArrayList; import java.util.List; @@ -85,12 +86,14 @@ public class OutgoingSnapshotsManager implements PartitionsSnapshots, IgniteComp } @Override - public void start() { + public CompletableFuture<Void> start() { executor = new ThreadPoolExecutor(0, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("outgoing-snapshots", LOG) ); messagingService.addMessageHandler(TableMessageGroup.class, this::handleMessage); + + return nullCompletedFuture(); } @Override diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index b1ee5961a5..8be0913ef2 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -583,7 +583,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { } @Override - public void start() { + public CompletableFuture<Void> start() { localNodeId = clusterService.topologyService().localMember().id(); clusterService.messagingService().addMessageHandler(ReplicaMessageGroup.class, this); @@ -592,6 +592,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { orphanDetector.start(txStateVolatileStorage, txConfig.abandonedCheckTs()); txCleanupRequestHandler.start(); + + return nullCompletedFuture(); } @Override diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java index 849ab2bacb..02e910dc54 100644 --- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java +++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.vault; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -47,8 +48,10 @@ public class VaultManager implements IgniteComponent { /** {@inheritDoc} */ @Override - public void start() { + public CompletableFuture<Void> start() { vaultSvc.start(); + + return nullCompletedFuture(); } /** {@inheritDoc} */