IGNITE-2766 - Opportunistically reopen cache after client reconnect - Fixes #3417
Signed-off-by: Valentin Kulichenko <valentin.kuliche...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0991437a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0991437a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0991437a Branch: refs/heads/ignite-7708 Commit: 0991437a3f4d38e68483a8bcadd3daf614b7b2dc Parents: c6ab036 Author: Ilya Kasnacheev <ilya.kasnach...@gmail.com> Authored: Fri Apr 13 14:48:10 2018 -0700 Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com> Committed: Fri Apr 13 14:48:10 2018 -0700 ---------------------------------------------------------------------- .../cache/GatewayProtectedCacheProxy.java | 676 ++++++++----------- .../processors/cache/GridCacheGateway.java | 7 + .../processors/cache/IgniteCacheProxyImpl.java | 31 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 6 +- .../IgniteCacheQueryCacheDestroySelfTest.java | 4 + .../ClientReconnectAfterClusterRestartTest.java | 33 +- 6 files changed, 316 insertions(+), 441 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0991437a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java index 27fc395..2e8120b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java @@ -36,6 +36,7 @@ import javax.cache.integration.CompletionListener; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMetrics; @@ -48,6 +49,9 @@ import org.apache.ignite.cache.query.QueryMetrics; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.internal.AsyncSupportAdapter; +import org.apache.ignite.internal.GridKernalState; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteClosure; @@ -138,15 +142,13 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite /** {@inheritDoc} */ @Override public GatewayProtectedCacheProxy<K, V> withExpiryPolicy(ExpiryPolicy plc) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return new GatewayProtectedCacheProxy<>(delegate, opCtx.withExpiryPolicy(plc), lock); } finally { - onLeave(gate, prev); + onLeave(opGate); } } @@ -157,9 +159,7 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite /** {@inheritDoc} */ @Override public GatewayProtectedCacheProxy<K, V> skipStore() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { boolean skip = opCtx.skipStore(); @@ -170,15 +170,13 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite return new GatewayProtectedCacheProxy<>(delegate, opCtx.setSkipStore(true), lock); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public GatewayProtectedCacheProxy<K, V> withNoRetries() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { boolean noRetries = opCtx.noRetries(); @@ -189,15 +187,13 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite return new GatewayProtectedCacheProxy<>(delegate, opCtx.setNoRetries(true), lock); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public GatewayProtectedCacheProxy<K, V> withPartitionRecover() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { boolean recovery = opCtx.recovery(); @@ -208,7 +204,7 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite return new GatewayProtectedCacheProxy<>(delegate, opCtx.setRecovery(true), lock); } finally { - onLeave(gate, prev); + onLeave(opGate); } } @@ -219,23 +215,19 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite /** {@inheritDoc} */ @Override public <K1, V1> GatewayProtectedCacheProxy<K1, V1> keepBinary() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return new GatewayProtectedCacheProxy<>((IgniteCacheProxy<K1, V1>) delegate, opCtx.keepBinary(), lock); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public GatewayProtectedCacheProxy<K, V> withDataCenterId(byte dataCenterId) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { Byte prevDataCenterId = opCtx.dataCenterId(); @@ -246,91 +238,79 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite return new GatewayProtectedCacheProxy<>(delegate, opCtx.setDataCenterId(dataCenterId), lock); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.loadCache(p, args); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Void> loadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.loadCacheAsync(p, args); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.localLoadCache(p, args); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Void> localLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.localLoadCacheAsync(p, args); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public V getAndPutIfAbsent(K key, V val) throws CacheException, TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAndPutIfAbsent(key, val); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<V> getAndPutIfAbsentAsync(K key, V val) throws CacheException, TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAndPutIfAbsentAsync(key, val); } finally { - onLeave(gate, prev); + onLeave(opGate); } } @@ -346,1093 +326,937 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite /** {@inheritDoc} */ @Override public boolean isLocalLocked(K key, boolean byCurrThread) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.isLocalLocked(key, byCurrThread); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public <R> QueryCursor<R> query(Query<R> qry) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.query(qry); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.query(qry); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public List<FieldsQueryCursor<List<?>>> queryMultipleStatements(SqlFieldsQuery qry) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.queryMultipleStatements(qry); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.query(qry, transformer); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.localEntries(peekModes); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public QueryMetrics queryMetrics() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.queryMetrics(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void resetQueryMetrics() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.resetQueryMetrics(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public Collection<? extends QueryDetailMetrics> queryDetailMetrics() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.queryDetailMetrics(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void resetQueryDetailMetrics() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.resetQueryDetailMetrics(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void localEvict(Collection<? extends K> keys) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.localEvict(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public V localPeek(K key, CachePeekMode... peekModes) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.localPeek(key, peekModes); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public int size(CachePeekMode... peekModes) throws CacheException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.size(peekModes); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Integer> sizeAsync(CachePeekMode... peekModes) throws CacheException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.sizeAsync(peekModes); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public long sizeLong(CachePeekMode... peekModes) throws CacheException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.sizeLong(peekModes); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Long> sizeLongAsync(CachePeekMode... peekModes) throws CacheException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.sizeLongAsync(peekModes); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public long sizeLong(int partition, CachePeekMode... peekModes) throws CacheException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.sizeLong(partition, peekModes); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Long> sizeLongAsync(int partition, CachePeekMode... peekModes) throws CacheException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.sizeLongAsync(partition, peekModes); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public int localSize(CachePeekMode... peekModes) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.localSize(peekModes); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public long localSizeLong(CachePeekMode... peekModes) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.localSizeLong(peekModes); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public long localSizeLong(int partition, CachePeekMode... peekModes) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.localSizeLong(partition, peekModes); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.invokeAll(map, args); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.invokeAllAsync(map, args); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public V get(K key) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.get(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<V> getAsync(K key) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAsync(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public CacheEntry<K, V> getEntry(K key) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getEntry(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<CacheEntry<K, V>> getEntryAsync(K key) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getEntryAsync(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public Map<K, V> getAll(Set<? extends K> keys) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAll(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Map<K, V>> getAllAsync(Set<? extends K> keys) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAllAsync(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getEntries(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(Set<? extends K> keys) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getEntriesAsync(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAllOutTx(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAllOutTxAsync(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public boolean containsKey(K key) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.containsKey(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void loadAll(Set<? extends K> keys, boolean replaceExisting, CompletionListener completionListener) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.loadAll(keys, replaceExisting, completionListener); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> containsKeyAsync(K key) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.containsKeyAsync(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public boolean containsKeys(Set<? extends K> keys) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.containsKeys(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> containsKeysAsync(Set<? extends K> keys) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.containsKeysAsync(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void put(K key, V val) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.put(key, val); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Void> putAsync(K key, V val) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.putAsync(key, val); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public V getAndPut(K key, V val) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAndPut(key, val); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<V> getAndPutAsync(K key, V val) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAndPutAsync(key, val); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> map) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.putAll(map); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.putAllAsync(map); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public boolean putIfAbsent(K key, V val) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.putIfAbsent(key, val); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> putIfAbsentAsync(K key, V val) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.putIfAbsentAsync(key, val); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public boolean remove(K key) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.remove(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> removeAsync(K key) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.removeAsync(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public boolean remove(K key, V oldVal) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.remove(key, oldVal); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> removeAsync(K key, V oldVal) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.removeAsync(key, oldVal); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public V getAndRemove(K key) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAndRemove(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<V> getAndRemoveAsync(K key) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAndRemoveAsync(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public boolean replace(K key, V oldVal, V newVal) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.replace(key, oldVal, newVal); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.replaceAsync(key, oldVal, newVal); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public boolean replace(K key, V val) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.replace(key, val); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Boolean> replaceAsync(K key, V val) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.replaceAsync(key, val); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAndReplace(key, val); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<V> getAndReplaceAsync(K key, V val) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.getAndReplaceAsync(key, val); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void removeAll(Set<? extends K> keys) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.removeAll(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Void> removeAllAsync(Set<? extends K> keys) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.removeAllAsync(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void removeAll() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.removeAll(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Void> removeAllAsync() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.removeAllAsync(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void clear() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.clear(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Void> clearAsync() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.clearAsync(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void clear(K key) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.clear(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Void> clearAsync(K key) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.clearAsync(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void clearAll(Set<? extends K> keys) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.clearAll(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public IgniteFuture<Void> clearAllAsync(Set<? extends K> keys) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.clearAllAsync(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void localClear(K key) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.localClear(key); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void localClearAll(Set<? extends K> keys) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.localClearAll(keys); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.invoke(key, entryProcessor, arguments); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public <T> IgniteFuture<T> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.invokeAsync(key, entryProcessor, arguments); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... arguments) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.invoke(key, entryProcessor, arguments); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public <T> IgniteFuture<T> invokeAsync(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... arguments) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.invokeAsync(key, entryProcessor, arguments); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.invokeAll(keys, entryProcessor, args); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.invokeAllAsync(keys, entryProcessor, args); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.invokeAll(keys, entryProcessor, args); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) throws TransactionException { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.invokeAllAsync(keys, entryProcessor, args); } finally { - onLeave(gate, prev); + onLeave(opGate); } } @@ -1443,43 +1267,37 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite /** {@inheritDoc} */ @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.registerCacheEntryListener(cacheEntryListenerConfiguration); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.deregisterCacheEntryListener(cacheEntryListenerConfiguration); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public Iterator<Entry<K, V>> iterator() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.iterator(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } @@ -1550,99 +1368,85 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite /** {@inheritDoc} */ @Override public CacheMetrics metrics() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.metrics(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public CacheMetrics metrics(ClusterGroup grp) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.metrics(grp); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public CacheMetrics localMetrics() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.localMetrics(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public CacheMetricsMXBean mxBean() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.mxBean(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public CacheMetricsMXBean localMxBean() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.localMxBean(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public Collection<Integer> lostPartitions() { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { return delegate.lostPartitions(); } finally { - onLeave(gate, prev); + onLeave(opGate); } } /** {@inheritDoc} */ @Override public void enableStatistics(boolean enabled) { - GridCacheGateway<K, V> gate = gate(); - - CacheOperationContext prev = onEnter(gate, opCtx); + CacheOperationGate opGate = onEnter(); try { delegate.enableStatistics(enabled); } finally { - onLeave(gate, prev); + onLeave(opGate); } } @@ -1662,26 +1466,49 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite * * @param gate Cache gateway. */ - private void checkProxyIsValid(@Nullable GridCacheGateway<K, V> gate) { + private GridCacheGateway<K, V> checkProxyIsValid(@Nullable GridCacheGateway<K, V> gate, boolean tryRestart) { if (isProxyClosed()) throw new IllegalStateException("Cache has been closed: " + context().name()); - if (delegate instanceof IgniteCacheProxyImpl) + boolean isCacheProxy = delegate instanceof IgniteCacheProxyImpl; + + if (isCacheProxy) ((IgniteCacheProxyImpl) delegate).checkRestart(); if (gate == null) throw new IllegalStateException("Gateway is unavailable. Probably cache has been destroyed, but proxy is not closed."); + + if (isCacheProxy && tryRestart && gate.isStopped() && + context().kernalContext().gateway().getState() == GridKernalState.STARTED) { + IgniteCacheProxyImpl proxyImpl = (IgniteCacheProxyImpl) delegate; + + try { + IgniteInternalCache<K, V> cache = context().kernalContext().cache().<K, V>publicJCache(context().name()).internalProxy(); + + GridFutureAdapter<Void> fut = proxyImpl.opportunisticRestart(); + + if (fut == null) + proxyImpl.onRestarted(cache.context(), cache.context().cache()); + else + new IgniteFutureImpl<>(fut).get(); + + return gate(); + } catch (IgniteCheckedException ice) { + // Opportunity didn't work out. + } + } + + return gate; } /** - * @param gate Cache gateway. - * @param opCtx Cache operation context to guard. * @return Previous projection set on this thread. */ - private CacheOperationContext onEnter(@Nullable GridCacheGateway<K, V> gate, CacheOperationContext opCtx) { - checkProxyIsValid(gate); + private CacheOperationGate onEnter() { + GridCacheGateway<K, V> gate = checkProxyIsValid(gate(), true); - return lock ? gate.enter(opCtx) : gate.enterNoLock(opCtx); + return new CacheOperationGate(gate, + lock ? gate.enter(opCtx) : gate.enterNoLock(opCtx)); } /** @@ -1690,7 +1517,7 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite */ private boolean onEnterIfNoStop(@Nullable GridCacheGateway<K, V> gate) { try { - checkProxyIsValid(gate); + checkProxyIsValid(gate, false); } catch (Exception e) { return false; @@ -1700,14 +1527,13 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite } /** - * @param gate Cache gateway. - * @param opCtx Operation context to guard. + * @param opGate Operation context to guard. */ - private void onLeave(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) { + private void onLeave(CacheOperationGate opGate) { if (lock) - gate.leave(opCtx); + opGate.gate.leave(opGate.prev); else - gate.leaveNoLock(opCtx); + opGate.gate.leaveNoLock(opGate.prev); } /** @@ -1774,4 +1600,28 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite @Override public int hashCode() { return delegate.hashCode(); } + + /** + * Holder for gate being entered and operation context to restore. + */ + private class CacheOperationGate { + /** + * Gate being entered in this operation. + */ + public final GridCacheGateway<K, V> gate; + + /** + * Operation context to restore after current operation completes. + */ + public final CacheOperationContext prev; + + /** + * @param gate Gate being entered in this operation. + * @param prev Operation context to restore after current operation completes. + */ + public CacheOperationGate(GridCacheGateway<K, V> gate, CacheOperationContext prev) { + this.gate = gate; + this.prev = prev; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0991437a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index b9a4b25..658ca2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -256,6 +256,13 @@ public class GridCacheGateway<K, V> { /** * */ + public boolean isStopped() { + return !checkState(false, false); + } + + /** + * + */ public void stopped() { state.set(State.STOPPED); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0991437a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index be4b0db..68e5b85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -1824,8 +1824,10 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * Throws {@code IgniteCacheRestartingException} if proxy is restarting. */ public void checkRestart() { - if (isRestarting()) - throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut.get()), "Cache is restarting: " + + GridFutureAdapter<Void> currentFut = this.restartFut.get(); + + if (currentFut != null) + throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(currentFut), "Cache is restarting: " + context().name()); } @@ -1833,13 +1835,13 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * @return True if proxy is restarting, false in other case. */ public boolean isRestarting() { - return restartFut != null && restartFut.get() != null; + return restartFut.get() != null; } /** * Restarts this cache proxy. */ - public void restart() { + public boolean restart() { GridFutureAdapter<Void> restartFut = new GridFutureAdapter<>(); final GridFutureAdapter<Void> curFut = this.restartFut.get(); @@ -1855,6 +1857,27 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< curFut.onDone(); } }); + + return changed; + } + + /** + * If proxy is already being restarted, returns future to wait on, else restarts this cache proxy. + * + * @return Future to wait on, or null. + */ + public GridFutureAdapter<Void> opportunisticRestart() { + GridFutureAdapter<Void> restartFut = new GridFutureAdapter<>(); + + while (true) { + if (this.restartFut.compareAndSet(null, restartFut)) + return null; + + GridFutureAdapter<Void> curFut = this.restartFut.get(); + + if (curFut != null) + return curFut; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0991437a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 16c5d3a..b22a397 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1616,8 +1616,6 @@ class ClientImpl extends TcpDiscoveryImpl { onDisconnected(); - notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); - UUID newId = UUID.randomUUID(); U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " + @@ -1716,8 +1714,6 @@ class ClientImpl extends TcpDiscoveryImpl { } onDisconnected(); - - notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); } UUID newId = UUID.randomUUID(); @@ -1820,6 +1816,8 @@ class ClientImpl extends TcpDiscoveryImpl { delayDiscoData.clear(); + notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); + IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " + "client node disconnected."); http://git-wip-us.apache.org/repos/asf/ignite/blob/0991437a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IgniteCacheQueryCacheDestroySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IgniteCacheQueryCacheDestroySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IgniteCacheQueryCacheDestroySelfTest.java index dea491c..d0d392b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IgniteCacheQueryCacheDestroySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IgniteCacheQueryCacheDestroySelfTest.java @@ -48,6 +48,10 @@ public class IgniteCacheQueryCacheDestroySelfTest extends GridCommonAbstractTest /** */ public static final int GRID_CNT = 3; + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + /** * The main test code. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0991437a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java index 392cdc7..505d373 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.processors.cache; +import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; @@ -31,9 +33,8 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.NotNull; @@ -119,6 +120,8 @@ public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTe checkTopology(2); + IgniteCache<Long, BinaryObject> cache = client.getOrCreateCache(CACHE_PARAMS).withKeepBinary(); + client.events().localListen(new IgnitePredicate<Event>() { @Override public boolean apply(Event event) { @@ -161,27 +164,17 @@ public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTe startGrid(0); - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - checkTopology(2); - - return true; - } catch (Exception ex) { - return false; - } - } - }, 30_000); + try { + assertNull(cache.get(1L)); + } catch (CacheException ce) { + IgniteClientDisconnectedException icde = (IgniteClientDisconnectedException)ce.getCause(); - info("Pre-insert"); + icde.reconnectFuture().get(); - streamer = client.dataStreamer("PPRB_PARAMS"); - streamer.allowOverwrite(true); - streamer.keepBinary(true); - streamer.perNodeBufferSize(10000); - streamer.perNodeParallelOperations(100); + assertNull(cache.get(1L)); + } - IgniteCache<Long, BinaryObject> cache = client.getOrCreateCache(CACHE_PARAMS).withKeepBinary(); + info("Pre-insert"); builder = client.binary().builder("PARAMS"); builder.setField("ID", 2L);