IGNITE-4740 - Fix. Service could be deployed/undeployed twice on concurrent cancel and discovery event.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9fcb3e74 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9fcb3e74 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9fcb3e74 Branch: refs/heads/ignite-4680-sb Commit: 9fcb3e74f91c8497b7b1358cdff40950cdf5c568 Parents: c0e2df2 Author: dkarachentsev <dkarachent...@gridgain.com> Authored: Tue Feb 28 16:05:06 2017 +0300 Committer: dkarachentsev <dkarachent...@gridgain.com> Committed: Tue Feb 28 16:05:06 2017 +0300 ---------------------------------------------------------------------- .../cache/DynamicCacheChangeBatch.java | 14 ++ .../service/GridServiceProcessor.java | 49 +++--- .../GridServiceContinuousQueryRedeploy.java | 167 +++++++++++++++++++ .../testsuites/IgniteKernalSelfTestSuite.java | 2 + 4 files changed, 208 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index 4dcff9b..a250063 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -113,6 +113,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { return clientReconnect; } + /** + * @return {@code True} if request should trigger partition exchange. + */ + public boolean exchangeNeeded() { + if (reqs != null) { + for (DynamicCacheChangeRequest req : reqs) { + if (req.exchangeNeeded()) + return true; + } + } + + return false; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheChangeBatch.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 3690f35..4eeafed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -65,10 +65,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheIteratorConverter; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.query.CacheQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.continuous.AbstractContinuousMessage; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridEmptyIterator; @@ -1468,19 +1470,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { else { String name = e.getKey().name(); - svcName.set(name); - - Collection<ServiceContextImpl> ctxs; - - synchronized (locSvcs) { - ctxs = locSvcs.remove(name); - } - - if (ctxs != null) { - synchronized (ctxs) { - cancel(ctxs, ctxs.size()); - } - } + undeploy(name); // Finish deployment futures if undeployment happened. GridFutureAdapter<?> fut = depFuts.remove(name); @@ -1586,6 +1576,12 @@ public class GridServiceProcessor extends GridProcessorAdapter { if (!((CacheAffinityChangeMessage)msg).exchangeNeeded()) return; } + else if (msg instanceof DynamicCacheChangeBatch) { + if (!((DynamicCacheChangeBatch)msg).exchangeNeeded()) + return; + } + else + return; } else topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0); @@ -1771,21 +1767,26 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } // Handle undeployment. - else { - String name = e.getKey().name(); + else + undeploy(e.getKey().name()); + } - svcName.set(name); - Collection<ServiceContextImpl> ctxs; + /** + * @param name Name. + */ + private void undeploy(String name) { + svcName.set(name); - synchronized (locSvcs) { - ctxs = locSvcs.remove(name); - } + Collection<ServiceContextImpl> ctxs; - if (ctxs != null) { - synchronized (ctxs) { - cancel(ctxs, ctxs.size()); - } + synchronized (locSvcs) { + ctxs = locSvcs.remove(name); + } + + if (ctxs != null) { + synchronized (ctxs) { + cancel(ctxs, ctxs.size()); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java new file mode 100644 index 0000000..1a9ef3a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.services.Service; +import org.apache.ignite.services.ServiceConfiguration; +import org.apache.ignite.services.ServiceContext; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests whether concurrent service cancel and registering ContinuousQuery doesn't causes + * service redeployment. + */ +public class GridServiceContinuousQueryRedeploy extends GridCommonAbstractTest { + /** */ + private static final String CACHE_NAME = "TEST_CACHE"; + + /** */ + private static final String TEST_KEY = "TEST_KEY"; + + /** */ + private static final String SERVICE_NAME = "service1"; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testServiceRedeploymentAfterCancel() throws Exception { + final Ignite ignite = startGrid(0); + + final IgniteCache<Object, Object> managementCache = ignite.getOrCreateCache(CACHE_NAME); + + final ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + final List<Object> evts = Collections.synchronizedList(new ArrayList<>()); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated( + Iterable<CacheEntryEvent<?, ?>> iterable) throws CacheEntryListenerException { + for (CacheEntryEvent<?, ?> event : iterable) + evts.add(event); + } + }); + + int iterations = 100; + + while (iterations-- > 0) { + QueryCursor quorumCursor = managementCache.query(qry); + + IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + System.out.println("Deploy " + SERVICE_NAME); + deployService(ignite); + + return null; + } + }); + + IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + System.out.println("Undeploy " + SERVICE_NAME); + ignite.services().cancel(SERVICE_NAME); + + return null; + } + }); + + fut1.get(); + fut2.get(); + + U.sleep(100); + + assert evts.size() <= 1 : evts.size(); + + ignite.services().cancel("service1"); + + evts.clear(); + + quorumCursor.close(); + } + + } + + /** + * @param ignite Ignite. + */ + private void deployService(final Ignite ignite) { + ServiceConfiguration svcCfg = new ServiceConfiguration(); + + svcCfg.setService(new ManagementService()); + svcCfg.setName(SERVICE_NAME); + svcCfg.setTotalCount(1); + svcCfg.setMaxPerNodeCount(1); + svcCfg.setNodeFilter(new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return !node.isClient(); + } + }); + + ignite.services().deploy(svcCfg); + } + + /** + * + */ + public static class ManagementService implements Service { + /** */ + private final String name = UUID.randomUUID().toString(); + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public void cancel(ServiceContext ctx) { + System.out.println(name + " shutdown."); + } + + /** {@inheritDoc} */ + @Override public synchronized void init(ServiceContext ctx) throws Exception { + System.out.println(name + " initializing."); + + ignite.cache(CACHE_NAME).put(TEST_KEY, name + " init"); + } + + /** {@inheritDoc} */ + @Override public void execute(ServiceContext ctx) throws Exception { + // No-op + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java index 350b715..5977702 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cluster.GridAddressResolverSelfTest import org.apache.ignite.internal.processors.cluster.GridUpdateNotifierSelfTest; import org.apache.ignite.internal.processors.port.GridPortProcessorSelfTest; import org.apache.ignite.internal.processors.service.GridServiceClientNodeTest; +import org.apache.ignite.internal.processors.service.GridServiceContinuousQueryRedeploy; import org.apache.ignite.internal.processors.service.GridServicePackagePrivateSelfTest; import org.apache.ignite.internal.processors.service.GridServiceProcessorMultiNodeConfigSelfTest; import org.apache.ignite.internal.processors.service.GridServiceProcessorMultiNodeSelfTest; @@ -143,6 +144,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite { suite.addTestSuite(GridServiceProxyClientReconnectSelfTest.class); suite.addTestSuite(IgniteServiceReassignmentTest.class); suite.addTestSuite(IgniteServiceProxyTimeoutInitializedTest.class); + suite.addTestSuite(GridServiceContinuousQueryRedeploy.class); suite.addTestSuite(IgniteServiceDeploymentClassLoadingDefaultMarshallerTest.class); suite.addTestSuite(IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest.class);