IGNITE-5729 - IgniteCacheProxy instances from with() methods are not reusable after cache restart
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5172541f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5172541f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5172541f Branch: refs/heads/master Commit: 5172541f71e9878b4cc9df18580cdf1863a5820b Parents: bcbb10d Author: Pavel Kovalenko <[email protected]> Authored: Thu Jul 27 10:41:41 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Thu Jul 27 10:41:41 2017 +0300 ---------------------------------------------------------------------- .../JettyRestProcessorAbstractSelfTest.java | 18 +- .../org/apache/ignite/cache/CacheManager.java | 4 +- .../cache/CacheAffinitySharedManager.java | 23 +- .../processors/cache/CacheOperationContext.java | 15 + .../cache/GatewayProtectedCacheProxy.java | 1754 +++++++++++ .../processors/cache/GridCacheProcessor.java | 51 +- .../processors/cache/IgniteCacheProxy.java | 2818 +----------------- .../processors/cache/IgniteCacheProxyImpl.java | 1810 +++++++++++ .../dr/IgniteDrDataStreamerCacheUpdater.java | 2 +- .../platform/cache/PlatformCache.java | 33 +- .../cache/CacheEntryProcessorCopySelfTest.java | 11 +- .../cache/CacheStopAndDestroySelfTest.java | 1 + .../cache/GridCacheAbstractSelfTest.java | 2 + .../GridCacheOnCopyFlagAbstractSelfTest.java | 6 +- ...idCacheValueConsistencyAbstractSelfTest.java | 4 +- .../IgniteCacheConfigVariationsFullApiTest.java | 6 +- .../IgniteCacheEntryListenerAbstractTest.java | 1 - .../processors/cache/IgniteCacheGroupsTest.java | 67 +- .../cache/IgniteCacheStartStopLoadTest.java | 1 - .../MemoryPolicyInitializationTest.java | 3 +- .../marshaller/GridMarshallerAbstractTest.java | 10 +- 21 files changed, 3737 insertions(+), 2903 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index 97321a7..d99f278 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.rest; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -38,6 +36,8 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.SqlQuery; @@ -76,8 +76,8 @@ import org.apache.ignite.internal.visor.cache.VisorCacheRebalanceTask; import org.apache.ignite.internal.visor.cache.VisorCacheRebalanceTaskArg; import org.apache.ignite.internal.visor.cache.VisorCacheResetMetricsTask; import org.apache.ignite.internal.visor.cache.VisorCacheResetMetricsTaskArg; -import org.apache.ignite.internal.visor.cache.VisorCacheStartTaskArg; import org.apache.ignite.internal.visor.cache.VisorCacheStartTask; +import org.apache.ignite.internal.visor.cache.VisorCacheStartTaskArg; import org.apache.ignite.internal.visor.cache.VisorCacheStopTask; import org.apache.ignite.internal.visor.cache.VisorCacheStopTaskArg; import org.apache.ignite.internal.visor.compute.VisorComputeCancelSessionsTask; @@ -87,8 +87,8 @@ import org.apache.ignite.internal.visor.compute.VisorComputeToggleMonitoringTask import org.apache.ignite.internal.visor.compute.VisorComputeToggleMonitoringTaskArg; import org.apache.ignite.internal.visor.compute.VisorGatewayTask; import org.apache.ignite.internal.visor.debug.VisorThreadDumpTask; -import org.apache.ignite.internal.visor.file.VisorFileBlockTaskArg; import org.apache.ignite.internal.visor.file.VisorFileBlockTask; +import org.apache.ignite.internal.visor.file.VisorFileBlockTaskArg; import org.apache.ignite.internal.visor.file.VisorLatestTextFilesTask; import org.apache.ignite.internal.visor.file.VisorLatestTextFilesTaskArg; import org.apache.ignite.internal.visor.igfs.VisorIgfsFormatTask; @@ -101,8 +101,8 @@ import org.apache.ignite.internal.visor.igfs.VisorIgfsResetMetricsTask; import org.apache.ignite.internal.visor.igfs.VisorIgfsResetMetricsTaskArg; import org.apache.ignite.internal.visor.igfs.VisorIgfsSamplingStateTask; import org.apache.ignite.internal.visor.igfs.VisorIgfsSamplingStateTaskArg; -import org.apache.ignite.internal.visor.log.VisorLogSearchTaskArg; import org.apache.ignite.internal.visor.log.VisorLogSearchTask; +import org.apache.ignite.internal.visor.log.VisorLogSearchTaskArg; import org.apache.ignite.internal.visor.misc.VisorAckTask; import org.apache.ignite.internal.visor.misc.VisorAckTaskArg; import org.apache.ignite.internal.visor.misc.VisorChangeGridActiveStateTask; @@ -121,16 +121,16 @@ import org.apache.ignite.internal.visor.node.VisorNodeSuppressedErrorsTask; import org.apache.ignite.internal.visor.node.VisorNodeSuppressedErrorsTaskArg; import org.apache.ignite.internal.visor.query.VisorQueryCancelTask; import org.apache.ignite.internal.visor.query.VisorQueryCancelTaskArg; +import org.apache.ignite.internal.visor.query.VisorQueryCleanupTask; import org.apache.ignite.internal.visor.query.VisorQueryCleanupTaskArg; import org.apache.ignite.internal.visor.query.VisorQueryDetailMetricsCollectorTask; import org.apache.ignite.internal.visor.query.VisorQueryDetailMetricsCollectorTaskArg; -import org.apache.ignite.internal.visor.query.VisorQueryResetMetricsTask; -import org.apache.ignite.internal.visor.query.VisorQueryResetMetricsTaskArg; -import org.apache.ignite.internal.visor.query.VisorQueryTaskArg; -import org.apache.ignite.internal.visor.query.VisorQueryCleanupTask; import org.apache.ignite.internal.visor.query.VisorQueryNextPageTask; import org.apache.ignite.internal.visor.query.VisorQueryNextPageTaskArg; +import org.apache.ignite.internal.visor.query.VisorQueryResetMetricsTask; +import org.apache.ignite.internal.visor.query.VisorQueryResetMetricsTaskArg; import org.apache.ignite.internal.visor.query.VisorQueryTask; +import org.apache.ignite.internal.visor.query.VisorQueryTaskArg; import org.apache.ignite.internal.visor.query.VisorRunningQueriesCollectorTask; import org.apache.ignite.internal.visor.query.VisorRunningQueriesCollectorTaskArg; import org.apache.ignite.lang.IgniteBiPredicate; http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java index 351cd0d..03d1f40 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheManager.java @@ -43,7 +43,7 @@ import org.apache.ignite.internal.GridKernalState; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.mxbean.IgniteStandardMXBean; -import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -177,7 +177,7 @@ public class CacheManager implements javax.cache.CacheManager { if (res == null) throw new CacheException(); - ((IgniteCacheProxy<K, V>)res).setCacheManager(this); + ((GatewayProtectedCacheProxy<K, V>)res).setCacheManager(this); if (igniteCacheCfg.isManagementEnabled()) enableManagement(cacheName, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 79ab183..d251d52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -703,7 +703,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert cctx.cacheContext(cacheDesc.cacheId()) == null : "Starting cache has not null context: " + cacheDesc.cacheName(); - IgniteCacheProxy cacheProxy = cctx.cache().jcacheProxy(req.cacheName()); + IgniteCacheProxyImpl cacheProxy = (IgniteCacheProxyImpl) cctx.cache().jcacheProxy(req.cacheName()); // If it has proxy then try to start it if (cacheProxy != null) { @@ -772,28 +772,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap for (ExchangeActions.CacheActionData action : exchActions.cacheStopRequests()) cctx.cache().blockGateway(action.request().cacheName(), true, action.request().restart()); - for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) { + for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) cctx.exchange().clearClientTopology(action.descriptor().groupId()); - CacheGroupContext gctx = cctx.cache().cacheGroup(action.descriptor().groupId()); - - if (gctx != null) { - IgniteCheckedException ex; - - String msg = "Failed to wait for topology update, cache group is stopping."; - - // If snapshot operation in progress we must throw CacheStoppedException - // for correct cache proxy restart. For more details see - // IgniteCacheProxy.cacheException() - if (cctx.cache().context().snapshot().snapshotOperationInProgress()) - ex = new CacheStoppedException(msg); - else - ex = new IgniteCheckedException(msg); - - gctx.affinity().cancelFutures(ex); - } - } - Set<Integer> stoppedGrps = null; if (crd) { http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java index bcc0ee9..eb2b902 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java @@ -233,6 +233,21 @@ public class CacheOperationContext implements Serializable { } /** + * @param dataCenterId Data center id. + * @return Operation context. + */ + public CacheOperationContext setDataCenterId(byte dataCenterId) { + return new CacheOperationContext( + skipStore, + subjId, + keepBinary, + expiryPlc, + noRetries, + dataCenterId, + recovery); + } + + /** * @param recovery Recovery flag. * @return New instance of CacheOperationContext with recovery flag. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java new file mode 100644 index 0000000..5ba6810 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java @@ -0,0 +1,1754 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import javax.cache.CacheException; +import javax.cache.CacheManager; +import javax.cache.configuration.CacheEntryListenerConfiguration; +import javax.cache.configuration.Configuration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CompletionListener; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorResult; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.Query; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.QueryDetailMetrics; +import org.apache.ignite.cache.query.QueryMetrics; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.internal.AsyncSupportAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.mxbean.CacheMetricsMXBean; +import org.apache.ignite.transactions.TransactionException; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Cache proxy wrapper with gateway lock provided operations and possibility to change cache operation context. + */ +public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V>> + implements IgniteCacheProxy<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache proxy delegate. */ + private IgniteCacheProxy<K, V> delegate; + + /** If {@code false} does not acquire read lock on gateway enter. */ + @GridToStringExclude private boolean lock; + + /** Cache operation context. */ + private CacheOperationContext opCtx; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GatewayProtectedCacheProxy() { + } + + /** + * + * @param delegate Cache proxy delegate. + * @param opCtx Cache operation context. + * @param lock True if cache proxy should be protected with gateway lock, false in other case. + */ + public GatewayProtectedCacheProxy( + @NotNull IgniteCacheProxy<K, V> delegate, + @NotNull CacheOperationContext opCtx, + boolean lock + ) { + this.delegate = delegate; + this.opCtx = opCtx; + this.lock = lock; + } + + /** + * Sets CacheManager to delegate. + * + * @param cacheMgr Cache Manager. + */ + public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) { + if (delegate instanceof IgniteCacheProxyImpl) + ((IgniteCacheProxyImpl) delegate).setCacheManager(cacheMgr); + } + + /** {@inheritDoc} */ + @Override public GridCacheContext<K, V> context() { + return delegate.context(); + } + + /** {@inheritDoc} */ + @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { + return delegate.getConfiguration(clazz); + } + + /** {@inheritDoc} */ + @Override public String getName() { + return delegate.getName(); + } + + /** {@inheritDoc} */ + @Override public CacheManager getCacheManager() { + return delegate.getCacheManager(); + } + + /** {@inheritDoc} */ + @Override public GridCacheProxyImpl<K, V> internalProxy() { + return delegate.internalProxy(); + } + + /** {@inheritDoc} */ + @Override public GatewayProtectedCacheProxy<K, V> cacheNoGate() { + return new GatewayProtectedCacheProxy<>(delegate, opCtx, false); + } + + /** {@inheritDoc} */ + @Override public GatewayProtectedCacheProxy<K, V> withExpiryPolicy(ExpiryPolicy plc) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return new GatewayProtectedCacheProxy<>(delegate, opCtx.withExpiryPolicy(plc), lock); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public GatewayProtectedCacheProxy<K, V> withSkipStore() { + return skipStore(); + } + + /** {@inheritDoc} */ + @Override public GatewayProtectedCacheProxy<K, V> skipStore() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + boolean skip = opCtx.skipStore(); + + if (skip) + return this; + + return new GatewayProtectedCacheProxy<>(delegate, opCtx.setSkipStore(true), lock); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public GatewayProtectedCacheProxy<K, V> withNoRetries() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + boolean noRetries = opCtx.noRetries(); + + if (noRetries) + return this; + + return new GatewayProtectedCacheProxy<>(delegate, opCtx.setNoRetries(true), lock); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public GatewayProtectedCacheProxy<K, V> withPartitionRecover() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + boolean recovery = opCtx.recovery(); + + if (recovery) + return this; + + return new GatewayProtectedCacheProxy<>(delegate, opCtx.setRecovery(true), lock); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <K1, V1> GatewayProtectedCacheProxy<K1, V1> withKeepBinary() { + return keepBinary(); + } + + /** {@inheritDoc} */ + @Override public <K1, V1> GatewayProtectedCacheProxy<K1, V1> keepBinary() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return new GatewayProtectedCacheProxy<>((IgniteCacheProxy<K1, V1>) delegate, opCtx.keepBinary(), lock); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public GatewayProtectedCacheProxy<K, V> withDataCenterId(byte dataCenterId) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + Byte prevDataCenterId = opCtx.dataCenterId(); + + if (prevDataCenterId != null && dataCenterId == prevDataCenterId) + return this; + + return new GatewayProtectedCacheProxy<>(delegate, opCtx.setDataCenterId(dataCenterId), lock); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.loadCache(p, args); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> loadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.loadCacheAsync(p, args); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.localLoadCache(p, args); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> localLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.localLoadCacheAsync(p, args); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public V getAndPutIfAbsent(K key, V val) throws CacheException, TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAndPutIfAbsent(key, val); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAndPutIfAbsentAsync(K key, V val) throws CacheException, TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAndPutIfAbsentAsync(key, val); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public Lock lock(K key) { + return delegate.lock(key); + } + + /** {@inheritDoc} */ + @Override public Lock lockAll(Collection<? extends K> keys) { + return delegate.lockAll(keys); + } + + /** {@inheritDoc} */ + @Override public boolean isLocalLocked(K key, boolean byCurrThread) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.isLocalLocked(key, byCurrThread); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <R> QueryCursor<R> query(Query<R> qry) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.query(qry); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.query(qry); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.query(qry, transformer); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.localEntries(peekModes); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public QueryMetrics queryMetrics() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.queryMetrics(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void resetQueryMetrics() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.resetQueryMetrics(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public Collection<? extends QueryDetailMetrics> queryDetailMetrics() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.queryDetailMetrics(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void resetQueryDetailMetrics() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.resetQueryDetailMetrics(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void localEvict(Collection<? extends K> keys) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.localEvict(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public V localPeek(K key, CachePeekMode... peekModes) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.localPeek(key, peekModes); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public int size(CachePeekMode... peekModes) throws CacheException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.size(peekModes); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Integer> sizeAsync(CachePeekMode... peekModes) throws CacheException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.sizeAsync(peekModes); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public long sizeLong(CachePeekMode... peekModes) throws CacheException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.sizeLong(peekModes); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Long> sizeLongAsync(CachePeekMode... peekModes) throws CacheException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.sizeLongAsync(peekModes); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public long sizeLong(int partition, CachePeekMode... peekModes) throws CacheException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.sizeLong(partition, peekModes); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Long> sizeLongAsync(int partition, CachePeekMode... peekModes) throws CacheException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.sizeLongAsync(partition, peekModes); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public int localSize(CachePeekMode... peekModes) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.localSize(peekModes); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public long localSizeLong(CachePeekMode... peekModes) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.localSizeLong(peekModes); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public long localSizeLong(int partition, CachePeekMode... peekModes) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.localSizeLong(partition, peekModes); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.invokeAll(map, args); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.invokeAllAsync(map, args); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public V get(K key) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.get(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAsync(K key) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAsync(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public CacheEntry<K, V> getEntry(K key) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getEntry(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<CacheEntry<K, V>> getEntryAsync(K key) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getEntryAsync(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public Map<K, V> getAll(Set<? extends K> keys) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAll(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Map<K, V>> getAllAsync(Set<? extends K> keys) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAllAsync(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getEntries(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(Set<? extends K> keys) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getEntriesAsync(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAllOutTx(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAllOutTxAsync(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public boolean containsKey(K key) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.containsKey(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void loadAll(Set<? extends K> keys, boolean replaceExisting, CompletionListener completionListener) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.loadAll(keys, replaceExisting, completionListener); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> containsKeyAsync(K key) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.containsKeyAsync(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public boolean containsKeys(Set<? extends K> keys) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.containsKeys(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> containsKeysAsync(Set<? extends K> keys) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.containsKeysAsync(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void put(K key, V val) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.put(key, val); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> putAsync(K key, V val) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.putAsync(key, val); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public V getAndPut(K key, V val) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAndPut(key, val); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAndPutAsync(K key, V val) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAndPutAsync(key, val); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void putAll(Map<? extends K, ? extends V> map) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.putAll(map); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.putAllAsync(map); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public boolean putIfAbsent(K key, V val) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.putIfAbsent(key, val); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> putIfAbsentAsync(K key, V val) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.putIfAbsentAsync(key, val); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public boolean remove(K key) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.remove(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> removeAsync(K key) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.removeAsync(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public boolean remove(K key, V oldVal) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.remove(key, oldVal); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> removeAsync(K key, V oldVal) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.removeAsync(key, oldVal); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public V getAndRemove(K key) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAndRemove(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAndRemoveAsync(K key) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAndRemoveAsync(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public boolean replace(K key, V oldVal, V newVal) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.replace(key, oldVal, newVal); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.replaceAsync(key, oldVal, newVal); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public boolean replace(K key, V val) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.replace(key, val); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> replaceAsync(K key, V val) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.replaceAsync(key, val); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public V getAndReplace(K key, V val) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAndReplace(key, val); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAndReplaceAsync(K key, V val) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.getAndReplaceAsync(key, val); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void removeAll(Set<? extends K> keys) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.removeAll(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> removeAllAsync(Set<? extends K> keys) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.removeAllAsync(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void removeAll() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.removeAll(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> removeAllAsync() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.removeAllAsync(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void clear() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.clear(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> clearAsync() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.clearAsync(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void clear(K key) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.clear(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> clearAsync(K key) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.clearAsync(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void clearAll(Set<? extends K> keys) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.clearAll(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> clearAllAsync(Set<? extends K> keys) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.clearAllAsync(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void localClear(K key) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.localClear(key); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void localClearAll(Set<? extends K> keys) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.localClearAll(keys); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.invoke(key, entryProcessor, arguments); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<T> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.invokeAsync(key, entryProcessor, arguments); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... arguments) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.invoke(key, entryProcessor, arguments); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<T> invokeAsync(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... arguments) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.invokeAsync(key, entryProcessor, arguments); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.invokeAll(keys, entryProcessor, args); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.invokeAllAsync(keys, entryProcessor, args); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.invokeAll(keys, entryProcessor, args); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) throws TransactionException { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.invokeAllAsync(keys, entryProcessor, args); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public <T> T unwrap(Class<T> clazz) { + return delegate.unwrap(clazz); + } + + /** {@inheritDoc} */ + @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.registerCacheEntryListener(cacheEntryListenerConfiguration); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + delegate.deregisterCacheEntryListener(cacheEntryListenerConfiguration); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public Iterator<Entry<K, V>> iterator() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.iterator(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public void destroy() { + GridCacheGateway<K, V> gate = gate(); + + if (!onEnterIfNoStop(gate)) + return; + + IgniteFuture<?> destroyFuture; + + try { + destroyFuture = delegate.destroyAsync(); + } + finally { + onLeave(gate); + } + + if (destroyFuture != null) + destroyFuture.get(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> destroyAsync() { + return delegate.destroyAsync(); + } + + /** {@inheritDoc} */ + @Override public void close() { + GridCacheGateway<K, V> gate = gate(); + + if (!onEnterIfNoStop(gate)) + return; + + IgniteFuture<?> closeFuture; + + try { + closeFuture = closeAsync(); + } + finally { + onLeave(gate); + } + + if (closeFuture != null) + closeFuture.get(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> closeAsync() { + return delegate.closeAsync(); + } + + /** {@inheritDoc} */ + @Override public boolean isClosed() { + return delegate.isClosed(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> rebalance() { + return delegate.rebalance(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> indexReadyFuture() { + return delegate.indexReadyFuture(); + } + + /** {@inheritDoc} */ + @Override public CacheMetrics metrics() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.metrics(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public CacheMetrics metrics(ClusterGroup grp) { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.metrics(grp); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public CacheMetrics localMetrics() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.localMetrics(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public CacheMetricsMXBean mxBean() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.mxBean(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public CacheMetricsMXBean localMxBean() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.localMxBean(); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ + @Override public Collection<Integer> lostPartitions() { + GridCacheGateway<K, V> gate = gate(); + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.lostPartitions(); + } + finally { + onLeave(gate, prev); + } + } + + /** + * Safely get CacheGateway. + * + * @return Cache Gateway. + */ + @Nullable private GridCacheGateway<K, V> gate() { + GridCacheContext<K, V> cacheContext = delegate.context(); + return cacheContext != null ? cacheContext.gate() : null; + } + + /** + * Checks that proxy is in valid state (not closed, restarted or destroyed). + * Throws IllegalStateException or CacheRestartingException if proxy is in invalid state. + * + * @param gate Cache gateway. + */ + private void checkProxyIsValid(@Nullable GridCacheGateway<K, V> gate) { + if (isProxyClosed()) + throw new IllegalStateException("Cache has been closed: " + context().name()); + + if (delegate instanceof IgniteCacheProxyImpl) + ((IgniteCacheProxyImpl) delegate).checkRestart(); + + if (gate == null) + throw new IllegalStateException("Gateway is unavailable. Probably cache has been destroyed, but proxy is not closed."); + } + + /** + * @param gate Cache gateway. + * @param opCtx Cache operation context to guard. + * @return Previous projection set on this thread. + */ + private CacheOperationContext onEnter(@Nullable GridCacheGateway<K, V> gate, CacheOperationContext opCtx) { + checkProxyIsValid(gate); + + return lock ? gate.enter(opCtx) : gate.enterNoLock(opCtx); + } + + /** + * @param gate Cache gateway. + * @return {@code True} if enter successful. + */ + private boolean onEnterIfNoStop(@Nullable GridCacheGateway<K, V> gate) { + try { + checkProxyIsValid(gate); + } + catch (Exception e) { + return false; + } + + return lock ? gate.enterIfNotStopped() : gate.enterIfNotStoppedNoLock(); + } + + /** + * @param gate Cache gateway. + * @param opCtx Operation context to guard. + */ + private void onLeave(GridCacheGateway<K, V> gate, CacheOperationContext opCtx) { + if (lock) + gate.leave(opCtx); + else + gate.leaveNoLock(opCtx); + } + + /** + * @param gate Cache gateway. + */ + private void onLeave(GridCacheGateway<K, V> gate) { + if (lock) + gate.leave(); + else + gate.leaveNoLock(); + } + + /** {@inheritDoc} */ + @Override public boolean isProxyClosed() { + return delegate.isProxyClosed(); + } + + /** {@inheritDoc} */ + @Override public void closeProxy() { + delegate.closeProxy(); + } + + /** {@inheritDoc} */ + @Override public V getTopologySafe(K key) { + return delegate.getTopologySafe(key); + } + + /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withAsync() { + return delegate.withAsync(); + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + return delegate.isAsync(); + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> future() { + return delegate.future(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(delegate); + + out.writeBoolean(lock); + + out.writeObject(opCtx); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + delegate = (IgniteCacheProxy<K, V>) in.readObject(); + + lock = in.readBoolean(); + + opCtx = (CacheOperationContext) in.readObject(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object another) { + GatewayProtectedCacheProxy anotherProxy = (GatewayProtectedCacheProxy) another; + + return delegate.equals(anotherProxy.delegate); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return delegate.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 9902a92..bbd7500 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -194,7 +194,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { private final Map<String, GridCacheAdapter> stoppedCaches = new ConcurrentHashMap<>(); /** Map of proxies. */ - private final ConcurrentHashMap<String, IgniteCacheProxy<?, ?>> jCacheProxies; + private final ConcurrentHashMap<String, IgniteCacheProxyImpl<?, ?>> jCacheProxies; /** Caches stop sequence. */ private final Deque<String> stopSeq; @@ -948,7 +948,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ public void blockGateways() { for (IgniteCacheProxy<?, ?> proxy : jCacheProxies.values()) - proxy.gate().onStopped(); + proxy.context().gate().onStopped(); } /** {@inheritDoc} */ @@ -1805,7 +1805,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConfiguration ccfg = new CacheConfiguration(startCfg); - IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(ccfg.getName()); + IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(ccfg.getName()); boolean proxyRestart = proxy != null && proxy.isRestarting() && !caches.containsKey(ccfg.getName()); @@ -1944,14 +1944,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ void blockGateway(String cacheName, boolean stop, boolean restart) { // Break the proxy before exchange future is done. - IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName); + IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(cacheName); if (proxy != null) { if (stop) { if (restart) proxy.restart(); - proxy.gate().stopped(); + proxy.context().gate().stopped(); } else proxy.closeProxy(); @@ -1965,7 +1965,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { private void stopGateway(DynamicCacheChangeRequest req) { assert req.stop() : req; - IgniteCacheProxy<?, ?> proxy; + IgniteCacheProxyImpl<?, ?> proxy; // Break the proxy before exchange future is done. if (req.restart()) { @@ -1978,7 +1978,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { proxy = jCacheProxies.remove(req.cacheName()); if (proxy != null) - proxy.gate().onStopped(); + proxy.context().gate().onStopped(); } /** @@ -2014,7 +2014,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cacheCtx.startTopologyVersion().equals(startTopVer) ) { if (!jCacheProxies.containsKey(cacheCtx.name())) - jCacheProxies.putIfAbsent(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); + jCacheProxies.putIfAbsent(cacheCtx.name(), new IgniteCacheProxyImpl(cache.context(), cache, false)); if (cacheCtx.preloader() != null) cacheCtx.preloader().onInitialExchangeComplete(err); @@ -2079,7 +2079,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert cache != null : cctx.name(); - jCacheProxies.put(cctx.name(), new IgniteCacheProxy(cache.context(), cache, null, false)); + jCacheProxies.put(cctx.name(), new IgniteCacheProxyImpl(cache.context(), cache, false)); } else { jCacheProxies.remove(cctx.name()); @@ -2129,6 +2129,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { } for (ExchangeActions.CacheActionData action : exchActions.cacheStopRequests()) { + CacheGroupContext gctx = cacheGrps.get(action.descriptor().groupId()); + + // Cancel all operations blocking gateway + if (gctx != null) { + final String msg = "Failed to wait for topology update, cache group is stopping."; + + // If snapshot operation in progress we must throw CacheStoppedException + // for correct cache proxy restart. For more details see + // IgniteCacheProxy.cacheException() + gctx.affinity().cancelFutures(new CacheStoppedException(msg)); + } + stopGateway(action.request()); sharedCtx.database().checkpointReadLock(); @@ -2764,7 +2776,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName); - if (proxy == null || proxy.proxyClosed()) + if (proxy == null || proxy.isProxyClosed()) return new GridFinishedFuture<>(); // No-op. checkEmptyTransactions(); @@ -3202,7 +3214,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return All configured cache instances. */ public Collection<IgniteCacheProxy<?, ?>> jcaches() { - return jCacheProxies.values(); + return F.viewReadOnly(jCacheProxies.values(), new IgniteClosure<IgniteCacheProxyImpl<?, ?>, IgniteCacheProxy<?, ?>>() { + @Override public IgniteCacheProxy<?, ?> apply(IgniteCacheProxyImpl<?, ?> entry) { + return new GatewayProtectedCacheProxy<>(entry, new CacheOperationContext(), true); + } + }); } /** @@ -3226,7 +3242,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheAdapter<?, ?> cacheAdapter = caches.get(name); if (cacheAdapter != null) - proxy = new IgniteCacheProxy(cacheAdapter.context(), cacheAdapter, null, false); + proxy = new IgniteCacheProxyImpl(cacheAdapter.context(), cacheAdapter, false); } assert proxy != null : name; @@ -3301,13 +3317,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgniteCacheProxy<?, ?> cache = jCacheProxies.get(cacheName); + // Try to start cache, there is no guarantee that cache will be instantiated. if (cache == null) { dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get(); cache = jCacheProxies.get(cacheName); } - return (IgniteCacheProxy<K, V>)cache; + return cache != null ? + new GatewayProtectedCacheProxy<>((IgniteCacheProxy<K, V>)cache, new CacheOperationContext(), true) : + null; } /** @@ -3424,7 +3443,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { public <K, V> IgniteCacheProxy<K, V> jcache(String name) { assert name != null; - IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); + IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, V>) jCacheProxies.get(name); if (cache == null) throw new IllegalArgumentException("Cache is not configured: " + name); @@ -3446,9 +3465,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { public Collection<IgniteCacheProxy<?, ?>> publicCaches() { Collection<IgniteCacheProxy<?, ?>> res = new ArrayList<>(jCacheProxies.size()); - for (Map.Entry<String, IgniteCacheProxy<?, ?>> entry : jCacheProxies.entrySet()) { + for (Map.Entry<String, IgniteCacheProxyImpl<?, ?>> entry : jCacheProxies.entrySet()) { if (entry.getValue().context().userCache()) - res.add(entry.getValue()); + res.add(new GatewayProtectedCacheProxy(entry.getValue(), new CacheOperationContext(), true)); } return res;
