ignite-2146 Call service node filter outside of system cache transaction.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab8ba974 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab8ba974 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab8ba974 Branch: refs/heads/ignite-2100 Commit: ab8ba9746f7f8f3be88eba73cc9abfb84cd86ecc Parents: df08d3d Author: sboikov <sboi...@gridgain.com> Authored: Wed Dec 16 08:39:20 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Dec 16 08:39:20 2015 +0300 ---------------------------------------------------------------------- .../service/GridServiceProcessor.java | 30 +++- .../ServicePredicateAccessCacheTest.java | 155 +++++++++++++++++++ .../testsuites/IgniteKernalSelfTestSuite.java | 2 + 3 files changed, 180 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ab8ba974/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index ed54f00..6b05edd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -704,13 +704,33 @@ public class GridServiceProcessor extends GridProcessorAdapter { Object affKey = cfg.getAffinityKey(); while (true) { + GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer); + + Collection<ClusterNode> nodes; + + // Call node filter outside of transaction. + if (affKey == null) { + nodes = ctx.discovery().nodes(topVer); + + if (assigns.nodeFilter() != null) { + Collection<ClusterNode> nodes0 = new ArrayList<>(); + + for (ClusterNode node : nodes) { + if (assigns.nodeFilter().apply(node)) + nodes0.add(node); + } + + nodes = nodes0; + } + } + else + nodes = null; + try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { GridServiceAssignmentsKey key = new GridServiceAssignmentsKey(cfg.getName()); GridServiceAssignments oldAssigns = (GridServiceAssignments)cache.get(key); - GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer); - Map<UUID, Integer> cnts = new HashMap<>(); if (affKey != null) { @@ -723,10 +743,6 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } else { - Collection<ClusterNode> nodes = assigns.nodeFilter() == null ? - ctx.discovery().nodes(topVer) : - F.view(ctx.discovery().nodes(topVer), assigns.nodeFilter()); - if (!nodes.isEmpty()) { int size = nodes.size(); @@ -805,7 +821,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { assigns.assigns(cnts); - cache.getAndPut(key, assigns); + cache.put(key, assigns); tx.commit(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ab8ba974/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServicePredicateAccessCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServicePredicateAccessCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServicePredicateAccessCacheTest.java new file mode 100644 index 0000000..c91d9f1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServicePredicateAccessCacheTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.services.Service; +import org.apache.ignite.services.ServiceContext; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class ServicePredicateAccessCacheTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static CountDownLatch latch; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setMarshaller(new BinaryMarshaller()); + + cfg.setPeerClassLoadingEnabled(false); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 60_000; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testPredicateAccessCache() throws Exception { + final Ignite ignite0 = startGrid(0); + + ignite0.getOrCreateCache(new CacheConfiguration<String, Object>() + .setName("testCache") + .setAtomicityMode(ATOMIC) + .setCacheMode(REPLICATED) + .setWriteSynchronizationMode(FULL_SYNC) + .setAtomicWriteOrderMode(PRIMARY)); + + latch = new CountDownLatch(1); + + final ClusterGroup grp = ignite0.cluster().forPredicate(new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + System.out.println("Predicated started [thread=" + Thread.currentThread().getName() + ']'); + + latch.countDown(); + + try { + Thread.sleep(3000); + } + catch (InterruptedException ignore) { + // No-op. + } + + System.out.println("Call contains key [thread=" + Thread.currentThread().getName() + ']'); + + boolean ret = ignite0.cache("testCache").containsKey(node.id().toString()); + + System.out.println("After contains key [ret=" + ret + + ", thread=" + Thread.currentThread().getName() + ']'); + + return ret; + } + }); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + info("Start deploy service."); + + ignite0.services(grp).deployNodeSingleton("testService", new TestService()); + + info("Service deployed."); + + return null; + } + }, "deploy-thread"); + + latch.await(); + + startGrid(1); + + fut.get(); + } + + /** + * + */ + public static class TestService implements Service { + /** {@inheritDoc} */ + public void execute(ServiceContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ + public void init(ServiceContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ + public void cancel(ServiceContext ctx) { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ab8ba974/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java index a41859e..deb49b7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.service.GridServiceProcessorProxySe import org.apache.ignite.internal.processors.service.GridServiceProcessorSingleNodeSelfTest; import org.apache.ignite.internal.processors.service.GridServiceProcessorStopSelfTest; import org.apache.ignite.internal.processors.service.GridServiceReassignmentSelfTest; +import org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest; import org.apache.ignite.internal.util.GridStartupWithSpecifiedWorkDirectorySelfTest; import org.apache.ignite.internal.util.GridStartupWithUndefinedIgniteHomeSelfTest; import org.apache.ignite.spi.communication.GridCacheMessageSelfTest; @@ -117,6 +118,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite { suite.addTestSuite(GridServiceReassignmentSelfTest.class); suite.addTestSuite(GridServiceClientNodeTest.class); suite.addTestSuite(GridServiceProcessorStopSelfTest.class); + suite.addTestSuite(ServicePredicateAccessCacheTest.class); return suite; }