ignite-3994 GridContinuousHandler cleanup on client disconnect. This closes #1496.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2f4bdbb6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2f4bdbb6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2f4bdbb6 Branch: refs/heads/ignite-comm-balance-master Commit: 2f4bdbb674e5634ce4c1a3432dede4c865977fde Parents: 543a65f Author: vdpyatkov <[email protected]> Authored: Fri Feb 10 15:08:45 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Feb 10 15:09:53 2017 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 5 + .../internal/GridMessageListenHandler.java | 5 + .../continuous/CacheContinuousQueryHandler.java | 16 ++ .../continuous/GridContinuousHandler.java | 5 + .../continuous/GridContinuousProcessor.java | 3 + .../ClientReconnectContinuousQueryTest.java | 201 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite3.java | 2 + 7 files changed, 237 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 68d34ce..0395434 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -392,6 +392,11 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public void onClientDisconnected() { + // No-op. + } + + /** {@inheritDoc} */ @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 0eeaa8a..88d4450 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -199,6 +199,11 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public void onClientDisconnected() { + // No-op. + } + + /** {@inheritDoc} */ @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index a9a7d7c..b3f0684 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -854,6 +854,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; } + /** {@inheritDoc} */ + @Override public void onClientDisconnected() { + if (internal) + return; + + for (PartitionRecovery rec : rcvs.values()) + rec.resetTopologyCache(); + } + /** * @param ctx Context. * @param partId Partition id. @@ -972,6 +981,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** + * Resets cached topology. + */ + void resetTopologyCache() { + curTop = AffinityTopologyVersion.NONE; + } + + /** * Add continuous entry. * * @param cctx Cache context. http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index f14b450..2a3a052 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -99,6 +99,11 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { public GridContinuousBatch createBatch(); /** + * Client node disconnected callback. + */ + public void onClientDisconnected(); + + /** * Called when ack for a batch is received from client. * * @param routineId Routine ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 9fd9b6d..b9f42e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -912,6 +912,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { unregisterRemote(e.getKey()); } + for (LocalRoutineInfo routine : locInfos.values()) + routine.hnd.onClientDisconnected(); + rmtInfos.clear(); clientInfos.clear(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java new file mode 100644 index 0000000..feded14 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.managers.communication.GridIoManager; +import org.apache.ignite.internal.util.nio.GridNioServer; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class ClientReconnectContinuousQueryTest extends GridCommonAbstractTest { + /** Client index. */ + private static final int CLIENT_IDX = 1; + + /** Puts before reconnect. */ + private static final int PUTS_BEFORE_RECONNECT = 50; + + /** Puts after reconnect. */ + private static final int PUTS_AFTER_RECONNECT = 50; + + /** Recon latch. */ + private static final CountDownLatch reconLatch = new CountDownLatch(1); + + /** Discon latch. */ + private static final CountDownLatch disconLatch = new CountDownLatch(1); + + /** Updater received. */ + private static final CountDownLatch updaterReceived = new CountDownLatch(PUTS_BEFORE_RECONNECT); + + /** Receiver after reconnect. */ + private static final CountDownLatch receiverAfterReconnect = new CountDownLatch(PUTS_AFTER_RECONNECT); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi(); + + commSpi.setSlowClientQueueLimit(50); + commSpi.setIdleConnectionTimeout(300_000); + + if (getTestGridName(CLIENT_IDX).equals(gridName)) + cfg.setClientMode(true); + else { + CacheConfiguration ccfg = defaultCacheConfiguration(); + + cfg.setCacheConfiguration(ccfg); + } + + return cfg; + } + + /** + * Test client reconnect to alive grid. + * + * @throws Exception If failed. + */ + public void testClientReconnect() throws Exception { + try { + startGrids(2); + + IgniteEx client = grid(CLIENT_IDX); + + client.events().localListen(new DisconnectListener(), EventType.EVT_CLIENT_NODE_DISCONNECTED); + + client.events().localListen(new ReconnectListener(), EventType.EVT_CLIENT_NODE_RECONNECTED); + + IgniteCache cache = client.cache(null); + + ContinuousQuery qry = new ContinuousQuery(); + + qry.setLocalListener(new CQListener()); + + cache.query(qry); + + putSomeKeys(PUTS_BEFORE_RECONNECT); + + info("updaterReceived Count: " + updaterReceived.getCount()); + + assertTrue(updaterReceived.await(10_000, TimeUnit.MILLISECONDS)); + + skipRead(client, true); + + putSomeKeys(1_000); + + assertTrue(disconLatch.await(10_000, TimeUnit.MILLISECONDS)); + + skipRead(client, false); + + assertTrue(reconLatch.await(10_000, TimeUnit.MILLISECONDS)); + + putSomeKeys(PUTS_AFTER_RECONNECT); + + info("receiverAfterReconnect Count: " + receiverAfterReconnect.getCount()); + + assertTrue(receiverAfterReconnect.await(10_000, TimeUnit.MILLISECONDS)); + } + finally { + stopAllGrids(); + } + + } + + /** + * + */ + private static class ReconnectListener implements IgnitePredicate<Event> { + /** {@inheritDoc} */ + @Override public boolean apply(Event evt) { + reconLatch.countDown(); + + return false; + } + } + + /** + * + */ + private static class DisconnectListener implements IgnitePredicate<Event> { + /** {@inheritDoc} */ + @Override public boolean apply(Event evt) { + disconLatch.countDown(); + + return false; + } + } + + /** + * + */ + private static class CQListener implements CacheEntryUpdatedListener { + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { + if (reconLatch.getCount() != 0) { + for (Object o : iterable) + updaterReceived.countDown(); + } + else { + for (Object o : iterable) + receiverAfterReconnect.countDown(); + } + } + } + + /** + * @param cnt Number of keys. + */ + private void putSomeKeys(int cnt) { + IgniteEx ignite = grid(0); + + IgniteCache<Object, Object> srvCache = ignite.cache(null); + + for (int i = 0; i < cnt; i++) + srvCache.put(0, i); + } + + /** + * @param igniteClient Ignite client. + * @param skip Skip. + */ + private void skipRead(IgniteEx igniteClient, boolean skip) { + GridIoManager ioMgr = igniteClient.context().io(); + + TcpCommunicationSpi commSpi = (TcpCommunicationSpi)((Object[])U.field(ioMgr, "spis"))[0]; + + GridNioServer nioSrvr = U.field(commSpi, "nioSrvr"); + + GridTestUtils.setFieldValue(nioSrvr, "skipRead", skip); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index a865788..07125a6 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBin import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationSwapEnabledTest; +import org.apache.ignite.internal.processors.cache.query.continuous.ClientReconnectContinuousQueryTest; import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryRemoteFilterMissingInClassPathSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest; @@ -123,6 +124,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(CacheKeepBinaryIterationNearEnabledTest.class); suite.addTestSuite(IgniteCacheContinuousQueryBackupQueueTest.class); suite.addTestSuite(IgniteCacheContinuousQueryNoUnsubscribeTest.class); + suite.addTestSuite(ClientReconnectContinuousQueryTest.class); return suite; }
