Repository: ignite Updated Branches: refs/heads/ignite-client-join-race [created] cead45f05
client join race Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cead45f0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cead45f0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cead45f0 Branch: refs/heads/ignite-client-join-race Commit: cead45f055bb01be71f1aac515f68ea5227c1bd5 Parents: ba21c46 Author: sboikov <sboi...@gridgain.com> Authored: Sat May 6 11:49:09 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Sat May 6 11:57:30 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../cache/DynamicCacheChangeBatch.java | 17 ++ .../processors/cache/GridCacheProcessor.java | 23 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 21 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 2 + .../spi/discovery/tcp/TcpDiscoverySpi.java | 4 + .../cache/distributed/CacheStartOnJoinTest.java | 226 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 2 + 8 files changed, 295 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 96930f8..7e92cf3 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -586,6 +586,9 @@ public final class IgniteSystemProperties { /** Cache start size for on-heap maps. Defaults to 4096. */ public static final String IGNITE_CACHE_START_SIZE = "IGNITE_CACHE_START_SIZE"; + /** */ + public static final String IGNITE_START_CACHES_ON_JOIN = "IGNITE_START_CACHES_ON_JOIN"; + /** Returns true for system properties only avoiding sending sensitive information. */ private static final IgnitePredicate<Map.Entry<String, String>> PROPS_FILTER = new IgnitePredicate<Map.Entry<String, String>>() { @Override public boolean apply(final Map.Entry<String, String> entry) { http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index a250063..66e780f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -47,6 +47,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { /** */ private boolean clientReconnect; + /** */ + private boolean startCaches; + /** * @param reqs Requests. */ @@ -114,6 +117,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { } /** + * @return {@code True} if required to start all caches on client node. + */ + public boolean startCaches() { + return startCaches; + } + + /** + * @param startCaches {@code True} if required to start all caches on client node. + */ + public void startCaches(boolean startCaches) { + this.startCaches = startCaches; + } + + /** * @return {@code True} if request should trigger partition exchange. */ public boolean exchangeNeeded() { http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index d6225c0..315ead9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -43,6 +43,7 @@ import javax.management.JMException; import javax.management.MBeanServer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheExistsException; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheRebalanceMode; @@ -164,6 +165,10 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearE */ @SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"}) public class GridCacheProcessor extends GridProcessorAdapter { + /** */ + private static final boolean START_CLIENT_CACHES = + IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, false); + /** Shared cache context. */ private GridCacheSharedContext<?, ?> sharedCtx; @@ -873,7 +878,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean loc = desc.locallyConfigured(); - if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) { + if (loc || (desc.receivedOnDiscovery() && + (startAllCachesOnClientStart() || CU.affinityNode(locNode, filter)))) { boolean started = desc.onStart(); assert started : "Failed to change started flag for locally configured cache: " + desc; @@ -2166,6 +2172,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { batch.clientReconnect(reconnect); + if (ctx.localNodeId().equals(joiningNodeId)) + batch.startCaches(startAllCachesOnClientStart()); + // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same. batch.id(null); @@ -2244,6 +2253,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } + /** + * @return {@code True} if need locally start all existing caches on client node start. + */ + private boolean startAllCachesOnClientStart() { + return START_CLIENT_CACHES && ctx.clientNode(); + } + /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { if (data.hasJoiningNodeData()) { @@ -2382,6 +2398,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue()); } } + + if (batch.startCaches()) { + for (Map.Entry<String, DynamicCacheDescriptor> entry : registeredCaches.entrySet()) + ctx.discovery().addClientNode(entry.getKey(), joiningNodeId, false); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index b5b4c77..4c7199c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -136,6 +136,9 @@ class ClientImpl extends TcpDiscoveryImpl { /** Remote nodes. */ private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>(); + /** */ + private final List<DiscoveryDataPacket> delayDiscoData = new ArrayList<>(); + /** Topology history. */ private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>(); @@ -1751,6 +1754,8 @@ class ClientImpl extends TcpDiscoveryImpl { nodeAdded = false; + delayDiscoData.clear(); + IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " + "client node disconnected."); @@ -1774,6 +1779,7 @@ class ClientImpl extends TcpDiscoveryImpl { joinCnt++; T2<SocketStream, Boolean> joinRes; + try { joinRes = joinTopology(false, spi.joinTimeout); } @@ -1919,8 +1925,12 @@ class ClientImpl extends TcpDiscoveryImpl { DiscoveryDataPacket dataPacket = msg.gridDiscoveryData(); - if (dataPacket != null && dataPacket.hasJoiningNodeData()) - spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration())); + if (dataPacket != null && dataPacket.hasJoiningNodeData()) { + if (joining()) + delayDiscoData.add(dataPacket); + else + spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration())); + } } } else { @@ -1944,6 +1954,13 @@ class ClientImpl extends TcpDiscoveryImpl { if (dataContainer != null) spi.onExchange(dataContainer, U.resolveClassLoader(spi.ignite().configuration())); + if (!delayDiscoData.isEmpty()) { + for (DiscoveryDataPacket data : delayDiscoData) + spi.onExchange(data, U.resolveClassLoader(spi.ignite().configuration())); + + delayDiscoData.clear(); + } + locNode.setAttributes(msg.clientNodeAttributes()); locNode.visible(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 6a10ec2..663040d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2505,6 +2505,8 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to process. */ @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + spi.startMessageProcess(msg); + sendMetricsUpdateMessage(); DebugLogger log = messageLogger(msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 99a7dac..370a020 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1464,6 +1464,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { writeToSocket(sock, socketStream(sock), msg, timeout); } + protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { + // No-op. + } + /** * Writes message to the socket. * http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java new file mode 100644 index 0000000..5203cf0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CyclicBarrier; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * + */ +public class CacheStartOnJoinTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Iteration. */ + private static final int ITERATIONS = 5; + + /** */ + private boolean client; + + static void doSleep(long millis) { + try { + U.sleep(1000); + } + catch (Exception e) { + throw new IgniteException(); + } + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi testSpi = new TcpDiscoverySpi() { + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { + super.writeToSocket(sock, out, msg, timeout); + } + + private boolean delay = true; + + @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { + if (getTestIgniteInstanceName(0).equals(ignite.name())) { + if (msg instanceof TcpDiscoveryJoinRequestMessage) { + TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg; + + if (msg0.client() && delay) { + log.info("Delay join processing: " + msg0); + + delay = false; + + doSleep(5000); + } + } + } + + super.startMessageProcess(msg); + } + }; + + testSpi.setIpFinder(ipFinder); + testSpi.setJoinTimeout(60_000); + + cfg.setDiscoverySpi(testSpi); + + MemoryConfiguration memCfg = new MemoryConfiguration(); + memCfg.setPageSize(1024); + memCfg.setDefaultMemoryPolicySize(50 * 1024 * 1024); + + cfg.setMemoryConfiguration(memCfg); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60 * 1000L; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + System.setProperty(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, "true"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.clearProperty(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testStartNodes() throws Exception { + for (int i = 0; i < ITERATIONS; i++) { + try { + log.info("Iteration: " + (i + 1) + '/' + ITERATIONS); + + doTest(); + } + finally { + stopAllGrids(true); + } + } + } + + /** + * @throws Exception If failed. + */ + private void doTest() throws Exception { + client = false; + + final int CLIENTS = 5; + final int SRVS = 4; + + Ignite srv = startGrids(SRVS); + + srv.getOrCreateCaches(cacheConfigurations()); + + final CyclicBarrier b = new CyclicBarrier(CLIENTS); + + client = true; + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer idx) { + try { + b.await(); + + startGrid(idx + SRVS); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + }, CLIENTS, "start-client"); + + final int NODES = CLIENTS + SRVS; + + for (int i = 0; i < CLIENTS + 1; i++) { + Ignite node = ignite(i); + + log.info("Check node: " + node.name()); + + assertEquals((Boolean)(i >= SRVS), node.configuration().isClientMode()); + + for (int c = 0; c < 5; c++) { + Collection<ClusterNode> nodes = node.cluster().forCacheNodes("cache-" + c).nodes(); + + assertEquals(NODES, nodes.size()); + } + + for (int c = 0; c < 5; c++) { + for (IgniteCache cache : node.getOrCreateCaches(cacheConfigurations())) { + cache.put(i, i); + + cache.get(i); + } + } + } + } + + private Collection<CacheConfiguration> cacheConfigurations() { + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + for (int i = 0; i < 5; i++) + ccfgs.add(cacheConfiguration("cache-" + i)); + + return ccfgs; + } + + private CacheConfiguration cacheConfiguration(String cacheName) { + CacheConfiguration ccfg = new CacheConfiguration(cacheName); + + ccfg.setName(cacheName); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 16)); + + return ccfg; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 8340cd7..1023140 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -85,6 +85,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarl import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest; import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheStartOnJoinTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest; @@ -220,6 +221,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheAffinityEarlyTest.class); suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest.class); suite.addTestSuite(IgniteCacheCreatePutTest.class); + suite.addTestSuite(CacheStartOnJoinTest.class); suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);