This is an automated email from the ASF dual-hosted git repository. akuznetsov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new fe77a25 IGNITE-13246 New baseline event types added. - Fixes #8034. fe77a25 is described below commit fe77a25f3c7aa4d10a6c6a56550d501995624bc0 Author: ibessonov <bessonov...@gmail.com> AuthorDate: Wed Jul 15 19:47:12 2020 +0700 IGNITE-13246 New baseline event types added. - Fixes #8034. Signed-off-by: Alexey Kuznetsov <akuznet...@apache.org> --- .../ignite/events/BaselineEventsLocalTest.java | 29 +++ .../ignite/events/BaselineEventsRemoteTest.java | 29 +++ .../apache/ignite/events/BaselineEventsTest.java | 287 +++++++++++++++++++++ .../testsuites/IgniteControlUtilityTestSuite.java | 5 + .../apache/ignite/events/BaselineChangedEvent.java | 91 +++++++ .../events/BaselineConfigurationChangedEvent.java | 100 +++++++ .../java/org/apache/ignite/events/EventType.java | 44 ++++ .../cluster/DistributedBaselineConfiguration.java | 14 +- .../cache/GridCachePartitionExchangeManager.java | 41 ++- .../cluster/DiscoveryDataClusterState.java | 10 + .../cluster/GridClusterStateProcessor.java | 37 ++- .../distributed/DistributedProperty.java | 2 +- .../distributed/SimpleDistributedProperty.java | 4 +- 13 files changed, 683 insertions(+), 10 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsLocalTest.java b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsLocalTest.java new file mode 100644 index 0000000..017240d --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsLocalTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.events; + +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.lang.IgnitePredicate; + +/** */ +public class BaselineEventsLocalTest extends BaselineEventsTest { + /** {@inheritDoc} */ + @Override protected void listen(IgniteEx ignite, IgnitePredicate<Event> lsnr, int... types) { + ignite.events().localListen(lsnr, types); + } +} diff --git a/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsRemoteTest.java b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsRemoteTest.java new file mode 100644 index 0000000..e6ac8a1 --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsRemoteTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.events; + +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.lang.IgnitePredicate; + +/** */ +public class BaselineEventsRemoteTest extends BaselineEventsTest { + /** {@inheritDoc} */ + @Override protected void listen(IgniteEx ignite, IgnitePredicate<Event> lsnr, int... types) { + ignite.events(ignite.cluster().forRemotes()).remoteListen((uuid, t) -> lsnr.apply(t), t -> true, types); + } +} diff --git a/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsTest.java b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsTest.java new file mode 100644 index 0000000..28b7b9e --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/events/BaselineEventsTest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.events; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.configuration.ConnectorConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.cluster.IgniteClusterEx; +import org.apache.ignite.internal.commandline.CommandHandler; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** */ +public abstract class BaselineEventsTest extends GridCommonAbstractTest { + /** */ + private int[] includedEvtTypes = EventType.EVTS_ALL; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setConnectorConfiguration(new ConnectorConfiguration()) + .setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + ) + .setWalSegments(3) + .setWalSegmentSize(512 * 1024) + ) + .setConsistentId(igniteInstanceName) + .setIncludeEventTypes(includedEvtTypes); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + protected abstract void listen(IgniteEx ignite, IgnitePredicate<Event> lsnr, int... 
types); + + /** */ + @Test + public void testChangeBltWithControlUtility() throws Exception { + startGrid(0).cluster().active(true); + + AtomicBoolean baselineChanged = new AtomicBoolean(); + + startGrid(1); + + String consistentIds = grid(0).localNode().consistentId() + "," + grid(1).localNode().consistentId(); + + listen( + grid(1), + event -> { + baselineChanged.set(true); + + BaselineChangedEvent baselineChangedEvt = (BaselineChangedEvent)event; + + assertEquals(2, baselineChangedEvt.baselineNodes().size()); + + return true; + }, + EventType.EVT_BASELINE_CHANGED + ); + + assertEquals( + CommandHandler.EXIT_CODE_OK, + new CommandHandler().execute(Arrays.asList("--baseline", "set", consistentIds, "--yes")) + ); + + assertTrue(GridTestUtils.waitForCondition(baselineChanged::get, 3_000)); + } + + /** */ + @Test + public void testChangeBltWithPublicApi() throws Exception { + startGrid(0).cluster().active(true); + + AtomicBoolean baselineChanged = new AtomicBoolean(); + + listen( + startGrid(1), + event -> { + baselineChanged.set(true); + + BaselineChangedEvent baselineChangedEvt = (BaselineChangedEvent)event; + + assertEquals(2, baselineChangedEvt.baselineNodes().size()); + + return true; + }, + EventType.EVT_BASELINE_CHANGED + ); + + grid(0).cluster().setBaselineTopology(grid(0).cluster().topologyVersion()); + + assertTrue(GridTestUtils.waitForCondition(baselineChanged::get, 3_000)); + } + + /** */ + @Test + public void testDeactivateActivate() throws Exception { + IgniteEx ignite = startGrids(2); + + AtomicBoolean baselineChanged = new AtomicBoolean(); + + listen( + ignite, + event -> { + baselineChanged.set(true); + + return true; + }, + EventType.EVT_BASELINE_CHANGED + ); + + ignite.cluster().active(true); + + assertTrue(GridTestUtils.waitForCondition(baselineChanged::get, 3_000)); + baselineChanged.set(false); + + ignite.cluster().active(false); + ignite.cluster().active(true); + + assertFalse(GridTestUtils.waitForCondition(baselineChanged::get, 3_000)); + } + 
+ /** */ + @Test + public void testChangeAutoAdjustEnabled() throws Exception { + IgniteClusterEx cluster = startGrids(2).cluster(); + + cluster.active(true); + + assertFalse(cluster.isBaselineAutoAdjustEnabled()); + + AtomicBoolean autoAdjustEnabled = new AtomicBoolean(); + + listen( + grid(0), + event -> { + BaselineConfigurationChangedEvent bltCfgChangedEvt = (BaselineConfigurationChangedEvent)event; + + autoAdjustEnabled.set(bltCfgChangedEvt.isAutoAdjustEnabled()); + + return true; + }, + EventType.EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED + ); + + assertEquals( + CommandHandler.EXIT_CODE_OK, + new CommandHandler().execute(Arrays.asList("--baseline", "auto_adjust", "enable", "timeout", "10", "--yes")) + ); + assertTrue(GridTestUtils.waitForCondition(autoAdjustEnabled::get, 3_000)); + + assertEquals( + CommandHandler.EXIT_CODE_OK, + new CommandHandler().execute(Arrays.asList("--baseline", "auto_adjust", "disable", "--yes")) + ); + assertFalse(autoAdjustEnabled.get()); + + cluster.baselineAutoAdjustEnabled(true); + assertTrue(GridTestUtils.waitForCondition(autoAdjustEnabled::get, 3_000)); + + cluster.baselineAutoAdjustEnabled(false); + assertTrue(GridTestUtils.waitForCondition(() -> !autoAdjustEnabled.get(), 3_000)); + } + + /** */ + @Test + public void testChangeAutoAdjustTimeout() throws Exception { + IgniteClusterEx cluster = startGrids(2).cluster(); + + cluster.active(true); + + AtomicLong autoAdjustTimeout = new AtomicLong(); + + listen( + grid(0), + event -> { + BaselineConfigurationChangedEvent bltCfgChangedEvt = (BaselineConfigurationChangedEvent)event; + + autoAdjustTimeout.set(bltCfgChangedEvt.autoAdjustTimeout()); + + return true; + }, + EventType.EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED + ); + + assertEquals( + CommandHandler.EXIT_CODE_OK, + new CommandHandler().execute(Arrays.asList("--baseline", "auto_adjust", "enable", "timeout", "10", "--yes")) + ); + assertTrue(GridTestUtils.waitForCondition(() -> autoAdjustTimeout.get() == 10L, 3_000)); + + 
cluster.baselineAutoAdjustTimeout(50); + assertTrue(GridTestUtils.waitForCondition(() -> autoAdjustTimeout.get() == 50L, 3_000)); + } + + /** */ + @Test + public void testEventsDisabledByDefault() throws Exception { + //noinspection ZeroLengthArrayAllocation + includedEvtTypes = new int[0]; + + IgniteClusterEx cluster = startGrid(0).cluster(); + cluster.active(true); + + AtomicInteger evtsTriggered = new AtomicInteger(); + + listen( + grid(0), + event -> { + evtsTriggered.incrementAndGet(); + + return true; + }, + EventType.EVT_BASELINE_CHANGED, + EventType.EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED, + EventType.EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED + ); + + startGrid(1); + + String consistentIds = grid(0).localNode().consistentId() + "," + grid(1).localNode().consistentId(); + + assertEquals( + CommandHandler.EXIT_CODE_OK, + new CommandHandler().execute(Arrays.asList("--baseline", "set", consistentIds, "--yes")) + ); + + awaitPartitionMapExchange(); + + startGrid(2); + + cluster.setBaselineTopology(cluster.topologyVersion()); + + awaitPartitionMapExchange(); + + assertEquals( + CommandHandler.EXIT_CODE_OK, + new CommandHandler().execute(Arrays.asList("--baseline", "auto_adjust", "enable", "timeout", "10", "--yes")) + ); + + cluster.baselineAutoAdjustEnabled(false); + cluster.baselineAutoAdjustTimeout(50); + + assertFalse(GridTestUtils.waitForCondition(() -> evtsTriggered.get() > 0, 3_000L)); + } +} diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java index 7a57771..cc748e5 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java @@ -17,6 +17,8 @@ package org.apache.ignite.testsuites; +import 
org.apache.ignite.events.BaselineEventsLocalTest; +import org.apache.ignite.events.BaselineEventsRemoteTest; import org.apache.ignite.internal.commandline.CommandHandlerParsingTest; import org.apache.ignite.internal.processors.security.GridCommandHandlerSslWithSecurityTest; import org.apache.ignite.util.GridCommandHandlerBrokenIndexTest; @@ -61,6 +63,9 @@ import org.junit.runners.Suite; GridCommandHandlerMetadataTest.class, KillCommandsCommandShTest.class, + + BaselineEventsLocalTest.class, + BaselineEventsRemoteTest.class, }) public class IgniteControlUtilityTestSuite { } diff --git a/modules/core/src/main/java/org/apache/ignite/events/BaselineChangedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/BaselineChangedEvent.java new file mode 100644 index 0000000..bbbeb74 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/events/BaselineChangedEvent.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.events; + +import java.util.Collection; +import org.apache.ignite.IgniteEvents; +import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgnitePredicate; + +/** + * Baseline changed event. + * <p> + * Grid events are used for notification about what happens within the grid. Note that by + * design Ignite keeps all events generated on the local node locally and it provides + * APIs for performing distributed queries across multiple nodes: + * <ul> + * <li> + * {@link IgniteEvents#remoteQuery(IgnitePredicate, long, int...)} - + * asynchronously querying events that occurred on the nodes specified, including remote nodes. + * </li> + * <li> + * {@link IgniteEvents#localQuery(IgnitePredicate, int...)} - + * querying only local events stored on this local node. + * </li> + * <li> + * {@link IgniteEvents#localListen(IgnitePredicate, int...)} - + * listening to local grid events (events from remote nodes not included). + * </li> + * </ul> + * User can also wait for events using method {@link IgniteEvents#waitForLocal(IgnitePredicate, int...)}. + * <h1 class="header">Events and Performance</h1> + * Note that by default all events in Ignite are enabled and therefore generated and stored + * by whatever event storage SPI is configured. Ignite can and often does generate thousands of events per second + * under the load and therefore it creates a significant additional load on the system. If these events are + * not needed by the application this load is unnecessary and leads to significant performance degradation. + * <p> + * It is <b>highly recommended</b> to enable only those events that your application logic requires + * by using {@link IgniteConfiguration#getIncludeEventTypes()} method in Ignite configuration. 
Note that certain + * events are required for Ignite's internal operations and such events will still be generated but not stored by + * event storage SPI if they are disabled in Ignite configuration. + * @see EventType#EVT_BASELINE_CHANGED + */ +public class BaselineChangedEvent extends EventAdapter { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Baseline nodes. */ + private final Collection<BaselineNode> baselineNodes; + + /** + * Creates baseline changed event with given parameters. + * @param node Node. + * @param msg Optional event message. + * @param type Event type. + * @param baselineNodes Collection of new baseline nodes. + */ + public BaselineChangedEvent( + ClusterNode node, + String msg, + int type, + Collection<BaselineNode> baselineNodes + ) { + super(node, msg, type); + + //noinspection AssignmentOrReturnOfFieldWithMutableType + this.baselineNodes = baselineNodes; + } + + /** New baseline nodes. */ + public Collection<BaselineNode> baselineNodes() { + //noinspection AssignmentOrReturnOfFieldWithMutableType + return baselineNodes; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/events/BaselineConfigurationChangedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/BaselineConfigurationChangedEvent.java new file mode 100644 index 0000000..c623014 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/events/BaselineConfigurationChangedEvent.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.events; + +import org.apache.ignite.IgniteCluster; +import org.apache.ignite.IgniteEvents; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgnitePredicate; + +/** + * Baseline configuration changed event. + * <p> + * Grid events are used for notification about what happens within the grid. Note that by + * design Ignite keeps all events generated on the local node locally and it provides + * APIs for performing distributed queries across multiple nodes: + * <ul> + * <li> + * {@link IgniteEvents#remoteQuery(IgnitePredicate, long, int...)} - + * asynchronously querying events that occurred on the nodes specified, including remote nodes. + * </li> + * <li> + * {@link IgniteEvents#localQuery(IgnitePredicate, int...)} - + * querying only local events stored on this local node. + * </li> + * <li> + * {@link IgniteEvents#localListen(IgnitePredicate, int...)} - + * listening to local grid events (events from remote nodes not included). + * </li> + * </ul> + * User can also wait for events using method {@link IgniteEvents#waitForLocal(IgnitePredicate, int...)}. + * <h1 class="header">Events and Performance</h1> + * Note that by default all events in Ignite are enabled and therefore generated and stored + * by whatever event storage SPI is configured. Ignite can and often does generate thousands of events per second + * under the load and therefore it creates a significant additional load on the system. 
If these events are + * not needed by the application this load is unnecessary and leads to significant performance degradation. + * <p> + * It is <b>highly recommended</b> to enable only those events that your application logic requires + * by using {@link IgniteConfiguration#getIncludeEventTypes()} method in Ignite configuration. Note that certain + * events are required for Ignite's internal operations and such events will still be generated but not stored by + * event storage SPI if they are disabled in Ignite configuration. + * @see EventType#EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED + * @see EventType#EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED + */ +public class BaselineConfigurationChangedEvent extends EventAdapter { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** @see IgniteCluster#isBaselineAutoAdjustEnabled() */ + private final boolean autoAdjustEnabled; + + /** @see IgniteCluster#baselineAutoAdjustTimeout() */ + private final long autoAdjustTimeout; + + /** + * Creates baseline configuration changed event with given parameters. + * @param node Node. + * @param msg Optional event message. + * @param type Event type. + * @param autoAdjustEnabled Auto-adjust "enabled" flag value. + * @param autoAdjustTimeout Auto-adjust timeout value in milliseconds. + */ + public BaselineConfigurationChangedEvent( + ClusterNode node, + String msg, + int type, + boolean autoAdjustEnabled, + long autoAdjustTimeout + ) { + super(node, msg, type); + + this.autoAdjustEnabled = autoAdjustEnabled; + this.autoAdjustTimeout = autoAdjustTimeout; + } + + /** Auto-adjust "enabled" flag value. */ + public boolean isAutoAdjustEnabled() { + return autoAdjustEnabled; + } + + /** Auto-adjust timeout value in milliseconds. 
*/ + public long autoAdjustTimeout() { + return autoAdjustTimeout; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java index f6ba4da..fca1d02 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java +++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java @@ -17,6 +17,7 @@ package org.apache.ignite.events; +import java.util.Collection; import java.util.List; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCluster; @@ -981,6 +982,49 @@ public interface EventType { public static final int EVT_CLUSTER_STATE_CHANGE_STARTED = 145; /** + * Built-in event type: baseline topology has been changed by either user request or auto-adjust timeout event. + * Event includes the following information: new baseline nodes. + * + * <p> + * Fired after the new baseline topology has been applied. + * </p> + * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for + * internal Ignite events and should not be used by user-defined events. + * + * @see IgniteCluster#setBaselineTopology(long) + * @see IgniteCluster#setBaselineTopology(Collection) + */ + public static final int EVT_BASELINE_CHANGED = 146; + + /** + * Built-in event type: baseline auto-adjust "enabled" flag has been changed by user request. + * Event includes the following information: auto-adjust "enabled" flag, auto-adjust timeout. + * + * <p> + * Fired after the new auto-adjust "enabled" flag value has been applied. + * </p> + * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for + * internal Ignite events and should not be used by user-defined events. + * + * @see IgniteCluster#baselineAutoAdjustEnabled(boolean) + */ + public static final int EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED = 147; + + /** + * Built-in event type: baseline auto-adjust timeout has been changed by user request. 
+ * Event includes the following information: auto-adjust "enabled" flag, auto-adjust timeout. + * + * <p> + * Fired after the new auto-adjust timeout value has been applied. + * </p> + * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for + * internal Ignite events and should not be used by user-defined events. + * + * @see IgniteCluster#baselineAutoAdjustTimeout(long) + */ + public static final int EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED = 148; + + /** * All checkpoint events. This array can be directly passed into * {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to * subscribe to all checkpoint events. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java index aa0a978..855efe4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.cluster; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.configuration.distributed.DistributePropertyListener; import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty; import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher; @@ -76,7 +77,8 @@ public class DistributedBaselineConfiguration { public DistributedBaselineConfiguration( GridInternalSubscriptionProcessor isp, GridKernalContext ctx, - IgniteLogger log) { + IgniteLogger log + ) { this.log = log; boolean persistenceEnabled = ctx.config() != null 
&& CU.isPersistenceEnabled(ctx.config()); @@ -101,6 +103,16 @@ public class DistributedBaselineConfiguration { ); } + /** */ + public void listenAutoAdjustEnabled(DistributePropertyListener<? super Boolean> lsnr) { + baselineAutoAdjustEnabled.addListener(lsnr); + } + + /** */ + public void listenAutoAdjustTimeout(DistributePropertyListener<? super Long> lsnr) { + baselineAutoAdjustTimeout.addListener(lsnr); + } + /** * Called when cluster performing activation. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index fc8bef2..d8f95db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -56,13 +56,17 @@ import org.apache.ignite.cluster.BaselineNode; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterGroupEmptyException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.BaselineChangedEvent; import org.apache.ignite.events.ClusterActivationEvent; import org.apache.ignite.events.ClusterStateChangeEvent; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteDiagnosticAware; import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; @@ -111,6 +115,7 @@ import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; +import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.impl.BooleanMetricImpl; import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl; @@ -581,7 +586,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchFut = exchangeFuture(exchId, evt, cache, exchActions, null); - exchFut.listen(f -> onClusterStateChangeFinish(f, exchActions)); + boolean baselineChanging; + if (stateChangeMsg.forceChangeBaselineTopology()) + baselineChanging = true; + else { + DiscoveryDataClusterState state = cctx.kernalContext().state().clusterState(); + + assert state.transition() : state; + + baselineChanging = exchActions.changedBaseline() + // Or it is the first activation. 
+ || state.state() != ClusterState.INACTIVE && !state.previouslyActive() && state.previousBaselineTopology() == null; + } + + exchFut.listen(f -> onClusterStateChangeFinish(f, exchActions, baselineChanging)); } } else if (customMsg instanceof DynamicCacheChangeBatch) { @@ -664,7 +682,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** */ - private void onClusterStateChangeFinish(IgniteInternalFuture<AffinityTopologyVersion> fut, ExchangeActions exchActions) { + private void onClusterStateChangeFinish(IgniteInternalFuture<AffinityTopologyVersion> fut, + ExchangeActions exchActions, boolean baselineChanging) { A.notNull(exchActions, "exchActions"); GridEventStorageManager evtMngr = cctx.kernalContext().event(); @@ -711,6 +730,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.kernalContext().getSystemExecutorService() .submit(() -> evts.forEach(e -> cctx.kernalContext().event().record(e))); } + + GridKernalContext ctx = cctx.kernalContext(); + DiscoveryDataClusterState state = ctx.state().clusterState(); + + if (baselineChanging) { + ctx.getStripedExecutorService().execute(new Runnable() { + @Override public void run() { + if (ctx.event().isRecordable(EventType.EVT_BASELINE_CHANGED)) { + ctx.event().record(new BaselineChangedEvent( + ctx.discovery().localNode(), + "Baseline changed.", + EventType.EVT_BASELINE_CHANGED, + ctx.cluster().get().currentBaselineTopology() + )); + } + } + }); + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java index 8536dd2..73e771e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java @@ -211,6 +211,16 @@ public 
class DiscoveryDataClusterState implements Serializable { } /** + * @return Previous "active" flag value during transition. + */ + public boolean previouslyActive() { + if (prevState != null) + return prevState.state != INACTIVE; + + return state == INACTIVE; + } + + /** * @return State change exchange version. */ public AffinityTopologyVersion transitionTopologyVersion() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 57b6790..888ba7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -40,6 +40,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.BaselineConfigurationChangedEvent; import org.apache.ignite.events.ClusterStateChangeStartedEvent; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; @@ -68,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadO import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; import org.apache.ignite.internal.processors.cluster.baseline.autoadjust.BaselineAutoAdjustStatus; import org.apache.ignite.internal.processors.cluster.baseline.autoadjust.ChangeTopologyWatcher; +import org.apache.ignite.internal.processors.configuration.distributed.DistributePropertyListener; import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.util.future.GridFinishedFuture; import 
org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -95,6 +97,8 @@ import static org.apache.ignite.cluster.ClusterState.ACTIVE; import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY; import static org.apache.ignite.cluster.ClusterState.INACTIVE; import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_STATE_ON_START; +import static org.apache.ignite.events.EventType.EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED; +import static org.apache.ignite.events.EventType.EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -110,9 +114,6 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersi * */ public class GridClusterStateProcessor extends GridProcessorAdapter implements IGridClusterStateProcessor, MetastorageLifecycleListener { - /** Stripe id for cluster activation event. 
*/ - public static final int CLUSTER_ACTIVATION_EVT_STRIPE_ID = Integer.MAX_VALUE; - /** */ private static final String METASTORE_CURR_BLT_KEY = "metastoreBltKey"; @@ -202,8 +203,37 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I ctx, ctx.log(DistributedBaselineConfiguration.class) ); + + distributedBaselineConfiguration.listenAutoAdjustEnabled(makeEventListener( + EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED + )); + + distributedBaselineConfiguration.listenAutoAdjustTimeout(makeEventListener( + EVT_BASELINE_AUTO_ADJUST_AWAITING_TIME_CHANGED + )); + } + + /** */ + private DistributePropertyListener<Object> makeEventListener(int evtType) { + //noinspection CodeBlock2Expr + return (name, oldVal, newVal) -> { + ctx.getStripedExecutorService().execute(() -> { + if (ctx.event().isRecordable(evtType)) { + ctx.event().record(new BaselineConfigurationChangedEvent( + ctx.discovery().localNode(), + evtType == EVT_BASELINE_AUTO_ADJUST_ENABLED_CHANGED + ? "Baseline auto-adjust \"enabled\" flag has been changed" + : "Baseline auto-adjust timeout has been changed", + evtType, + distributedBaselineConfiguration.isBaselineAutoAdjustEnabled(), + distributedBaselineConfiguration.getBaselineAutoAdjustTimeout() + )); + } + }); + }; } + /** {@inheritDoc} */ @Override public @Nullable IgniteNodeValidationResult validateNode(ClusterNode node) { if (!isBaselineAutoAdjustEnabled() || baselineAutoAdjustTimeout() != 0) @@ -700,7 +730,6 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I if (newState.state() != state.state()) { if (ctx.event().isRecordable(EventType.EVT_CLUSTER_STATE_CHANGE_STARTED)) { ctx.getStripedExecutorService().execute( - CLUSTER_ACTIVATION_EVT_STRIPE_ID, () -> ctx.event().record(new ClusterStateChangeStartedEvent( state.state(), newState.state(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java index f15da49..b47062c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java @@ -85,5 +85,5 @@ public interface DistributedProperty<T extends Serializable> { /** * @param listener Update listener. */ - void addListener(DistributePropertyListener<T> listener); + void addListener(DistributePropertyListener<? super T> listener); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/SimpleDistributedProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/SimpleDistributedProperty.java index c549968..9819197 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/SimpleDistributedProperty.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/SimpleDistributedProperty.java @@ -39,7 +39,7 @@ public class SimpleDistributedProperty<T extends Serializable> implements Distri private volatile boolean attached = false; /** Listeners of property update. */ - private final ConcurrentLinkedQueue<DistributePropertyListener<T>> updateListeners = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue<DistributePropertyListener<? super T>> updateListeners = new ConcurrentLinkedQueue<>(); /** * Specific consumer for update value in cluster. It is null when property doesn't ready to update value on cluster @@ -108,7 +108,7 @@ public class SimpleDistributedProperty<T extends Serializable> implements Distri } /** {@inheritDoc} */ - @Override public void addListener(DistributePropertyListener<T> listener) { + @Override public void addListener(DistributePropertyListener<? 
super T> listener) { updateListeners.add(listener); }