http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java new file mode 100644 index 0000000..b94afa1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -0,0 +1,1810 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import javax.cache.Cache; +import javax.cache.CacheException; +import javax.cache.configuration.CacheEntryListenerConfiguration; +import javax.cache.configuration.Configuration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CompletionListener; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCacheRestartingException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheManager; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.Query; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.QueryDetailMetrics; +import org.apache.ignite.cache.query.QueryMetrics; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SpiQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.cluster.ClusterGroup; +import 
org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.AsyncSupportAdapter; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.query.CacheQuery; +import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.GridEmptyIterator; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.lang.IgniteOutClosureX; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CX1; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.mxbean.CacheMetricsMXBean; +import org.apache.ignite.plugin.security.SecurityPermission; +import org.jetbrains.annotations.NotNull; 
import org.jetbrains.annotations.Nullable;

/**
 * Cache proxy implementation.
 * <p>
 * Implements {@link IgniteCacheProxy} on top of a cache context and an internal cache delegate.
 */
@SuppressWarnings("unchecked")
public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<K, V>>
    implements IgniteCacheProxy<K, V> {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /** Context. */
    private volatile GridCacheContext<K, V> ctx;

    /** Delegate. */
    @GridToStringInclude
    private volatile IgniteInternalCache<K, V> delegate;

    /** Cache manager. */
    @GridToStringExclude
    private CacheManager cacheMgr;

    /** Future indicates that cache is under restarting. */
    private final AtomicReference<GridFutureAdapter<Void>> restartFut;

    /** Flag indicates that proxy is closed. */
    private volatile boolean closed;

    /**
     * Empty constructor required for {@link Externalizable}.
     * <p>
     * Only initializes the restart future holder (with a {@code null} future); context and delegate are not
     * assigned here.
     */
    public IgniteCacheProxyImpl() {
        restartFut = new AtomicReference<GridFutureAdapter<Void>>(null);
    }

    /**
     * Creates a proxy with a fresh restart future holder that contains no future.
     *
     * @param ctx Context.
     * @param delegate Delegate.
     * @param async Async support flag.
     */
    public IgniteCacheProxyImpl(
        @NotNull GridCacheContext<K, V> ctx,
        @NotNull IgniteInternalCache<K, V> delegate,
        boolean async
    ) {
        this(ctx, delegate, new AtomicReference<GridFutureAdapter<Void>>(null), async);
    }

    /**
     * Creates a proxy that uses the given restart future holder.
     *
     * @param ctx Context.
     * @param delegate Delegate.
     * @param restartFut Holder of the future that indicates that cache is under restarting.
     * @param async Async support flag.
     */
    private IgniteCacheProxyImpl(
        @NotNull GridCacheContext<K, V> ctx,
        @NotNull IgniteInternalCache<K, V> delegate,
        @NotNull AtomicReference<GridFutureAdapter<Void>> restartFut,
        boolean async
    ) {
        super(async);

        // Checked only when assertions are enabled; @NotNull documents the contract otherwise.
        assert ctx != null;
        assert delegate != null;

        this.ctx = ctx;
        this.delegate = delegate;

        this.restartFut = restartFut;
    }

    /**
     * @return Context.
+ */ + @Override + public GridCacheContext<K, V> context() { + return ctx; + } + + /** {@inheritDoc} */ + @Override public IgniteCacheProxy<K, V> cacheNoGate() { + return new GatewayProtectedCacheProxy<>(this, new CacheOperationContext(), false); + } + + /** {@inheritDoc} */ + @Override public CacheMetrics metrics() { + return ctx.cache().clusterMetrics(); + } + + /** {@inheritDoc} */ + @Override public CacheMetrics metrics(ClusterGroup grp) { + return ctx.cache().clusterMetrics(grp); + } + + /** {@inheritDoc} */ + @Override public CacheMetrics localMetrics() { + return ctx.cache().localMetrics(); + } + + /** {@inheritDoc} */ + @Override public CacheMetricsMXBean mxBean() { + return ctx.cache().clusterMxBean(); + } + + /** {@inheritDoc} */ + @Override public CacheMetricsMXBean localMxBean() { + return ctx.cache().localMxBean(); + } + + /** {@inheritDoc} */ + @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { + CacheConfiguration cfg = ctx.config(); + + if (!clazz.isAssignableFrom(cfg.getClass())) + throw new IllegalArgumentException(); + + return clazz.cast(cfg); + } + + /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withSkipStore() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public <K1, V1> IgniteCache<K1, V1> withKeepBinary() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withNoRetries() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withPartitionRecover() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... 
args) { + try { + if (isAsync()) { + if (ctx.cache().isLocal()) + setFuture(ctx.cache().localLoadCacheAsync(p, args)); + else + setFuture(ctx.cache().globalLoadCacheAsync(p, args)); + } + else { + if (ctx.cache().isLocal()) + ctx.cache().localLoadCache(p, args); + else + ctx.cache().globalLoadCache(p, args); + } + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> loadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, + @Nullable Object... args) throws CacheException { + try { + if (ctx.cache().isLocal()) + return (IgniteFuture<Void>)createFuture(ctx.cache().localLoadCacheAsync(p, args)); + else + return (IgniteFuture<Void>)createFuture(ctx.cache().globalLoadCacheAsync(p, args)); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { + try { + if (isAsync()) + setFuture(delegate.localLoadCacheAsync(p, args)); + else + delegate.localLoadCache(p, args); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> localLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, + @Nullable Object... 
args) throws CacheException { + return (IgniteFuture<Void>)createFuture(delegate.localLoadCacheAsync(p, args)); + } + + /** {@inheritDoc} */ + @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { + try { + if (isAsync()) { + setFuture(delegate.getAndPutIfAbsentAsync(key, val)); + + return null; + } + else + return delegate.getAndPutIfAbsent(key, val); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAndPutIfAbsentAsync(K key, V val) throws CacheException { + return createFuture(delegate.getAndPutIfAbsentAsync(key, val)); + } + + /** {@inheritDoc} */ + @Override public Lock lock(K key) throws CacheException { + return lockAll(Collections.singleton(key)); + } + + /** {@inheritDoc} */ + @Override public Lock lockAll(final Collection<? extends K> keys) { + return new CacheLockImpl<>(ctx.gate(), delegate, new CacheOperationContext(), keys); + } + + /** {@inheritDoc} */ + @Override public boolean isLocalLocked(K key, boolean byCurrThread) { + return byCurrThread ? delegate.isLockedByThread(key) : delegate.isLocked(key); + } + + /** + * @param scanQry ScanQry. + * @param transformer Transformer + * @param grp Optional cluster group. + * @return Cursor. + * @throws IgniteCheckedException If failed. 
+ */ + @SuppressWarnings("unchecked") + private <T, R> QueryCursor<R> query( + final ScanQuery scanQry, + @Nullable final IgniteClosure<T, R> transformer, + @Nullable ClusterGroup grp) + throws IgniteCheckedException { + + final CacheQuery<R> qry; + + CacheOperationContext opCtxCall = ctx.operationContextPerCall(); + + boolean isKeepBinary = opCtxCall != null && opCtxCall.isKeepBinary(); + + IgniteBiPredicate<K, V> p = scanQry.getFilter(); + + qry = ctx.queries().createScanQuery(p, transformer, scanQry.getPartition(), isKeepBinary); + + if (scanQry.getPageSize() > 0) + qry.pageSize(scanQry.getPageSize()); + + if (grp != null) + qry.projection(grp); + + final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN, + ctx.name(), ctx, new IgniteOutClosureX<GridCloseableIterator<R>>() { + @Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException { + final GridCloseableIterator iter0 = qry.executeScanQuery(); + + final boolean needToConvert = transformer == null; + + return new GridCloseableIteratorAdapter<R>() { + @Override protected R onNext() throws IgniteCheckedException { + Object next = iter0.nextX(); + + if (needToConvert) { + Map.Entry<K, V> entry = (Map.Entry<K, V>)next; + + return (R)new CacheEntryImpl<>(entry.getKey(), entry.getValue()); + } + + return (R)next; + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + return iter0.hasNextX(); + } + + @Override protected void onClose() throws IgniteCheckedException { + iter0.close(); + } + }; + } + }, true); + + return new QueryCursorImpl<>(iter); + } + + /** + * @param filter Filter. + * @param grp Optional cluster group. + * @return Cursor. + * @throws IgniteCheckedException If failed. 
+ */ + @SuppressWarnings("unchecked") + private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp) + throws IgniteCheckedException { + final CacheQuery qry; + + CacheOperationContext opCtxCall = ctx.operationContextPerCall(); + + boolean isKeepBinary = opCtxCall != null && opCtxCall.isKeepBinary(); + + final CacheQueryFuture fut; + + if (filter instanceof TextQuery) { + TextQuery p = (TextQuery)filter; + + qry = ctx.queries().createFullTextQuery(p.getType(), p.getText(), isKeepBinary); + + if (grp != null) + qry.projection(grp); + + fut = ctx.kernalContext().query().executeQuery(GridCacheQueryType.TEXT, p.getText(), ctx, + new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() { + @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() { + return qry.execute(); + } + }, false); + } + else if (filter instanceof SpiQuery) { + qry = ctx.queries().createSpiQuery(isKeepBinary); + + if (grp != null) + qry.projection(grp); + + fut = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SPI, filter.getClass().getSimpleName(), + ctx, new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() { + @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() { + return qry.execute(((SpiQuery)filter).getArgs()); + } + }, false); + } + else { + if (filter instanceof SqlFieldsQuery) + throw new CacheException("Use methods 'queryFields' and 'localQueryFields' for " + + SqlFieldsQuery.class.getSimpleName() + "."); + + throw new CacheException("Unsupported query type: " + filter); + } + + return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K, V>>() { + /** */ + private Cache.Entry<K, V> cur; + + @Override protected Entry<K, V> onNext() throws IgniteCheckedException { + if (!onHasNext()) + throw new NoSuchElementException(); + + Cache.Entry<K, V> e = cur; + + cur = null; + + return e; + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (cur != null) + return true; + + Object next = 
fut.next(); + + // Workaround a bug: if IndexingSpi is configured future represents Iterator<Cache.Entry> + // instead of Iterator<Map.Entry> due to IndexingSpi interface. + if (next == null) + return false; + + if (next instanceof Cache.Entry) + cur = (Cache.Entry)next; + else { + Map.Entry e = (Map.Entry)next; + + cur = new CacheEntryImpl(e.getKey(), e.getValue()); + } + + return true; + } + + @Override protected void onClose() throws IgniteCheckedException { + fut.cancel(); + } + }); + } + + /** + * @param loc Enforce local. + * @return Local node cluster group. + */ + private ClusterGroup projection(boolean loc) { + if (loc || ctx.isLocal() || ctx.isReplicatedAffinityNode()) + return ctx.kernalContext().grid().cluster().forLocal(); + + if (ctx.isReplicated()) + return ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()).forRandom(); + + return null; + } + + /** + * Executes continuous query. + * + * @param qry Query. + * @param loc Local flag. + * @param keepBinary Keep binary flag. + * @return Initial iteration cursor. + */ + @SuppressWarnings("unchecked") + private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc, boolean keepBinary) { + if (qry.getInitialQuery() instanceof ContinuousQuery) + throw new IgniteException("Initial predicate for continuous query can't be an instance of another " + + "continuous query. 
Use SCAN or SQL query for initial iteration."); + + if (qry.getLocalListener() == null) + throw new IgniteException("Mandatory local listener is not set for the query: " + qry); + + if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != null) + throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory."); + + try { + final UUID routineId = ctx.continuousQueries().executeQuery( + qry.getLocalListener(), + qry.getRemoteFilter(), + qry.getRemoteFilterFactory(), + qry.getPageSize(), + qry.getTimeInterval(), + qry.isAutoUnsubscribe(), + loc, + keepBinary, + qry.isIncludeExpired()); + + final QueryCursor<Cache.Entry<K, V>> cur = + qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null; + + return new QueryCursor<Cache.Entry<K, V>>() { + @Override public Iterator<Cache.Entry<K, V>> iterator() { + return cur != null ? cur.iterator() : new GridEmptyIterator<Cache.Entry<K, V>>(); + } + + @Override public List<Cache.Entry<K, V>> getAll() { + return cur != null ? 
cur.getAll() : Collections.<Cache.Entry<K, V>>emptyList(); + } + + @Override public void close() { + if (cur != null) + cur.close(); + + try { + ctx.kernalContext().continuous().stopRoutine(routineId).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + }; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry) { + return (FieldsQueryCursor<List<?>>)query((Query)qry); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <R> QueryCursor<R> query(Query<R> qry) { + A.notNull(qry, "qry"); + try { + ctx.checkSecurity(SecurityPermission.CACHE_READ); + + validate(qry); + + convertToBinary(qry); + + CacheOperationContext opCtxCall = ctx.operationContextPerCall(); + + boolean keepBinary = opCtxCall != null && opCtxCall.isKeepBinary(); + + if (qry instanceof ContinuousQuery) + return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal(), keepBinary); + + if (qry instanceof SqlQuery) + return (QueryCursor<R>)ctx.kernalContext().query().querySql(ctx, (SqlQuery)qry, keepBinary); + + if (qry instanceof SqlFieldsQuery) + return (FieldsQueryCursor<R>)ctx.kernalContext().query().querySqlFields(ctx, (SqlFieldsQuery)qry, + keepBinary); + + if (qry instanceof ScanQuery) + return query((ScanQuery)qry, null, projection(qry.isLocal())); + + return (QueryCursor<R>)query(qry, projection(qry.isLocal())); + } + catch (Exception e) { + if (e instanceof CacheException) + throw (CacheException)e; + + throw new CacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) { + A.notNull(qry, "qry"); + A.notNull(transformer, "transformer"); + + if (!(qry instanceof ScanQuery)) + throw new UnsupportedOperationException("Transformers are supported only for SCAN queries."); + + try { 
+ ctx.checkSecurity(SecurityPermission.CACHE_READ); + + validate(qry); + + return query((ScanQuery<K, V>)qry, transformer, projection(qry.isLocal())); + } + catch (Exception e) { + if (e instanceof CacheException) + throw (CacheException)e; + + throw new CacheException(e); + } + } + + /** + * Convert query arguments to BinaryObjects if binary marshaller used. + * + * @param qry Query. + */ + private void convertToBinary(final Query qry) { + if (ctx.binaryMarshaller()) { + if (qry instanceof SqlQuery) { + final SqlQuery sqlQry = (SqlQuery) qry; + + convertToBinary(sqlQry.getArgs()); + } + else if (qry instanceof SpiQuery) { + final SpiQuery spiQry = (SpiQuery) qry; + + convertToBinary(spiQry.getArgs()); + } + else if (qry instanceof SqlFieldsQuery) { + final SqlFieldsQuery fieldsQry = (SqlFieldsQuery) qry; + + convertToBinary(fieldsQry.getArgs()); + } + } + } + + /** + * Converts query arguments to BinaryObjects if binary marshaller used. + * + * @param args Arguments. + */ + private void convertToBinary(final Object[] args) { + if (args == null) + return; + + for (int i = 0; i < args.length; i++) + args[i] = ctx.cacheObjects().binary().toBinary(args[i]); + } + + /** + * Checks query. + * + * @param qry Query + * @throws CacheException If query indexing disabled for sql query. + */ + private void validate(Query qry) { + if (!QueryUtils.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) && + !(qry instanceof ContinuousQuery) && !(qry instanceof SpiQuery)) + throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name() + + ". Use setIndexedTypes or setTypeMetadata methods on CacheConfiguration to enable."); + + if (!ctx.kernalContext().query().moduleEnabled() && + (qry instanceof SqlQuery || qry instanceof SqlFieldsQuery || qry instanceof TextQuery)) + throw new CacheException("Failed to execute query. 
Add module 'ignite-indexing' to the classpath " + + "of all Ignite nodes."); + } + + /** {@inheritDoc} */ + @Override public Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { + try { + return delegate.localEntries(peekModes); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public QueryMetrics queryMetrics() { + return delegate.context().queries().metrics(); + } + + /** {@inheritDoc} */ + @Override public void resetQueryMetrics() { + delegate.context().queries().resetMetrics(); + } + + /** {@inheritDoc} */ + @Override public Collection<? extends QueryDetailMetrics> queryDetailMetrics() { + return delegate.context().queries().detailMetrics(); + } + + /** {@inheritDoc} */ + @Override public void resetQueryDetailMetrics() { + delegate.context().queries().resetDetailMetrics(); + } + + /** {@inheritDoc} */ + @Override public void localEvict(Collection<? extends K> keys) { + delegate.evictAll(keys); + } + + /** {@inheritDoc} */ + @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) { + try { + return delegate.localPeek(key, peekModes, null); + } + catch (IgniteException | IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public int size(CachePeekMode... peekModes) throws CacheException { + try { + if (isAsync()) { + setFuture(delegate.sizeAsync(peekModes)); + + return 0; + } + else + return delegate.size(peekModes); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Integer> sizeAsync(CachePeekMode... peekModes) throws CacheException { + return createFuture(delegate.sizeAsync(peekModes)); + } + + /** {@inheritDoc} */ + @Override public long sizeLong(CachePeekMode... 
peekModes) throws CacheException { + try { + if (isAsync()) { + setFuture(delegate.sizeLongAsync(peekModes)); + + return 0; + } + else + return delegate.sizeLong(peekModes); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Long> sizeLongAsync(CachePeekMode... peekModes) throws CacheException { + return createFuture(delegate.sizeLongAsync(peekModes)); + } + + /** {@inheritDoc} */ + @Override public long sizeLong(int part, CachePeekMode... peekModes) throws CacheException { + try { + if (isAsync()) { + setFuture(delegate.sizeLongAsync(part, peekModes)); + + return 0; + } + else + return delegate.sizeLong(part, peekModes); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Long> sizeLongAsync(int part, CachePeekMode... peekModes) throws CacheException { + return createFuture(delegate.sizeLongAsync(part, peekModes)); + } + + /** {@inheritDoc} */ + @Override public int localSize(CachePeekMode... peekModes) { + try { + return delegate.localSize(peekModes); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public long localSizeLong(CachePeekMode... peekModes) { + try { + return delegate.localSizeLong(peekModes); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public long localSizeLong(int part, CachePeekMode... 
peekModes) { + try { + return delegate.localSizeLong(part, peekModes); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public V get(K key) { + try { + if (isAsync()) { + setFuture(delegate.getAsync(key)); + + return null; + } + else + return delegate.get(key); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAsync(K key) { + return createFuture(delegate.getAsync(key)); + } + + /** {@inheritDoc} */ + @Override public CacheEntry<K, V> getEntry(K key) { + try { + if (isAsync()) { + setFuture(delegate.getEntryAsync(key)); + + return null; + } + else + return delegate.getEntry(key); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<CacheEntry<K, V>> getEntryAsync(K key) { + return createFuture(delegate.getEntryAsync(key)); + } + + /** {@inheritDoc} */ + @Override public Map<K, V> getAll(Set<? extends K> keys) { + try { + if (isAsync()) { + setFuture(delegate.getAllAsync(keys)); + + return null; + } + else + return delegate.getAll(keys); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Map<K, V>> getAllAsync(Set<? extends K> keys) { + return createFuture(delegate.getAllAsync(keys)); + } + + /** {@inheritDoc} */ + @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) { + try { + if (isAsync()) { + setFuture(delegate.getEntriesAsync(keys)); + + return null; + } + else + return delegate.getEntries(keys); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(Set<? 
extends K> keys) { + return createFuture(delegate.getEntriesAsync(keys)); + } + + /** {@inheritDoc} */ + @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) { + try { + if (isAsync()) { + setFuture(delegate.getAllOutTxAsync(keys)); + + return null; + } + else + return delegate.getAllOutTx(keys); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) { + return createFuture(delegate.getAllOutTxAsync(keys)); + } + + /** + * @param keys Keys. + * @return Values map. + */ + public Map<K, V> getAll(Collection<? extends K> keys) { + try { + if (isAsync()) { + setFuture(delegate.getAllAsync(keys)); + + return null; + } + else + return delegate.getAll(keys); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean containsKey(K key) { + if (isAsync()) { + setFuture(delegate.containsKeyAsync(key)); + + return false; + } + else + return delegate.containsKey(key); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> containsKeyAsync(K key) { + return createFuture(delegate.containsKeyAsync(key)); + } + + /** {@inheritDoc} */ + @Override public boolean containsKeys(Set<? extends K> keys) { + if (isAsync()) { + setFuture(delegate.containsKeysAsync(keys)); + + return false; + } + else + return delegate.containsKeys(keys); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> containsKeysAsync(Set<? extends K> keys) { + return createFuture(delegate.containsKeysAsync(keys)); + } + + /** {@inheritDoc} */ + @Override public void loadAll( + Set<? 
extends K> keys, + boolean replaceExisting, + @Nullable final CompletionListener completionLsnr + ) { + IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting); + + if (completionLsnr != null) { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + try { + fut.get(); + + completionLsnr.onCompletion(); + } + catch (IgniteCheckedException e) { + completionLsnr.onException(cacheException(e)); + } + } + }); + } + } + + /** {@inheritDoc} */ + @Override public void put(K key, V val) { + try { + if (isAsync()) + setFuture(putAsync0(key, val)); + else + delegate.put(key, val); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> putAsync(K key, V val) { + return createFuture(putAsync0(key, val)); + } + + /** + * Put async internal operation implementation. + * + * @param key Key. + * @param val Value. + * @return Internal future. + */ + private IgniteInternalFuture<Void> putAsync0(K key, V val) { + IgniteInternalFuture<Boolean> fut = delegate.putAsync(key, val); + + return fut.chain(new CX1<IgniteInternalFuture<Boolean>, Void>() { + @Override public Void applyx(IgniteInternalFuture<Boolean> fut1) throws IgniteCheckedException { + try { + fut1.get(); + } + catch (RuntimeException e) { + throw new GridClosureException(e); + } + + return null; + } + }); + } + + /** {@inheritDoc} */ + @Override public V getAndPut(K key, V val) { + try { + if (isAsync()) { + setFuture(delegate.getAndPutAsync(key, val)); + + return null; + } + else + return delegate.getAndPut(key, val); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAndPutAsync(K key, V val) { + return createFuture(delegate.getAndPutAsync(key, val)); + } + + /** {@inheritDoc} */ + @Override public void putAll(Map<? extends K, ? 
extends V> map) { + try { + if (isAsync()) + setFuture(delegate.putAllAsync(map)); + else + delegate.putAll(map); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) { + return (IgniteFuture<Void>)createFuture(delegate.putAllAsync(map)); + } + + /** {@inheritDoc} */ + @Override public boolean putIfAbsent(K key, V val) { + try { + if (isAsync()) { + setFuture(delegate.putIfAbsentAsync(key, val)); + + return false; + } + else + return delegate.putIfAbsent(key, val); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> putIfAbsentAsync(K key, V val) { + return createFuture(delegate.putIfAbsentAsync(key, val)); + } + + /** {@inheritDoc} */ + @Override public boolean remove(K key) { + try { + if (isAsync()) { + setFuture(delegate.removeAsync(key)); + + return false; + } + else + return delegate.remove(key); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> removeAsync(K key) { + return createFuture(delegate.removeAsync(key)); + } + + /** {@inheritDoc} */ + @Override public boolean remove(K key, V oldVal) { + try { + if (isAsync()) { + setFuture(delegate.removeAsync(key, oldVal)); + + return false; + } + else + return delegate.remove(key, oldVal); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> removeAsync(K key, V oldVal) { + return createFuture(delegate.removeAsync(key, oldVal)); + } + + /** {@inheritDoc} */ + @Override public V getAndRemove(K key) { + try { + if (isAsync()) { + setFuture(delegate.getAndRemoveAsync(key)); + + return null; + } + else + return delegate.getAndRemove(key); + } + catch 
(IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAndRemoveAsync(K key) { + return createFuture(delegate.getAndRemoveAsync(key)); + } + + /** {@inheritDoc} */ + @Override public boolean replace(K key, V oldVal, V newVal) { + try { + if (isAsync()) { + setFuture(delegate.replaceAsync(key, oldVal, newVal)); + + return false; + } + else + return delegate.replace(key, oldVal, newVal); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { + return createFuture(delegate.replaceAsync(key, oldVal, newVal)); + } + + /** {@inheritDoc} */ + @Override public boolean replace(K key, V val) { + try { + if (isAsync()) { + setFuture(delegate.replaceAsync(key, val)); + + return false; + } + else + return delegate.replace(key, val); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Boolean> replaceAsync(K key, V val) { + return createFuture(delegate.replaceAsync(key, val)); + } + + /** {@inheritDoc} */ + @Override public V getAndReplace(K key, V val) { + try { + if (isAsync()) { + setFuture(delegate.getAndReplaceAsync(key, val)); + + return null; + } + else + return delegate.getAndReplace(key, val); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<V> getAndReplaceAsync(K key, V val) { + return createFuture(delegate.getAndReplaceAsync(key, val)); + } + + /** {@inheritDoc} */ + @Override public void removeAll(Set<? 
extends K> keys) { + try { + if (isAsync()) + setFuture(delegate.removeAllAsync(keys)); + else + delegate.removeAll(keys); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> removeAllAsync(Set<? extends K> keys) { + return (IgniteFuture<Void>)createFuture(delegate.removeAllAsync(keys)); + } + + /** {@inheritDoc} */ + @Override public void removeAll() { + try { + if (isAsync()) + setFuture(delegate.removeAllAsync()); + else + delegate.removeAll(); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> removeAllAsync() { + return (IgniteFuture<Void>)createFuture(delegate.removeAllAsync()); + } + + /** {@inheritDoc} */ + @Override public void clear(K key) { + try { + if (isAsync()) + setFuture(delegate.clearAsync(key)); + else + delegate.clear(key); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> clearAsync(K key) { + return (IgniteFuture<Void>)createFuture(delegate.clearAsync(key)); + } + + /** {@inheritDoc} */ + @Override public void clearAll(Set<? extends K> keys) { + try { + if (isAsync()) + setFuture(delegate.clearAllAsync(keys)); + else + delegate.clearAll(keys); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> clearAllAsync(Set<? 
extends K> keys) { + return (IgniteFuture<Void>)createFuture(delegate.clearAllAsync(keys)); + } + + /** {@inheritDoc} */ + @Override public void clear() { + try { + if (isAsync()) + setFuture(delegate.clearAsync()); + else + delegate.clear(); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> clearAsync() { + return (IgniteFuture<Void>)createFuture(delegate.clearAsync()); + } + + /** {@inheritDoc} */ + @Override public void localClear(K key) { + delegate.clearLocally(key); + } + + /** {@inheritDoc} */ + @Override public void localClearAll(Set<? extends K> keys) { + for (K key : keys) + delegate.clearLocally(key); + } + + /** {@inheritDoc} */ + @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) + throws EntryProcessorException { + try { + if (isAsync()) { + setFuture(invokeAsync0(key, entryProcessor, args)); + + return null; + } + else { + EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args); + + return res != null ? res.get() : null; + } + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<T> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor, + Object... args) { + return createFuture(invokeAsync0(key, entryProcessor, args)); + } + + /** + * Invoke async operation internal implementation. + * + * @param key Key. + * @param entryProcessor Processor. + * @param args Arguments. + * @return Internal future. 
+ */ + private <T> IgniteInternalFuture<T> invokeAsync0(K key, EntryProcessor<K, V, T> entryProcessor, Object[] args) { + IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args); + + return fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() { + @Override public T applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut1) + throws IgniteCheckedException { + try { + EntryProcessorResult<T> res = fut1.get(); + + return res != null ? res.get() : null; + } + catch (RuntimeException e) { + throw new GridClosureException(e); + } + } + }); + } + + + /** {@inheritDoc} */ + @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... args) + throws EntryProcessorException { + return invoke(key, (EntryProcessor<K, V, T>)entryProcessor, args); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<T> invokeAsync(K key, CacheEntryProcessor<K, V, T> entryProcessor, + Object... args) { + return invokeAsync(key, (EntryProcessor<K, V, T>)entryProcessor, args); + } + + /** + * @param topVer Locked topology version. + * @param key Key. + * @param entryProcessor Entry processor. + * @param args Arguments. + * @return Invoke result. + */ + public <T> T invoke(@Nullable AffinityTopologyVersion topVer, + K key, + EntryProcessor<K, V, T> entryProcessor, + Object... args) { + try { + if (isAsync()) + throw new UnsupportedOperationException(); + else { + EntryProcessorResult<T> res = delegate.invoke(topVer, key, entryProcessor, args); + + return res != null ? res.get() : null; + } + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, + Object... 
args) { + try { + if (isAsync()) { + setFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + + return null; + } + else + return delegate.invokeAll(keys, entryProcessor, args); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, + EntryProcessor<K, V, T> entryProcessor, Object... args) { + return createFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + CacheEntryProcessor<K, V, T> entryProcessor, + Object... args) { + try { + if (isAsync()) { + setFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + + return null; + } + else + return delegate.invokeAll(keys, entryProcessor, args); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, + CacheEntryProcessor<K, V, T> entryProcessor, Object... args) { + return createFuture(delegate.invokeAllAsync(keys, entryProcessor, args)); + } + + /** {@inheritDoc} */ + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, + Object... args) { + try { + if (isAsync()) { + setFuture(delegate.invokeAllAsync(map, args)); + + return null; + } + else + return delegate.invokeAll(map, args); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... 
args) { + return createFuture(delegate.invokeAllAsync(map, args)); + } + + /** {@inheritDoc} */ + @Override public String getName() { + return delegate.name(); + } + + /** {@inheritDoc} */ + @Override public CacheManager getCacheManager() { + return cacheMgr; + } + + /** + * @param cacheMgr Cache manager. + */ + public void setCacheManager(CacheManager cacheMgr) { + this.cacheMgr = cacheMgr; + } + + /** {@inheritDoc} */ + @Override public void destroy() { + destroyAsync().get(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> destroyAsync() { + return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicDestroyCache(ctx.name(), false, true, false)); + } + + /** {@inheritDoc} */ + @Override public void close() { + closeAsync().get(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> closeAsync() { + return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicCloseCache(ctx.name())); + } + + /** {@inheritDoc} */ + @Override public boolean isClosed() { + return ctx.kernalContext().cache().context().closed(ctx); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> T unwrap(Class<T> clazz) { + if (clazz.isAssignableFrom(getClass())) + return (T)this; + else if (clazz.isAssignableFrom(IgniteEx.class)) + return (T)ctx.grid(); + + throw new IllegalArgumentException("Unwrapping to class is not supported: " + clazz); + } + + /** {@inheritDoc} */ + @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { + try { + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false, opCtx != null && opCtx.isKeepBinary()); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { + try { + ctx.continuousQueries().cancelJCacheQuery(lsnrCfg); + } + catch 
(IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override public Iterator<Cache.Entry<K, V>> iterator() { + try { + return ctx.cache().igniteIterator(); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ + @Override protected IgniteCache<K, V> createAsyncInstance() { + return new IgniteCacheProxyImpl<K, V>( + ctx, + delegate, + true + ); + } + + /** + * Creates projection that will operate with binary objects. <p> Projection returned by this method will force + * cache not to deserialize binary objects, so keys and values will be returned from cache API methods without + * changes. Therefore, signature of the projection can contain only the following types: <ul> <li>{@code BinaryObject} + * for binary classes</li> <li>All primitives (byte, int, ...) and their boxed versions (Byte, Integer, ...)</li> + * <li>Arrays of primitives (byte[], int[], ...)</li> <li>{@link String} and array of {@link String}s</li> + * <li>{@link UUID} and array of {@link UUID}s</li> <li>{@link Date} and array of {@link Date}s</li> <li>{@link + * java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li> <li>Enums and array of enums</li> <li> Maps, + * collections and array of objects (but objects inside them will still be converted if they are binary) </li> + * </ul> <p> For example, if you use {@link Integer} as a key and {@code Value} class as a value (which will be + * stored in binary format), you should acquire the following projection to avoid deserialization: + * <pre> + * IgniteInternalCache<Integer, GridBinaryObject> prj = cache.keepBinary(); + * + * // Value is not deserialized and returned in binary format. + * GridBinaryObject po = prj.get(1); + * </pre> + * <p> Note that this method makes sense only if cache is working in binary mode ({@code + * CacheConfiguration#isBinaryEnabled()} returns {@code true}).
If not, this method is no-op and will return + * current projection. + * + * @return Projection for binary objects. + */ + @Override + @SuppressWarnings("unchecked") + public <K1, V1> IgniteCache<K1, V1> keepBinary() { + throw new UnsupportedOperationException(); + } + + /** + * @param dataCenterId Data center ID. + * @return Projection for data center id. + */ + @Override + @SuppressWarnings("unchecked") + public IgniteCache<K, V> withDataCenterId(byte dataCenterId) { + throw new UnsupportedOperationException(); + } + + /** + * @return Cache with skip store enabled. + */ + @Override + public IgniteCache<K, V> skipStore() { + throw new UnsupportedOperationException(); + } + + /** + * Method converts exception to IgniteCacheRestartingException in case of cache restarting + * or to CacheException in other cases. + * + * @param e {@code IgniteCheckedException} or {@code IgniteException}. + * @return Cache exception. + */ + private RuntimeException cacheException(Exception e) { + GridFutureAdapter<Void> restartFut = this.restartFut.get(); + + if (restartFut != null && !restartFut.isDone()) { + if (X.hasCause(e, CacheStoppedException.class) || X.hasSuppressed(e, CacheStoppedException.class)) + throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), "Cache is restarting: " + + ctx.name()); + } + + if (e instanceof IgniteCheckedException) + return CU.convertToCacheException((IgniteCheckedException) e); + + if (e instanceof RuntimeException) + return (RuntimeException) e; + + throw new IllegalStateException("Unknown exception", e); + } + + /** + * @param fut Future for async operation. + */ + private <R> void setFuture(IgniteInternalFuture<R> fut) { + curFut.set(createFuture(fut)); + } + + /** {@inheritDoc} */ + @Override protected <R> IgniteFuture<R> createFuture(IgniteInternalFuture<R> fut) { + return new IgniteCacheFutureImpl<>(fut); + } + + /** + * @return Internal proxy. 
+ */ + @Override + public GridCacheProxyImpl<K, V> internalProxy() { + return new GridCacheProxyImpl<>(ctx, delegate, ctx.operationContextPerCall()); + } + + /** + * @return {@code True} if proxy was closed. + */ + @Override public boolean isProxyClosed() { + return closed; + } + + /** + * Closes this proxy instance. + */ + @Override public void closeProxy() { + closed = true; + } + + /** {@inheritDoc} */ + @Override public Collection<Integer> lostPartitions() { + return delegate.lostPartitions(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx); + + out.writeObject(delegate); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ctx = (GridCacheContext<K, V>)in.readObject(); + + delegate = (IgniteInternalCache<K, V>)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> rebalance() { + return new IgniteFutureImpl<>(ctx.preloader().forceRebalance()); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> indexReadyFuture() { + IgniteInternalFuture fut = ctx.shared().database().indexRebuildFuture(ctx.cacheId()); + + if (fut == null) + return new IgniteFinishedFutureImpl<>(); + + return new IgniteFutureImpl<>(fut); + } + + /** + * Gets value without waiting for topology changes. + * + * @param key Key. + * @return Value. + */ + @Override + public V getTopologySafe(K key) { + try { + return delegate.getTopologySafe(key); + } + catch (IgniteCheckedException | IgniteException e) { + throw cacheException(e); + } + } + + /** + * Throws {@code IgniteCacheRestartingException} if proxy is restarting.
+ */ + public void checkRestart() { + if (isRestarting()) + throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut.get()), "Cache is restarting: " + + context().name()); + } + + /** + * @return True if proxy is restarting, false in other case. + */ + public boolean isRestarting() { + return restartFut != null && restartFut.get() != null; + } + + /** + * Restarts this cache proxy. + */ + public void restart() { + GridFutureAdapter<Void> restartFut = new GridFutureAdapter<>(); + + final GridFutureAdapter<Void> currentFut = this.restartFut.get(); + + boolean changed = this.restartFut.compareAndSet(currentFut, restartFut); + + if (changed && currentFut != null) + restartFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { + @Override public void apply(IgniteInternalFuture<Void> future) { + if (future.error() != null) + currentFut.onDone(future.error()); + else + currentFut.onDone(); + } + }); + } + + /** + * Mark this proxy as restarted. + * + * @param ctx New cache context. + * @param delegate New delegate. + */ + public void onRestarted(GridCacheContext ctx, IgniteInternalCache delegate) { + GridFutureAdapter<Void> restartFut = this.restartFut.get(); + + assert restartFut != null; + + this.ctx = ctx; + this.delegate = delegate; + + restartFut.onDone(); + + this.restartFut.compareAndSet(restartFut, null); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteCacheProxyImpl.class, this); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java index 0cec1fe..61ab122 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java @@ -62,7 +62,7 @@ public class IgniteDrDataStreamerCacheUpdater implements StreamReceiver<KeyCache IgniteLogger log = ctx.log(IgniteDrDataStreamerCacheUpdater.class); GridCacheAdapter internalCache = ctx.cache().internalCache(cacheName); - CacheOperationContext opCtx = ((IgniteCacheProxy)cache0).operationContext(); + CacheOperationContext opCtx = ((IgniteCacheProxy)cache0).context().operationContextPerCall(); IgniteInternalCache cache = opCtx != null ? 
new GridCacheProxyImpl(internalCache.context(), internalCache, opCtx) : internalCache; http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index 6207995..ef914a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -17,6 +17,18 @@ package org.apache.ignite.internal.processors.platform.cache; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import javax.cache.Cache; +import javax.cache.integration.CompletionListener; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -41,8 +53,8 @@ import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformNativeException; -import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy; import org.apache.ignite.internal.processors.platform.PlatformTarget; +import 
org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryProxy; import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor; @@ -63,19 +75,6 @@ import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionTimeoutException; import org.jetbrains.annotations.Nullable; -import javax.cache.Cache; -import javax.cache.integration.CompletionListener; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.EntryProcessorResult; -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; - /** * Native cache wrapper implementation. 
*/ @@ -1009,7 +1008,7 @@ public class PlatformCache extends PlatformAbstractTarget { } case OP_WITH_NO_RETRIES: { - CacheOperationContext opCtx = cache.operationContext(); + CacheOperationContext opCtx = cache.context().operationContextPerCall(); if (opCtx != null && opCtx.noRetries()) return this; @@ -1018,7 +1017,9 @@ public class PlatformCache extends PlatformAbstractTarget { } case OP_WITH_SKIP_STORE: { - if (cache.delegate().skipStore()) + CacheOperationContext opCtx = cache.context().operationContextPerCall(); + + if (opCtx != null && opCtx.skipStore()) return this; return copy(rawCache.withSkipStore(), keepBinary); http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java index 7affa8c..7005e14 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -149,7 +150,15 @@ public class CacheEntryProcessorCopySelfTest extends GridCommonAbstractTest { } }); - 
CacheObject obj = ((GridCacheAdapter)((IgniteCacheProxy)cache).delegate()).peekEx(0).peekVisibleValue(); + GridCacheAdapter ca = (GridCacheAdapter)((IgniteCacheProxy)cache).internalProxy().delegate(); + + GridCacheEntryEx entry = ca.entryEx(0); + + entry.unswap(); + + CacheObject obj = entry.peekVisibleValue(); + + ca.context().evicts().touch(entry, AffinityTopologyVersion.NONE); int actCnt = cnt.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java index f67e247..c53bc4b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java @@ -768,6 +768,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { cache.close(); + // Check second close succeeds without exception. 
cache.close(); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java index 822537c..9376971 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java @@ -34,6 +34,7 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -246,6 +247,7 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { } } + cfg.setAffinity(new RendezvousAffinityFunction(false, 4096)); cfg.setCacheMode(cacheMode()); cfg.setAtomicityMode(atomicityMode()); cfg.setWriteSynchronizationMode(writeSynchronization()); http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java index 80404ce..2a90bf6 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java @@ -178,7 +178,7 @@ public abstract class GridCacheOnCopyFlagAbstractSelfTest extends GridCommonAbst cache.put(key, val); CacheObject obj = - ((GridCacheAdapter)((IgniteCacheProxy)cache).delegate()).peekEx(key).peekVisibleValue(); + ((GridCacheAdapter)((IgniteCacheProxy)cache).internalProxy().delegate()).peekEx(key).peekVisibleValue(); // Check thar internal entry wasn't changed. assertEquals(i, getValue(obj, cache)); @@ -211,7 +211,7 @@ public abstract class GridCacheOnCopyFlagAbstractSelfTest extends GridCommonAbst cache.put(key, newTestVal); - obj = ((GridCacheAdapter)((IgniteCacheProxy)cache).delegate()).peekEx(key).peekVisibleValue(); + obj = ((GridCacheAdapter)((IgniteCacheProxy)cache).internalProxy().delegate()).peekEx(key).peekVisibleValue(); // Check thar internal entry wasn't changed. 
assertEquals(-i, getValue(obj, cache)); @@ -290,7 +290,7 @@ public abstract class GridCacheOnCopyFlagAbstractSelfTest extends GridCommonAbst }); CacheObject obj = - ((GridCacheAdapter)((IgniteCacheProxy)cache).delegate()).peekEx(key).peekVisibleValue(); + ((GridCacheAdapter)((IgniteCacheProxy)cache).internalProxy().delegate()).peekEx(key).peekVisibleValue(); assertNotEquals(WRONG_VALUE, getValue(obj, cache)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java index 3c5fe0e..e068252 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java @@ -124,7 +124,7 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach info("Node is reported as NOT affinity node for key [key=" + key + ", nodeId=" + locNode.id() + ']'); - if (nearEnabled() && cache == cache0) + if (nearEnabled() && cache.equals(cache0)) assertEquals((Integer)i, cache0.localPeek(key)); else assertNull(cache0.localPeek(key)); @@ -184,7 +184,7 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach info("Node is reported as NOT affinity node for key [key=" + key + ", nodeId=" + locNode.id() + ']'); - if (nearEnabled() && cache == cache0) + if (nearEnabled() && cache.equals(cache0)) assertEquals((Integer)i, cache0.localPeek(key)); else assertNull(cache0.localPeek(key)); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java index dcba92f..8d5462d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -49,12 +47,13 @@ import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.MutableEntry; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import junit.framework.AssertionFailedError; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; @@ -83,7 +82,6 @@ import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.resources.LoggerResource; import 
org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.IgniteCacheConfigVariationsAbstractTest; import org.apache.ignite.transactions.Transaction; http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index bcf46fd..e473d52 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -65,7 +65,6 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.Nullable; http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index d3269c3..b55e3d0 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -50,6 +49,7 @@ import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.MutableEntry; +import com.google.common.collect.Sets; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -78,8 +78,8 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridIterator; @@ -87,6 +87,7 @@ import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.lang.gridfunc.ContainsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; @@ -2892,10 +2893,10 @@ public class 
IgniteCacheGroupsTest extends GridCommonAbstractTest { IgniteInternalFuture cacheFut = GridTestUtils.runAsync(new Runnable() { @Override public void run() { - try { - int cntr = 0; + int cntr = 0; - while (!stop.get()) { + while (!stop.get()) { + try { ThreadLocalRandom rnd = ThreadLocalRandom.current(); String grp; @@ -2927,13 +2928,20 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { node.destroyCache(cache.getName()); } - } - catch (Exception e) { - err.set(true); + catch (Exception e) { + if (X.hasCause(e, CacheStoppedException.class)) { + // Cache operation can be blocked on + // awaiting new topology version and cancelled with CacheStoppedException cause. - log.error("Unexpected error(2): " + e, e); + continue; + } - stop.set(true); + err.set(true); + + log.error("Unexpected error(2): " + e, e); + + stop.set(true); + } } } }, "cache-destroy-thread"); @@ -3706,7 +3714,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { final AtomicReferenceArray<IgniteCache> caches = new AtomicReferenceArray<>(CACHES); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < CACHES; i++) { CacheAtomicityMode atomicityMode = i % 2 == 0 ? 
ATOMIC : TRANSACTIONAL; caches.set(i, @@ -3799,28 +3807,41 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { IgniteInternalFuture opFut = GridTestUtils.runMultiThreadedAsync(new Runnable() { @Override public void run() { - try { - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - while (!stop.get()) { + while (!stop.get()) { + try { int idx = rnd.nextInt(CACHES); IgniteCache cache = caches.get(idx); if (cache != null && caches.compareAndSet(idx, cache, null)) { - for (int i = 0; i < 10; i++) - cacheOperation(rnd, cache); - - caches.set(idx, cache); + try { + for (int i = 0; i < 10; i++) + cacheOperation(rnd, cache); + } + catch (Exception e) { + if (X.hasCause(e, CacheStoppedException.class)) { + // Cache operation can be blocked on + // awaiting new topology version and cancelled with CacheStoppedException cause. + + continue; + } + + throw e; + } + finally { + caches.set(idx, cache); + } } } - } - catch (Exception e) { - err.set(e); + catch (Exception e) { + err.set(e); - log.error("Unexpected error: " + e, e); + log.error("Unexpected error: " + e, e); - stop.set(true); + stop.set(true); + } } } }, 8, "op-thread"); http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java index 7cb9861..25b90c4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java @@ -113,7 +113,6 @@ public class IgniteCacheStartStopLoadTest extends 
GridCommonAbstractTest { cache.put(1, obj); - weakMap.put(((IgniteCacheProxy)cache).delegate(), Boolean.TRUE); weakMap.put(obj, Boolean.TRUE); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java index 9a49b6c..6493f88 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.configuration.MemoryPolicyConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -217,7 +218,7 @@ public class MemoryPolicyInitializationTest extends GridCommonAbstractTest { * @param plcName Policy name. */ private void verifyCacheMemoryPolicy(IgniteCache cache, String plcName) { - GridCacheContext ctx = U.field(cache, "ctx"); + GridCacheContext ctx = ((IgniteCacheProxy) cache).context(); assertEquals(plcName, ctx.memoryPolicy().config().getName()); }
