IGNITE-4473 - Client should re-try connection attempt in case of concurrent network failure.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d124004d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d124004d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d124004d Branch: refs/heads/ignite-1192 Commit: d124004d8b0396a44c26f4c35c263a15880f508c Parents: 0ed4fda Author: dkarachentsev <dkarachent...@gridgain.com> Authored: Fri Mar 17 14:57:48 2017 +0300 Committer: dkarachentsev <dkarachent...@gridgain.com> Committed: Fri Mar 17 14:57:48 2017 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalGatewayImpl.java | 8 +- .../apache/ignite/internal/IgniteKernal.java | 120 +++++- .../internal/IgniteNeedReconnectException.java | 40 ++ .../discovery/GridDiscoveryManager.java | 24 ++ .../GridCachePartitionExchangeManager.java | 25 +- .../dht/GridDhtAssignmentFetchFuture.java | 14 +- .../GridDhtPartitionsExchangeFuture.java | 48 ++- .../service/GridServiceProcessor.java | 86 ++--- .../ignite/spi/discovery/tcp/ClientImpl.java | 201 ++++++++-- .../ignite/spi/discovery/tcp/ServerImpl.java | 5 + .../spi/discovery/tcp/TcpDiscoveryImpl.java | 8 + .../spi/discovery/tcp/TcpDiscoverySpi.java | 9 + .../IgniteClientReconnectCacheTest.java | 7 +- .../ignite/internal/IgniteClientRejoinTest.java | 378 +++++++++++++++++++ .../tcp/TcpClientDiscoverySpiSelfTest.java | 48 ++- .../IgniteClientReconnectTestSuite.java | 2 + 16 files changed, 929 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java index fe8c580..036954a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java @@ -44,7 +44,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { /** */ @GridToStringExclude - private IgniteFutureImpl<?> reconnectFut; + private volatile IgniteFutureImpl<?> reconnectFut; /** */ private final AtomicReference<GridKernalState> state = new AtomicReference<>(GridKernalState.STOPPED); @@ -149,6 +149,12 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { /** {@inheritDoc} */ @Override public GridFutureAdapter<?> onDisconnected() { + if (state.get() == GridKernalState.DISCONNECTED) { + assert reconnectFut != null; + + return (GridFutureAdapter<?>)reconnectFut.internalFuture(); + } + GridFutureAdapter<?> fut = new GridFutureAdapter<>(); reconnectFut = new IgniteFutureImpl<>(fut); http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 8fda72f..25f7884 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -250,6 +250,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** Periodic starvation check interval. */ private static final long PERIODIC_STARVATION_CHECK_FREQ = 1000 * 30; + /** Force complete reconnect future. */ + private static final Object STOP_RECONNECT = new Object(); + /** */ @GridToStringExclude private GridKernalContextImpl ctx; @@ -327,6 +330,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @GridToStringExclude private final AtomicBoolean stopGuard = new AtomicBoolean(); + /** */ + private final ReconnectState reconnectState = new ReconnectState(); + /** * No-arg constructor is required by externalization. */ @@ -930,6 +936,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Notify IO manager the second so further components can send and receive messages. ctx.io().onKernalStart(); + boolean recon = false; + // Callbacks. for (GridComponent comp : ctx) { // Skip discovery manager. @@ -940,10 +948,24 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (comp instanceof GridIoManager) continue; - if (!skipDaemon(comp)) - comp.onKernalStart(); + if (!skipDaemon(comp)) { + try { + comp.onKernalStart(); + } + catch (IgniteNeedReconnectException e) { + assert ctx.discovery().reconnectSupported(); + + if (log.isDebugEnabled()) + log.debug("Failed to start node components on node start, will wait for reconnect: " + e); + + recon = true; + } + } } + if (recon) + reconnectState.waitFirstReconnect(); + // Register MBeans. registerKernalMBean(); registerLocalNodeMBean(); @@ -3274,6 +3296,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { public void onDisconnected() { Throwable err = null; + reconnectState.waitPreviousReconnect(); + GridFutureAdapter<?> reconnectFut = ctx.gateway().onDisconnected(); if (reconnectFut == null) { @@ -3282,9 +3306,18 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { return; } - IgniteFuture<?> userFut = new IgniteFutureImpl<>(reconnectFut); + IgniteFutureImpl<?> curFut = (IgniteFutureImpl<?>)ctx.cluster().get().clientReconnectFuture(); + + IgniteFuture<?> userFut; - ctx.cluster().get().clientReconnectFuture(userFut); + // In case of previous reconnect did not finish keep reconnect future. + if (curFut != null && curFut.internalFuture() == reconnectFut) + userFut = curFut; + else { + userFut = new IgniteFutureImpl<>(reconnectFut); + + ctx.cluster().get().clientReconnectFuture(userFut); + } ctx.disconnected(true); @@ -3337,30 +3370,53 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { try { ctx.disconnected(false); - GridCompoundFuture<?, ?> reconnectFut = new GridCompoundFuture<>(); + GridCompoundFuture curReconnectFut = reconnectState.curReconnectFut = new GridCompoundFuture<>(); + + reconnectState.reconnectDone = new GridFutureAdapter<>(); for (GridComponent comp : ctx.components()) { IgniteInternalFuture<?> fut = comp.onReconnected(clusterRestarted); if (fut != null) - reconnectFut.add((IgniteInternalFuture)fut); + curReconnectFut.add(fut); } - reconnectFut.add((IgniteInternalFuture)ctx.cache().context().exchange().reconnectExchangeFuture()); + curReconnectFut.add(ctx.cache().context().exchange().reconnectExchangeFuture()); + + curReconnectFut.markInitialized(); - reconnectFut.markInitialized(); + final GridFutureAdapter reconnectDone = reconnectState.reconnectDone; - reconnectFut.listen(new CI1<IgniteInternalFuture<?>>() { + curReconnectFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { try { - fut.get(); + Object res = fut.get(); + + if (res == STOP_RECONNECT) + return; ctx.gateway().onReconnected(); + + reconnectState.firstReconnectFut.onDone(); } catch (IgniteCheckedException e) { - U.error(log, "Failed to reconnect, will stop node", e); + if (!X.hasCause(e, IgniteNeedReconnectException.class, + IgniteClientDisconnectedCheckedException.class)) { + U.error(log, "Failed to reconnect, will stop node.", e); + + reconnectState.firstReconnectFut.onDone(e); - close(); + close(); + } + else { + assert ctx.discovery().reconnectSupported(); + + U.error(log, "Failed to finish reconnect, will retry [locNodeId=" + ctx.localNodeId() + + ", err=" + e.getMessage() + ']'); + } + } + finally { + reconnectDone.onDone(); } } }); @@ -3519,6 +3575,46 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } } + /** + * + */ + private class ReconnectState { + /** */ + private final GridFutureAdapter firstReconnectFut = new GridFutureAdapter(); + + /** */ + private GridCompoundFuture<?, Object> curReconnectFut; + + /** */ + private GridFutureAdapter<?> reconnectDone; + + /** + * @throws IgniteCheckedException If failed. + */ + void waitFirstReconnect() throws IgniteCheckedException { + firstReconnectFut.get(); + } + + /** + * + */ + void waitPreviousReconnect() { + if (curReconnectFut != null && !curReconnectFut.isDone()) { + assert reconnectDone != null; + + curReconnectFut.onDone(STOP_RECONNECT); + + try { + reconnectDone.get(); + } + catch (IgniteCheckedException ignote) { + // No-op. + } + } + + } + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteKernal.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java new file mode 100644 index 0000000..61ab576 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNeedReconnectException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.jetbrains.annotations.Nullable; + +/** + * Indicates that node should try reconnect to cluster. + */ +public class IgniteNeedReconnectException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param locNode Local node. + * @param cause Cause. + */ + public IgniteNeedReconnectException(ClusterNode locNode, @Nullable Throwable cause) { + super("Local node need try to reconnect [locNodeId=" + locNode.id() + ']', cause); + + assert locNode.isClient(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 9aa4db1..2ec1070 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -112,6 +112,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -1891,6 +1892,29 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * @return {@code True} if local node client and discovery SPI supports reconnect. + */ + public boolean reconnectSupported() { + DiscoverySpi spi = getSpi(); + + return ctx.clientNode() && (spi instanceof TcpDiscoverySpi) && + !(((TcpDiscoverySpi) spi).isClientReconnectDisabled()); + } + + /** + * Leave cluster and try to join again. + * + * @throws IgniteSpiException If failed. + */ + public void reconnect() { + assert reconnectSupported(); + + DiscoverySpi discoverySpi = getSpi(); + + ((TcpDiscoverySpi)discoverySpi).reconnect(); + } + + /** * Updates topology version if current version is smaller than updated. * * @param updated Updated topology version. http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 7f11dc4..92142c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; @@ -447,6 +448,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana else U.warn(log, "Still waiting for initial partition map exchange [fut=" + fut + ']'); } + catch (IgniteNeedReconnectException e) { + throw e; + } + catch (Exception e) { + if (fut.reconnectOnError(e)) + throw new IgniteNeedReconnectException(cctx.localNode(), e); + + throw e; + } } for (GridCacheContext cacheCtx : cctx.cacheContexts()) { @@ -1690,6 +1700,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana dumpedObjects++; } } + catch (Exception e) { + if (exchFut.reconnectOnError(e)) + throw new IgniteNeedReconnectException(cctx.localNode(), e); + + throw e; + } } @@ -1829,7 +1845,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana catch (IgniteInterruptedCheckedException e) { throw e; } - catch (IgniteClientDisconnectedCheckedException e) { + catch (IgniteClientDisconnectedCheckedException | IgniteNeedReconnectException e) { + assert cctx.discovery().reconnectSupported(); + + U.warn(log,"Local node failed to complete partition map exchange due to " + + "network issues, will try to reconnect to cluster", e); + + cctx.discovery().reconnect(); + return; } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index ab8e863..6425bc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -17,15 +17,16 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.Queue; import java.util.UUID; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -34,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -202,8 +204,14 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin "continue to another node): " + node); } catch (IgniteCheckedException e) { - U.error(log0, "Failed to request affinity assignment from remote node (will " + - "continue to another node): " + node, e); + if (ctx.discovery().reconnectSupported() && X.hasCause(e, IOException.class)) { + onDone(new IgniteNeedReconnectException(ctx.localNode(), e)); + + return; + } + + U.warn(log0, "Failed to request affinity assignment from remote node (will " + + "continue to another node): " + node); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e945de9..d4f95e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; @@ -39,6 +41,7 @@ import org.apache.ignite.events.CacheEvent; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -54,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -65,7 +67,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -506,10 +508,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT throw e; } + catch (IgniteNeedReconnectException e) { + onDone(e); + } catch (Throwable e) { - U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e); + if (reconnectOnError(e)) + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + else { + U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e); - onDone(e); + onDone(e); + } if (e instanceof Error) throw (Error)e; @@ -1297,7 +1306,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } catch (IgniteCheckedException e) { - onDone(e); + if (reconnectOnError(e)) + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + else + onDone(e); } } @@ -1314,8 +1326,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } catch (IgniteCheckedException e) { if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) { - log.debug("Failed to send full partition map to node, node left grid " + - "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']'); + if (log.isDebugEnabled()) + log.debug("Failed to send full partition map to node, node left grid " + + "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']'); + + return; + } + + if (reconnectOnError(e)) { + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); return; } @@ -1641,6 +1660,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } } + catch (Exception e) { + if (reconnectOnError(e)) + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + else + throw e; + } finally { leaveBusy(); } @@ -1652,6 +1677,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } + /** + * @param e Exception. + * @return {@code True} if local node should try reconnect in case of error. + */ + public boolean reconnectOnError(Throwable e) { + return X.hasCause(e, IOException.class, IgniteClientDisconnectedCheckedException.class) && + cctx.discovery().reconnectSupported(); + } + /** {@inheritDoc} */ @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) { return exchId.compareTo(fut.exchId); http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 6bcfd65..bd81518 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -1498,60 +1498,60 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } - /** - * Deployment callback. - * - * @param dep Service deployment. - * @param topVer Topology version. - */ - private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) { - // Retry forever. - try { - AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); + /** + * Deployment callback. + * + * @param dep Service deployment. + * @param topVer Topology version. + */ + private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) { + // Retry forever. + try { + AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); - // If topology version changed, reassignment will happen from topology event. - if (newTopVer.equals(topVer)) - reassign(dep, topVer); - } - catch (IgniteCheckedException e) { - if (!(e instanceof ClusterTopologyCheckedException)) - log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e); + // If topology version changed, reassignment will happen from topology event. + if (newTopVer.equals(topVer)) + reassign(dep, topVer); + } + catch (IgniteCheckedException e) { + if (!(e instanceof ClusterTopologyCheckedException)) + log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e); - AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); + AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); - if (!newTopVer.equals(topVer)) { - assert newTopVer.compareTo(topVer) > 0; + if (!newTopVer.equals(topVer)) { + assert newTopVer.compareTo(topVer) > 0; - // Reassignment will happen from topology event. - return; - } + // Reassignment will happen from topology event. + return; + } - ctx.timeout().addTimeoutObject(new GridTimeoutObject() { - private IgniteUuid id = IgniteUuid.randomUuid(); + ctx.timeout().addTimeoutObject(new GridTimeoutObject() { + private IgniteUuid id = IgniteUuid.randomUuid(); - private long start = System.currentTimeMillis(); + private long start = System.currentTimeMillis(); - @Override public IgniteUuid timeoutId() { - return id; - } + @Override public IgniteUuid timeoutId() { + return id; + } - @Override public long endTime() { - return start + RETRY_TIMEOUT; - } + @Override public long endTime() { + return start + RETRY_TIMEOUT; + } - @Override public void onTimeout() { - if (!busyLock.enterBusy()) - return; + @Override public void onTimeout() { + if (!busyLock.enterBusy()) + return; - try { - // Try again. - onDeployment(dep, topVer); - } - finally { - busyLock.leaveBusy(); - } + try { + // Try again. + onDeployment(dep, topVer); } - }); + finally { + busyLock.leaveBusy(); + } + } + }); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 95e2cda..02ba56a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -129,6 +129,9 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ private static final Object SPI_RECONNECT_FAILED = "SPI_RECONNECT_FAILED"; + /** */ + private static final Object SPI_RECONNECT = "SPI_RECONNECT"; + /** Remote nodes. */ private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>(); @@ -809,6 +812,11 @@ class ClientImpl extends TcpDiscoveryImpl { } /** {@inheritDoc} */ + @Override public void reconnect() throws IgniteSpiException { + msgWorker.addMessage(SPI_RECONNECT); + } + + /** {@inheritDoc} */ @Override public void brakeConnection() { SocketStream sockStream = msgWorker.currSock; @@ -879,9 +887,12 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ private UUID rmtNodeId; + /** */ + private CountDownLatch stopReadLatch; + /** */ - protected SocketReader() { + SocketReader() { super(spi.ignite().name(), "tcp-client-disco-sock-reader", log); } @@ -889,7 +900,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param sockStream Socket. * @param rmtNodeId Rmt node id. */ - public void setSocket(SocketStream sockStream, UUID rmtNodeId) { + void setSocket(SocketStream sockStream, UUID rmtNodeId) { synchronized (mux) { this.sockStream = sockStream; @@ -899,6 +910,31 @@ class ClientImpl extends TcpDiscoveryImpl { } } + /** + * @throws InterruptedException If interrupted. + */ + private void forceStopRead() throws InterruptedException { + CountDownLatch stopReadLatch; + + synchronized (mux) { + SocketStream stream = sockStream; + + if (stream == null) + return; + + this.stopReadLatch = stopReadLatch = new CountDownLatch(1); + + U.closeQuiet(stream.socket()); + + this.sockStream = null; + this.rmtNodeId = null; + + mux.notifyAll(); + } + + stopReadLatch.await(); + } + /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { while (!isInterrupted()) { @@ -906,6 +942,12 @@ class ClientImpl extends TcpDiscoveryImpl { UUID rmtNodeId; synchronized (mux) { + if (stopReadLatch != null) { + stopReadLatch.countDown(); + + stopReadLatch = null; + } + if (this.sockStream == null) { mux.wait(); @@ -1007,18 +1049,21 @@ class ClientImpl extends TcpDiscoveryImpl { private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>(); /** */ - private final long socketTimeout; + private final long sockTimeout; /** */ private TcpDiscoveryAbstractMessage unackedMsg; + /** */ + private CountDownLatch forceLeaveLatch; + /** * */ - protected SocketWriter() { + SocketWriter() { super(spi.ignite().name(), "tcp-client-disco-sock-writer", log); - socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : + sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() : spi.getSocketTimeout(); } @@ -1034,6 +1079,29 @@ class ClientImpl extends TcpDiscoveryImpl { } /** + * Sends {@link TcpDiscoveryNodeLeftMessage} and closes socket. + * + * @throws InterruptedException If interrupted. + */ + private void forceLeave() throws InterruptedException { + CountDownLatch forceLeaveLatch; + + synchronized (mux) { + // If writer was stopped. + if (sock == null) + return; + + this.forceLeaveLatch = forceLeaveLatch = new CountDownLatch(1); + + unackedMsg = null; + + mux.notifyAll(); + } + + forceLeaveLatch.await(); + } + + /** * @param sock Socket. * @param clientAck {@code True} is server supports client message acknowlede. */ @@ -1089,13 +1157,41 @@ class ClientImpl extends TcpDiscoveryImpl { continue; } - msg = queue.poll(); + if (forceLeaveLatch != null) { + msg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); - if (msg == null) { - mux.wait(); + msg.client(true); + + try { + spi.writeToSocket( + sock, + msg, + sockTimeout); + } + catch (IOException | IgniteCheckedException e) { + if (log.isDebugEnabled()) { + log.debug("Failed to send TcpDiscoveryNodeLeftMessage on force leave [msg=" + msg + + ", err=" + e.getMessage() + ']'); + } + } + + U.closeQuiet(sock); + + this.sock = null; + + clear(); continue; } + else { + msg = queue.poll(); + + if (msg == null) { + mux.wait(); + + continue; + } + } } for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sndMsgLsnrs) @@ -1115,7 +1211,7 @@ class ClientImpl extends TcpDiscoveryImpl { spi.writeToSocket( sock, msg, - socketTimeout); + sockTimeout); msg = null; @@ -1165,10 +1261,30 @@ class ClientImpl extends TcpDiscoveryImpl { synchronized (mux) { if (sock == this.sock) this.sock = null; // Connection has dead. + + clear(); } } } } + + /** + * + */ + private void clear() { + assert Thread.holdsLock(mux); + + queue.clear(); + unackedMsg = null; + + CountDownLatch forceLeaveLatch = this.forceLeaveLatch; + + if (forceLeaveLatch != null) { + this.forceLeaveLatch = null; + + forceLeaveLatch.countDown(); + } + } } /** @@ -1413,6 +1529,38 @@ class ClientImpl extends TcpDiscoveryImpl { else leaveLatch.countDown(); } + else if (msg == SPI_RECONNECT) { + if (state == CONNECTED) { + if (reconnector != null) { + reconnector.cancel(); + reconnector.join(); + + reconnector = null; + } + + sockWriter.forceLeave(); + sockReader.forceStopRead(); + + currSock = null; + + queue.clear(); + + onDisconnected(); + + notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); + + UUID newId = UUID.randomUUID(); + + U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " + + "to network problems [newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode+ ']'); + + locNode.onClientDisconnected(newId); + + tryJoin(); + } + } else if (msg instanceof TcpDiscoveryNodeFailedMessage && ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) { TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg; @@ -1495,20 +1643,7 @@ class ClientImpl extends TcpDiscoveryImpl { ", failMsg=" + forceFailMsg + ']'); } - state = DISCONNECTED; - - nodeAdded = false; - - IgniteClientDisconnectedCheckedException err = - new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " + - "client node disconnected."); - - for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) { - GridFutureAdapter<Boolean> fut = e.getValue(); - - if (pingFuts.remove(e.getKey(), fut)) - fut.onDone(err); - } + onDisconnected(); notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); } @@ -1604,6 +1739,26 @@ class ClientImpl extends TcpDiscoveryImpl { } /** + * + */ + private void onDisconnected() { + state = DISCONNECTED; + + nodeAdded = false; + + IgniteClientDisconnectedCheckedException err = + new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " + + "client node disconnected."); + + for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) { + GridFutureAdapter<Boolean> fut = e.getValue(); + + if (pingFuts.remove(e.getKey(), fut)) + fut.onDone(err); + } + } + + /** * @throws InterruptedException If interrupted. */ private void tryJoin() throws InterruptedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 4600be0..afd1c2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1610,6 +1610,11 @@ class ServerImpl extends TcpDiscoveryImpl { } /** {@inheritDoc} */ + @Override public void reconnect() throws IgniteSpiException { + throw new UnsupportedOperationException("Reconnect is not supported for server."); + } + + /** {@inheritDoc} */ @Override protected IgniteSpiThread workerThread() { return msgWorker; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index f199c20..84c2ff2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; @@ -259,6 +260,13 @@ abstract class TcpDiscoveryImpl { } /** + * Leave cluster and try to join again. + * + * @throws IgniteSpiException If failed. + */ + public abstract void reconnect() throws IgniteSpiException; + + /** * <strong>FOR TEST ONLY!!!</strong> * <p> * Simulates this node failure by stopping service threads. So, node will become http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 00ae97d..a2a47fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1927,6 +1927,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T } /** + * Force reconnect to cluster. + * + * @throws IgniteSpiException If failed. + */ + public void reconnect() throws IgniteSpiException { + impl.reconnect(); + } + + /** * <strong>FOR TEST ONLY!!!</strong> */ public int clientWorkerCount() { http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 0f0165b..6cdf465 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -700,9 +700,12 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac try { Ignition.start(optimize(getConfiguration(getTestGridName(SRV_CNT)))); - fail(); + // Commented due to IGNITE-4473, because + // IgniteClientDisconnectedException won't + // be thrown, but client will reconnect. +// fail(); - return false; + return true; } catch (IgniteClientDisconnectedException e) { log.info("Expected start error: " + e); http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java new file mode 100644 index 0000000..a5d42e9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.IgniteSpiOperationTimeoutException; +import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests client to be able restore connection to cluster if coordination is not available. + */ +public class IgniteClientRejoinTest extends GridCommonAbstractTest { + /** Block. */ + private volatile boolean block; + + /** Block all. */ + private volatile boolean blockAll; + + /** Coordinator. */ + private volatile ClusterNode crd; + + /** Client reconnect disabled. */ + private boolean clientReconnectDisabled; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + System.setProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK", "true"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + System.clearProperty("IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK"); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + clientReconnectDisabled = false; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (gridName.contains("client")) { + cfg.setCommunicationSpi(new TcpCommunicationSpi()); + + TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + DiscoverySpi dspi = new DiscoverySpi(); + + dspi.setIpFinder(spi.getIpFinder()); + + cfg.setDiscoverySpi(dspi); + + dspi.setJoinTimeout(60_000); + dspi.setClientReconnectDisabled(clientReconnectDisabled); + + cfg.setClientMode(true); + } + + // TODO: IGNITE-4833 + cfg.setPeerClassLoadingEnabled(false); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testClientsReconnectAfterStart() throws Exception { + Ignite srv1 = startGrid("server1"); + + crd = ((IgniteKernal)srv1).localNode(); + + Ignite srv2 = startGrid("server2"); + + final CountDownLatch latch = new CountDownLatch(1); + + List<Ignite> clientNodes = new ArrayList<>(); + + final int CLIENTS_NUM = 5; + + for (int i = 0; i < CLIENTS_NUM; i++) + clientNodes.add(startGrid("client" + i)); + + blockAll = true; + + GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + U.sleep(5_000); + + block = true; + blockAll = false; + + System.out.println(">>> Allow with blocked coordinator."); + + latch.countDown(); + + return null; + } + }); + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + latch.await(); + + U.sleep((new Random().nextInt(15) + 30) * 1000); + + block = false; + + System.out.println(">>> Allow coordinator."); + + return null; + } + }); + + fut.get(); + + for (Ignite client : clientNodes) { + while (true) { + try { + IgniteCache<Integer, Integer> cache = client.getOrCreateCache("some"); + + for (int i = 0; i < 100; i++) + cache.put(i, i); + + for (int i = 0; i < 100; i++) + assertEquals((Integer)i, cache.get(i)); + + cache.clear(); + + break; + } + catch (IgniteClientDisconnectedException e) { + e.reconnectFuture().get(); + } + } + } + + assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size()); + assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size()); + } + + /** + * @throws Exception If failed. + */ + public void testClientsReconnect() throws Exception { + Ignite srv1 = startGrid("server1"); + + crd = ((IgniteKernal)srv1).localNode(); + + Ignite srv2 = startGrid("server2"); + + block = true; + + List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>(); + + final CountDownLatch latch = new CountDownLatch(1); + + final int CLIENTS_NUM = 5; + + for (int i = 0; i < CLIENTS_NUM; i++) { + final int idx = i; + + IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() { + @Override public Ignite call() throws Exception { + latch.await(); + + return startGrid("client" + idx); + } + }); + + futs.add(fut); + } + + GridTestUtils.runAsync(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + latch.countDown(); + + Random rnd = new Random(); + + U.sleep((rnd.nextInt(15) + 15) * 1000); + + block = false; + + System.out.println(">>> ALLOW connection to coordinator."); + + return true; + } + }); + + for (IgniteInternalFuture<Ignite> clientFut : futs) { + Ignite client = clientFut.get(); + + IgniteCache<Integer, Integer> cache = client.getOrCreateCache(client.name()); + + for (int i = 0; i < 100; i++) + cache.put(i, i); + + for (int i = 0; i < 100; i++) + assert i == cache.get(i); + } + + assertEquals(CLIENTS_NUM, srv1.cluster().forClients().nodes().size()); + assertEquals(CLIENTS_NUM, srv2.cluster().forClients().nodes().size()); + } + + /** + * @throws Exception If failed. + */ + public void testClientsReconnectDisabled() throws Exception { + clientReconnectDisabled = true; + + Ignite srv1 = startGrid("server1"); + + crd = ((IgniteKernal)srv1).localNode(); + + Ignite srv2 = startGrid("server2"); + + block = true; + + List<IgniteInternalFuture<Ignite>> futs = new ArrayList<>(); + + final CountDownLatch latch = new CountDownLatch(1); + + final int CLIENTS_NUM = 5; + + for (int i = 0; i < CLIENTS_NUM; i++) { + final int idx = i; + + IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() { + @Override public Ignite call() throws Exception { + latch.await(); + + return startGrid("client" + idx); + } + }); + + futs.add(fut); + } + + latch.countDown(); + + for (final IgniteInternalFuture<Ignite> clientFut : futs) { + //noinspection ThrowableNotThrown + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + clientFut.get(); + + return null; + } + }, IgniteCheckedException.class, null); + } + + assertEquals(0, srv1.cluster().forClients().nodes().size()); + assertEquals(0, srv2.cluster().forClients().nodes().size()); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60_000; + } + + /** + * + */ + private class TcpCommunicationSpi extends org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + if (blockAll || block && node.id().equals(crd.id())) + throw new IgniteSpiException(new SocketException("Test communication exception")); + + super.sendMessage(node, msg); + } + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, + IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { + if (blockAll || block && node.id().equals(crd.id())) + throw new IgniteSpiException(new SocketException("Test communication exception")); + + super.sendMessage(node, msg, ackC); + } + } + + /** + * + */ + private class DiscoverySpi extends TcpDiscoverySpi { + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, + long timeout) throws IOException { + if (blockAll || block && sock.getPort() == 47500) + throw new SocketException("Test discovery exception"); + + super.writeToSocket(sock, msg, data, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + if (blockAll || block && sock.getPort() == 47500) + throw new SocketException("Test discovery exception"); + + super.writeToSocket(sock, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + if (blockAll || block && sock.getPort() == 47500) + throw new SocketException("Test discovery exception"); + + super.writeToSocket(sock, out, msg, timeout); + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, + long timeout) throws IOException { + if (blockAll || block && sock.getPort() == 47500) + throw new SocketException("Test discovery exception"); + + super.writeToSocket(msg, sock, res, timeout); + } + + /** {@inheritDoc} */ + @Override protected Socket openSocket(Socket sock, InetSocketAddress remAddr, + IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException { + if (blockAll || block && sock.getPort() == 47500) + throw new SocketException("Test discovery exception"); + + return super.openSocket(sock, remAddr, timeoutHelper); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 331b581..0483a1c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.IgniteState; @@ -43,6 +44,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -1788,8 +1790,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientNodeIds.add(client.cluster().localNode().id()); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { return srv.cluster().nodes().size() == 2; } }, awaitTime()); @@ -1800,6 +1801,49 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testForceClientReconnect() throws Exception { + startServerNodes(1); + + startClientNodes(1); + + Ignite srv = G.ignite("server-0"); + IgniteKernal client = (IgniteKernal)G.ignite("client-0"); + + UUID clientId = F.first(clientNodeIds); + + final CountDownLatch latch = new CountDownLatch(1); + + srv.events().enableLocal(EVT_NODE_JOINED); + + srv.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + latch.countDown(); + + return false; + } + }, EVT_NODE_JOINED); + + client.context().discovery().reconnect(); + + assert latch.await(10, TimeUnit.SECONDS); + + while (true) { + try { + UUID newId = client.localNode().id(); + + assert !clientId.equals(newId) : clientId; + + break; + } + catch (IgniteClientDisconnectedException e) { + e.reconnectFuture().get(10_000); + } + } + } + + /** * @param ignite Ignite. * @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d124004d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java index ea8e37b..67d88e1 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.IgniteClientReconnectFailoverTest; import org.apache.ignite.internal.IgniteClientReconnectServicesTest; import org.apache.ignite.internal.IgniteClientReconnectStopTest; import org.apache.ignite.internal.IgniteClientReconnectStreamerTest; +import org.apache.ignite.internal.IgniteClientRejoinTest; /** * @@ -52,6 +53,7 @@ public class IgniteClientReconnectTestSuite extends TestSuite { suite.addTestSuite(IgniteClientReconnectServicesTest.class); suite.addTestSuite(IgniteClientReconnectStreamerTest.class); suite.addTestSuite(IgniteClientReconnectFailoverTest.class); + suite.addTestSuite(IgniteClientRejoinTest.class); return suite; }