This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 20a74ad IGNITE-14027 Server nodes outside of BLT should be included into BLT when auto-adjust is turned on - Fixes #8679. 20a74ad is described below commit 20a74adf72ceb914b351432ea22611c33cee817e Author: Semyon Danilov <samvi...@yandex.ru> AuthorDate: Mon Jan 25 14:28:32 2021 +0300 IGNITE-14027 Server nodes outside of BLT should be included into BLT when auto-adjust is turned on - Fixes #8679. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../org/apache/ignite/IgniteSystemProperties.java | 2 +- .../apache/ignite/internal/pagemem/PageUtils.java | 2 +- .../cluster/GridClusterStateProcessor.java | 26 ++++++++++++--- .../autoadjust/BaselineAutoAdjustScheduler.java | 10 +++++- ...gyWatcher.java => BaselineTopologyUpdater.java} | 38 ++++++++++----------- .../processors/cluster/BaselineAutoAdjustTest.java | 39 ++++++++++++++++++++++ 6 files changed, 89 insertions(+), 28 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index a6c6697..47dcb4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -112,7 +112,7 @@ import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxM import static org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.DFLT_TX_DEADLOCK_DETECTION_TIMEOUT; import static org.apache.ignite.internal.processors.cluster.ClusterProcessor.DFLT_DIAGNOSTIC_ENABLED; import static org.apache.ignite.internal.processors.cluster.ClusterProcessor.DFLT_UPDATE_NOTIFIER; -import static org.apache.ignite.internal.processors.cluster.baseline.autoadjust.ChangeTopologyWatcher.DFLT_BASELINE_AUTO_ADJUST_LOG_INTERVAL; +import static org.apache.ignite.internal.processors.cluster.baseline.autoadjust.BaselineTopologyUpdater.DFLT_BASELINE_AUTO_ADJUST_LOG_INTERVAL; import static org.apache.ignite.internal.processors.datastructures.GridAtomicCacheQueueImpl.DFLT_ATOMIC_CACHE_QUERY_RETRY_TIMEOUT; import static org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessor.DFLT_DUMP_PAGE_LOCK_ON_FAILURE; import static org.apache.ignite.internal.processors.failure.FailureProcessor.DFLT_FAILURE_HANDLER_RESERVE_BUFFER_SIZE; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java index 217164c..c86b4c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java @@ -120,7 +120,7 @@ public class PageUtils { } /** - * @param addr Address/ + * @param addr Address. * @param off Offset. * @param bytes Bytes. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 01ded97..085d38c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -68,7 +68,7 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.Metas import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; import org.apache.ignite.internal.processors.cluster.baseline.autoadjust.BaselineAutoAdjustStatus; -import org.apache.ignite.internal.processors.cluster.baseline.autoadjust.ChangeTopologyWatcher; +import org.apache.ignite.internal.processors.cluster.baseline.autoadjust.BaselineTopologyUpdater; import org.apache.ignite.internal.processors.configuration.distributed.DistributePropertyListener; import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -160,8 +160,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I /** */ private final JdkMarshaller marsh = new JdkMarshaller(); - /** Watcher of topology change for baseline auto-adjust. */ - private ChangeTopologyWatcher changeTopologyWatcher; + /** Updater of baseline topology. */ + private BaselineTopologyUpdater baselineTopologyUpdater; /** Distributed baseline configuration. */ private DistributedBaselineConfiguration distributedBaselineConfiguration; @@ -457,10 +457,26 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I /** {@inheritDoc} */ @Override public void onKernalStart(boolean active) throws IgniteCheckedException { + baselineTopologyUpdater = new BaselineTopologyUpdater(ctx); + ctx.event().addLocalEventListener( - changeTopologyWatcher = new ChangeTopologyWatcher(ctx), + event -> { + DiscoveryEvent discoEvt = (DiscoveryEvent) event; + + if (discoEvt.eventNode().isClient() || discoEvt.eventNode().isDaemon()) + return; + + baselineTopologyUpdater.triggerBaselineUpdate(discoEvt.topologyVersion()); + }, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED ); + + distributedBaselineConfiguration.listenAutoAdjustEnabled((name, oldVal, newVal) -> { + if (newVal != null && newVal) { + long topVer = ctx.discovery().topologyVersion(); + baselineTopologyUpdater.triggerBaselineUpdate(topVer); + } + }); } /** {@inheritDoc} */ @@ -1742,7 +1758,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I * @return Status of baseline auto-adjust. */ public BaselineAutoAdjustStatus baselineAutoAdjustStatus() { - return changeTopologyWatcher.getStatus(); + return baselineTopologyUpdater.getStatus(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustScheduler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustScheduler.java index af40f1b..0e27195 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustScheduler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineAutoAdjustScheduler.java @@ -25,7 +25,7 @@ import org.apache.ignite.lang.IgniteUuid; import static org.apache.ignite.IgniteSystemProperties.IGNITE_BASELINE_AUTO_ADJUST_LOG_INTERVAL; import static org.apache.ignite.IgniteSystemProperties.getLong; -import static org.apache.ignite.internal.processors.cluster.baseline.autoadjust.ChangeTopologyWatcher.DFLT_BASELINE_AUTO_ADJUST_LOG_INTERVAL; +import static org.apache.ignite.internal.processors.cluster.baseline.autoadjust.BaselineTopologyUpdater.DFLT_BASELINE_AUTO_ADJUST_LOG_INTERVAL; /** * This class able to add task of set baseline with timeout to queue. In one time only one task can be in queue. Every @@ -89,6 +89,14 @@ class BaselineAutoAdjustScheduler { } /** + * @param data Baseline data for adjust. + * @return {@code true} If baseline auto-adjust shouldn't be executed for given data. + */ + boolean isExecutionExpired(BaselineAutoAdjustData data) { + return baselineAutoAdjustExecutor.isExecutionExpired(data); + } + + /** * Timeout object of baseline auto-adjust operation. This object able executing several times: some first times for * logging of expecting auto-adjust and last time for baseline adjust. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/ChangeTopologyWatcher.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineTopologyUpdater.java similarity index 83% rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/ChangeTopologyWatcher.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineTopologyUpdater.java index 25c0c73..3501bec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/ChangeTopologyWatcher.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/baseline/autoadjust/BaselineTopologyUpdater.java @@ -19,13 +19,10 @@ package org.apache.ignite.internal.processors.cluster.baseline.autoadjust; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.cluster.DistributedBaselineConfiguration; import org.apache.ignite.internal.cluster.IgniteClusterImpl; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; @@ -35,9 +32,10 @@ import static org.apache.ignite.internal.processors.cluster.baseline.autoadjust. import static org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator; /** - * Watcher of topology changes. It initiate to set new baseline after some timeout. + * Baseline topology updater with ability to watch of topology changes. + * It initiates update to set new baseline after some timeout. */ -public class ChangeTopologyWatcher implements GridLocalEventListener { +public class BaselineTopologyUpdater { /** @see IgniteSystemProperties#IGNITE_BASELINE_AUTO_ADJUST_LOG_INTERVAL */ public static final int DFLT_BASELINE_AUTO_ADJUST_LOG_INTERVAL = 60_000; @@ -66,7 +64,7 @@ public class ChangeTopologyWatcher implements GridLocalEventListener { private final boolean isPersistenceEnabled; /** - * {@code true} if {@link ChangeTopologyWatcher} makes sense for local node or {@code false} otherwise(eg. local + * {@code true} if {@link BaselineTopologyUpdater} makes sense for local node or {@code false} otherwise(eg. local * node is client). */ private final boolean isSupportedByLocalNode; @@ -77,8 +75,8 @@ public class ChangeTopologyWatcher implements GridLocalEventListener { /** * @param ctx Context. */ - public ChangeTopologyWatcher(GridKernalContext ctx) { - this.log = ctx.log(ChangeTopologyWatcher.class); + public BaselineTopologyUpdater(GridKernalContext ctx) { + this.log = ctx.log(BaselineTopologyUpdater.class); this.cluster = ctx.cluster().get(); this.baselineConfiguration = ctx.state().baselineConfiguration(); this.exchangeManager = ctx.cache().context().exchange(); @@ -94,8 +92,11 @@ public class ChangeTopologyWatcher implements GridLocalEventListener { this.isPersistenceEnabled = CU.isPersistenceEnabled(cluster.ignite().configuration()); } - /** {@inheritDoc} */ - @Override public void onEvent(Event evt) { + /** + * Schedule update of the baseline topology + * @param topologyVersion version of topology + */ + public void triggerBaselineUpdate(long topologyVersion) { if (!isTopologyWatcherEnabled()) { synchronized (this) { lastBaselineData = NULL_BASELINE_DATA; @@ -104,22 +105,19 @@ public class ChangeTopologyWatcher implements GridLocalEventListener { return; } - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - - if (discoEvt.eventNode().isClient() || discoEvt.eventNode().isDaemon()) - return; - synchronized (this) { - lastBaselineData = lastBaselineData.next(discoEvt.topologyVersion()); + lastBaselineData = lastBaselineData.next(topologyVersion); + + final BaselineAutoAdjustData baselineData = lastBaselineData; if (isLocalNodeCoordinator(discoveryMgr)) { - exchangeManager.affinityReadyFuture(new AffinityTopologyVersion(discoEvt.topologyVersion())) + exchangeManager.affinityReadyFuture(new AffinityTopologyVersion(topologyVersion)) .listen(future -> { if (future.error() != null) return; if (exchangeManager.lastFinishedFuture().hasLostPartitions()) { - log.warning("Baseline won't be changed cause the lost partitions were detected"); + log.warning("Baseline won't be changed cause lost partitions were detected"); return; } @@ -128,7 +126,7 @@ public class ChangeTopologyWatcher implements GridLocalEventListener { log.warning("Baseline auto-adjust will be executed in '" + timeout + "' ms"); - baselineAutoAdjustScheduler.schedule(lastBaselineData, timeout); + baselineAutoAdjustScheduler.schedule(baselineData, timeout); }); } @@ -150,7 +148,7 @@ public class ChangeTopologyWatcher implements GridLocalEventListener { */ public BaselineAutoAdjustStatus getStatus() { synchronized (this) { - if (lastBaselineData.isAdjusted()) + if (lastBaselineData.isAdjusted() || baselineAutoAdjustScheduler.isExecutionExpired(lastBaselineData)) return BaselineAutoAdjustStatus.notScheduled(); long timeToLastTask = baselineAutoAdjustScheduler.lastScheduledTaskTime(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java index b63377a..dff853d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cluster/BaselineAutoAdjustTest.java @@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; @@ -33,6 +34,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testframework.junits.common.GridCommonTest; import org.junit.After; @@ -517,6 +519,43 @@ public class BaselineAutoAdjustTest extends GridCommonAbstractTest { } /** + * Test that node joins baseline topology after enabling auto adjust. + * + * Description: + * Start first node and set baseline auto adjust timeout to 100ms. Disable auto adjust and activate cluster. + * Start another node and wait until it joins cluster. Enable auto adjust and check that node is in + * baseline topology. + * + * @throws Exception If failed. + */ + @Test + public void testJoinBltExistingNode() throws Exception { + IgniteEx ignite0 = startGrid(0); + + ignite0.cluster().baselineAutoAdjustTimeout(100); + + ignite0.cluster().baselineAutoAdjustEnabled(false); + + ignite0.cluster().state(ClusterState.ACTIVE); + + startGrid(1); + + startClientGrid(2); + + awaitPartitionMapExchange(); + + assertEquals(3, ignite0.cluster().nodes().size()); + + assertEquals(1, ignite0.cluster().currentBaselineTopology().size()); + + ignite0.cluster().baselineAutoAdjustEnabled(true); + + assertTrue(GridTestUtils.waitForCondition(() -> { + return 2 == ignite0.cluster().currentBaselineTopology().size(); + }, 10_000)); + } + + /** * @throws Exception if failed. */ private IgniteConfiguration inMemoryConfiguration(int id) throws Exception {