IGNITE-7823 Separate cache for non collocated IgniteSet. Signed-off-by: Anton Vinogradov <a...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46d72fcd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46d72fcd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46d72fcd Branch: refs/heads/ignite-8446 Commit: 46d72fcde83abacb55828b001023e31392b28621 Parents: 0ee363f Author: pereslegin-pa <xxt...@gmail.com> Authored: Wed Jul 25 13:55:55 2018 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Wed Jul 25 13:55:55 2018 +0300 ---------------------------------------------------------------------- .../CacheDataStructuresManager.java | 27 ++-- .../datastructures/DataStructuresProcessor.java | 55 ++++++-- .../datastructures/GridCacheSetHeader.java | 31 ++++- .../datastructures/GridCacheSetImpl.java | 128 ++++++++++++------- .../datastructures/GridCacheSetItemKey.java | 5 +- .../datastructures/GridCacheSetProxy.java | 18 +++ .../ignite/internal/util/IgniteUtils.java | 16 +++ .../GridCacheSetAbstractSelfTest.java | 50 ++++++-- .../IgniteCollectionAbstractTest.java | 11 ++ .../IgniteDataStructureWithJobTest.java | 10 +- ...gniteAtomicLongChangingTopologySelfTest.java | 18 +++ .../internal/util/IgniteUtilsSelfTest.java | 29 +++++ 12 files changed, 309 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 4771582..ccfdc15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -72,7 +72,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.concurrent.ConcurrentHashMap; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST; /** @@ -237,10 +236,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { { waitInitialization(); - // Non collocated mode enabled only for PARTITIONED cache. - final boolean colloc0 = create && (cctx.cache().configuration().getCacheMode() != PARTITIONED || colloc); - - return queue0(name, cap, colloc0, create); + return queue0(name, cap, colloc, create); } /** @@ -389,32 +385,31 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { * @param name Set name. * @param colloc Collocated flag. * @param create Create flag. + * @param separated Separated cache flag. * @return Set. * @throws IgniteCheckedException If failed. */ @Nullable public <T> IgniteSet<T> set(final String name, boolean colloc, - final boolean create) - throws IgniteCheckedException + boolean create, + boolean separated) throws IgniteCheckedException { - // Non collocated mode enabled only for PARTITIONED cache. - final boolean colloc0 = - create && (cctx.cache().configuration().getCacheMode() != PARTITIONED || colloc); - - return set0(name, colloc0, create); + return set0(name, colloc, create, separated); } /** * @param name Name of set. * @param collocated Collocation flag. * @param create If {@code true} set will be created in case it is not in cache. + * @param separated Separated cache flag. * @return Set. * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") @Nullable private <T> IgniteSet<T> set0(String name, boolean collocated, - boolean create) + boolean create, + boolean separated) throws IgniteCheckedException { cctx.gate().enter(); @@ -427,7 +422,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { IgniteInternalCache cache = cctx.cache().withNoRetries(); if (create) { - hdr = new GridCacheSetHeader(IgniteUuid.randomUuid(), collocated); + hdr = new GridCacheSetHeader(IgniteUuid.randomUuid(), collocated, separated); GridCacheSetHeader old = (GridCacheSetHeader)cache.getAndPutIfAbsent(key, hdr); @@ -612,6 +607,10 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { * @param rmv {@code True} if item was removed. */ private void onSetItemUpdated(SetItemKey key, boolean rmv) { + // Items stored in a separate cache don't have identifier. + if (key.setId() == null) + return; + GridConcurrentHashSet<SetItemKey> set = setDataMap.get(key.setId()); if (set == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 2149ff1..8f6876c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -79,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.internal.GPR; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteProductVersion; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -115,6 +116,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen /** Atomics system cache name. */ public static final String ATOMICS_CACHE_NAME = "ignite-sys-atomic-cache"; + /** Non collocated IgniteSet will use separate cache if all nodes in cluster is not older then specified version. */ + private static final IgniteProductVersion SEPARATE_CACHE_PER_NON_COLLOCATED_SET_SINCE = + IgniteProductVersion.fromString("2.7.0"); + /** Initial capacity. */ private static final int INITIAL_CAPACITY = 10; @@ -846,9 +851,20 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen return getCollection(new IgniteClosureX<GridCacheContext, IgniteQueue<T>>() { @Override public IgniteQueue<T> applyx(GridCacheContext ctx) throws IgniteCheckedException { - return ctx.dataStructures().queue(name, cap0, create && cfg.isCollocated(), create); + return ctx.dataStructures().queue(name, cap0, isCollocated(cfg), create); } - }, cfg, name, grpName, QUEUE, create); + }, cfg, name, grpName, QUEUE, create, false); + } + + /** + * Non-collocated mode only makes sense for and is only supported for PARTITIONED caches, so + * collocated mode should be enabled for non-partitioned cache by default. + * + * @param cfg Collection configuration. + * @return {@code True} If collocated mode should be enabled. + */ + private boolean isCollocated(CollectionConfiguration cfg) { + return cfg != null && (cfg.isCollocated() || cfg.getCacheMode() != PARTITIONED); } /** @@ -911,19 +927,33 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen } /** + * Get compatible with collection configuration data structure cache. + * * @param cfg Collection configuration. - * @return Cache name. * @param grpName Group name. + * @param dsType Data structure type. + * @param dsName Data structure name. + * @param separated Separated cache flag. + * @return Data structure cache. * @throws IgniteCheckedException If failed. */ - @Nullable private IgniteInternalCache compatibleCache(CollectionConfiguration cfg, String grpName) - throws IgniteCheckedException - { + private IgniteInternalCache compatibleCache(CollectionConfiguration cfg, + String grpName, + DataStructureType dsType, + String dsName, + boolean separated + ) throws IgniteCheckedException { String cacheName = DS_CACHE_NAME_PREFIX + cfg.getAtomicityMode() + "_" + cfg.getCacheMode() + "_" + cfg.getBackups() + "@" + grpName; IgniteInternalCache cache = ctx.cache().cache(cacheName); + if (separated && (cache == null || !cache.containsKey(new GridCacheSetHeaderKey(dsName)))) { + cacheName += "#" + dsType.name() + "_" + dsName; + + cache = ctx.cache().cache(cacheName); + } + if (cache == null) { ctx.cache().dynamicStartCache(cacheConfiguration(cfg, cacheName, grpName), cacheName, @@ -990,6 +1020,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen * @param grpName Cache group name. * @param type Data structure type. * @param create Create flag. + * @param separated Separated cache flag. * @return Collection instance. * @throws IgniteCheckedException If failed. */ @@ -998,7 +1029,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen String name, @Nullable String grpName, final DataStructureType type, - boolean create) + boolean create, + boolean separated) throws IgniteCheckedException { awaitInitialization(); @@ -1051,7 +1083,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen final IgniteInternalCache cache; if (create) { - cache = compatibleCache(cfg, grpName); + cache = compatibleCache(cfg, grpName, type, name, separated); DistributedCollectionMetadata newVal = new DistributedCollectionMetadata(type, cfg, cache.name()); @@ -1521,12 +1553,15 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen A.notNull(name, "name"); final boolean create = cfg != null; + final boolean collocated = isCollocated(cfg); + final boolean separated = !collocated && + U.isOldestNodeVersionAtLeast(SEPARATE_CACHE_PER_NON_COLLOCATED_SET_SINCE, ctx.grid().cluster().nodes()); return getCollection(new CX1<GridCacheContext, IgniteSet<T>>() { @Override public IgniteSet<T> applyx(GridCacheContext cctx) throws IgniteCheckedException { - return cctx.dataStructures().set(name, create && cfg.isCollocated(), create); + return cctx.dataStructures().set(name, collocated, create, separated); } - }, cfg, name, grpName, SET, create); + }, cfg, name, grpName, SET, create, separated); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetHeader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetHeader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetHeader.java index c650b21..5693bda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetHeader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetHeader.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.datastructures; +import java.io.EOFException; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -33,12 +34,15 @@ public class GridCacheSetHeader implements GridCacheInternal, Externalizable { /** */ private static final long serialVersionUID = 0L; - /** */ + /** Set unique ID. */ private IgniteUuid id; - /** */ + /** Collocation flag. */ private boolean collocated; + /** Separated cache flag. */ + private boolean separated; + /** * Required by {@link Externalizable}. */ @@ -49,10 +53,14 @@ public class GridCacheSetHeader implements GridCacheInternal, Externalizable { /** * @param id Set UUID. * @param collocated Collocation flag. + * @param separated Separated cache flag. */ - public GridCacheSetHeader(IgniteUuid id, boolean collocated) { + public GridCacheSetHeader(IgniteUuid id, boolean collocated, boolean separated) { + assert !(separated && collocated); + this.id = id; this.collocated = collocated; + this.separated = separated; } /** @@ -69,16 +77,31 @@ public class GridCacheSetHeader implements GridCacheInternal, Externalizable { return collocated; } + /** + * @return Separated cache flag. + */ + public boolean separated() { + return separated; + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeGridUuid(out, id); out.writeBoolean(collocated); + out.writeBoolean(separated); } /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + @Override public void readExternal(ObjectInput in) throws IOException { id = U.readGridUuid(in); collocated = in.readBoolean(); + + try { + separated = in.readBoolean(); + } + catch (EOFException ignore) { + // Ignore exception for backward compatibility, since header may not contain a "separated" flag. + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 0e3e102..ba65d9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -35,10 +35,11 @@ import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSet; +import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheIteratorConverter; -import org.apache.ignite.internal.processors.cache.CacheWeakQueryIteratorsHolder; +import org.apache.ignite.internal.processors.cache.CacheWeakQueryIteratorsHolder.WeakReferenceCloseableIterator; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.query.CacheQuery; @@ -50,6 +51,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteRunnable; @@ -83,18 +85,18 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite /** Collocation flag. */ private final boolean collocated; + /** Separated cache flag. */ + private final boolean separated; + /** Set header partition. */ private final int hdrPart; /** Set header key. */ - protected final GridCacheSetHeaderKey setKey; + private final GridCacheSetHeaderKey setKey; /** Removed flag. */ private volatile boolean rmvd; - /** */ - private final boolean binaryMarsh; - /** Access to affinityRun() and affinityCall() functions. */ private final IgniteCompute compute; @@ -107,18 +109,14 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite public GridCacheSetImpl(GridCacheContext ctx, String name, GridCacheSetHeader hdr) { this.ctx = ctx; this.name = name; - id = hdr.id(); - collocated = hdr.collocated(); - binaryMarsh = ctx.binaryMarshaller(); - compute = ctx.kernalContext().grid().compute(); - - cache = ctx.cache(); - - setKey = new GridCacheSetHeaderKey(name); - - log = ctx.logger(GridCacheSetImpl.class); - - hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name)); + this.collocated = hdr.collocated(); + this.id = hdr.id(); + this.compute = ctx.kernalContext().grid().compute(); + this.cache = ctx.cache(); + this.setKey = new GridCacheSetHeaderKey(name); + this.log = ctx.logger(GridCacheSetImpl.class); + this.hdrPart = ctx.affinity().partition(setKey); + this.separated = hdr.separated(); } /** {@inheritDoc} */ @@ -141,7 +139,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - public boolean checkHeader() throws IgniteCheckedException { + boolean checkHeader() throws IgniteCheckedException { IgniteInternalCache<GridCacheSetHeaderKey, GridCacheSetHeader> cache0 = ctx.cache(); GridCacheSetHeader hdr = cache0.get(new GridCacheSetHeaderKey(name)); @@ -155,6 +153,11 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite try { onAccess(); + if (separated) { + // Non collocated IgniteSet uses a separate cache which contains additional header element. + return cache.sizeAsync(new CachePeekMode[] {}).get() - 1; + } + if (ctx.isLocal() || ctx.isReplicated()) { GridConcurrentHashSet<SetItemKey> set = ctx.dataStructures().setData(id); @@ -378,7 +381,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite } /** {@inheritDoc} */ - public void affinityRun(IgniteRunnable job) { + @Override public void affinityRun(IgniteRunnable job) { if (!collocated) throw new IgniteException("Failed to execute affinityRun() for non-collocated set: " + name() + ". This operation is supported only for collocated sets."); @@ -387,7 +390,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite } /** {@inheritDoc} */ - public <R> R affinityCall(IgniteCallable<R> job) { + @Override public <R> R affinityCall(IgniteCallable<R> job) { if (!collocated) throw new IgniteException("Failed to execute affinityCall() for non-collocated set: " + name() + ". This operation is supported only for collocated sets."); @@ -408,29 +411,12 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite } } - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") + /** + * @return Closeable iterator. + */ private GridCloseableIterator<T> iterator0() { try { - CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, - new GridSetQueryPredicate<>(id, collocated), null, false, false); - - Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); - - qry.projection(ctx.grid().cluster().forNodes(nodes)); - - CacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute(); - - CacheWeakQueryIteratorsHolder.WeakReferenceCloseableIterator it = - ctx.itHolder().iterator(fut, new CacheIteratorConverter<T, Map.Entry<T, ?>>() { - @Override protected T convert(Map.Entry<T, ?> e) { - return e.getKey(); - } - - @Override protected void remove(T item) { - GridCacheSetImpl.this.remove(item); - } - }); + WeakReferenceCloseableIterator<T> it = separated ? separatedCacheIterator() : sharedCacheIterator(); if (rmvd) { ctx.itHolder().removeIterator(it); @@ -446,6 +432,54 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite } /** + * @return Shared cache iterator. + */ + @SuppressWarnings("unchecked") + private WeakReferenceCloseableIterator<T> sharedCacheIterator() throws IgniteCheckedException { + CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, + new GridSetQueryPredicate<>(id, collocated), null, false, false); + + Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); + + qry.projection(ctx.grid().cluster().forNodes(nodes)); + + CacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute(); + + return ctx.itHolder().iterator(fut, new CacheIteratorConverter<T, Map.Entry<T, ?>>() { + @Override protected T convert(Map.Entry<T, ?> e) { + return e.getKey(); + } + + @Override protected void remove(T item) { + GridCacheSetImpl.this.remove(item); + } + }); + } + + /** + * @return Separated cache iterator. + */ + @SuppressWarnings("unchecked") + private WeakReferenceCloseableIterator<T> separatedCacheIterator() throws IgniteCheckedException { + GridCloseableIterator iter = + (GridCloseableIterator)cache.scanIterator(false, new IgniteBiPredicate<Object, Object>() { + @Override public boolean apply(Object k, Object v) { + return k.getClass() == GridCacheSetItemKey.class; + } + }); + + return ctx.itHolder().iterator(iter, new CacheIteratorConverter<T, Map.Entry<T, ?>>() { + @Override protected T convert(Map.Entry<T, ?> e) { + return (T)((SetItemKey)e.getKey()).item(); + } + + @Override protected void remove(T item) { + GridCacheSetImpl.this.remove(item); + } + }); + } + + /** * @param call Callable. * @return Callable result. */ @@ -529,7 +563,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite */ private void checkRemoved() { if (rmvd) - throw new IllegalStateException("Set has been removed from cache: " + this); + throw new IllegalStateException("Set has been removed: " + this); } /** @@ -556,11 +590,19 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite } /** + * @return {@code True} If a separated cache is used to store items. + */ + boolean separated() { + return separated; + } + + /** * @param item Set item. * @return Item key. */ private SetItemKey itemKey(Object item) { - return collocated ? new CollocatedSetItemKey(name, id, item) : new GridCacheSetItemKey(id, item); + return collocated ? new CollocatedSetItemKey(name, id, item) : + new GridCacheSetItemKey(separated ? null : id, item); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java index 4280891..95bee8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java @@ -21,6 +21,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Objects; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -68,7 +69,7 @@ public class GridCacheSetItemKey implements SetItemKey, Externalizable { /** {@inheritDoc} */ @Override public int hashCode() { - int res = setId.hashCode(); + int res = setId == null ? 0 : setId.hashCode(); res = 31 * res + item.hashCode(); @@ -85,7 +86,7 @@ public class GridCacheSetItemKey implements SetItemKey, Externalizable { GridCacheSetItemKey that = (GridCacheSetItemKey)o; - return setId.equals(that.setId) && item.equals(that.item); + return Objects.equals(this.setId, that.setId) && item.equals(that.item); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java index 39d6f18..729f6eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java @@ -28,12 +28,16 @@ import java.util.Iterator; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSet; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheGateway; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteRunnable; import org.jetbrains.annotations.NotNull; @@ -352,14 +356,28 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable { /** {@inheritDoc} */ @Override public void close() { + IgniteFuture<Boolean> destroyFut = null; + gate.enter(); try { delegate.close(); + + if (delegate.separated()) { + IgniteInternalFuture<Boolean> fut = cctx.kernalContext().cache().dynamicDestroyCache( + cctx.cache().name(), false, true, false); + + ((GridFutureAdapter)fut).ignoreInterrupts(); + + destroyFut = new IgniteFutureImpl<>(fut); + } } finally { gate.leave(); } + + if (destroyFut != null) + destroyFut.get(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index b336f91..3dfa8c1 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -10435,6 +10435,22 @@ public abstract class IgniteUtils { } /** + * Check that node Ignite product version is not less then specified. + * + * @param ver Target Ignite product version. + * @param nodes Cluster nodes. + * @return {@code True} if ignite product version of all nodes is not less then {@code ver}. + */ + public static boolean isOldestNodeVersionAtLeast(IgniteProductVersion ver, Iterable<ClusterNode> nodes) { + for (ClusterNode node : nodes) { + if (node.version().compareToIgnoreTimestamp(ver) < 0) + return false; + } + + return true; + } + + /** * @param addr pointer in memory * @param len how much byte to read (should divide 8) * http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java index 837cc3a..9a707eb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import junit.framework.AssertionFailedError; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; @@ -804,9 +805,20 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr GridCacheContext cctx = GridTestUtils.getFieldValue(set0, "cctx"); + boolean separated = separated(set0); + + if (separated) + awaitPartitionMapExchange(); + for (int i = 0; i < gridCount(); i++) { GridCacheAdapter cache = grid(i).context().cache().internalCache(cctx.name()); + if (separated) { + assertNull("Cache " + cctx.name() + " was not destroyed.", cache); + + continue; + } + for (Object e : cache.localEntries(new CachePeekMode[]{CachePeekMode.ALL})) { cnt++; @@ -1003,14 +1015,26 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr } /** - * Test that sets within the same group and compatible configurations are stored in the same cache. - * - * @throws Exception If failed. + * Test that non collocated sets are stored in a separated cache. + */ + public void testCacheReuse() { + testCacheReuse(false); + } + + /** + * Test that collocated sets within the same group and compatible configurations are stored in the same cache. + */ + public void testCacheReuseCollocated() { + testCacheReuse(true); + } + + /** + * @param collocated Collocation flag. */ - public void testCacheReuse() throws Exception { + private void testCacheReuse(boolean collocated) { Ignite ignite = grid(0); - CollectionConfiguration colCfg = collectionConfiguration(); + CollectionConfiguration colCfg = collectionConfiguration().setCollocated(collocated); colCfg.setAtomicityMode(ATOMIC); colCfg.setGroupName("grp1"); @@ -1018,24 +1042,28 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr IgniteSet set1 = ignite.set("set1", colCfg); IgniteSet set2 = ignite.set("set2", colCfg); - assert cctx(set1).cacheId() == cctx(set2).cacheId(); + assertEquals(separated(set1), cctx(set1).cacheId() != cctx(set2).cacheId()); colCfg.setAtomicityMode(TRANSACTIONAL); IgniteSet set3 = ignite.set("set3", colCfg); IgniteSet set4 = ignite.set("set4", colCfg); - assert cctx(set3).cacheId() == cctx(set4).cacheId(); - assert cctx(set1).cacheId() != cctx(set3).cacheId(); - assert cctx(set1).groupId() == cctx(set3).groupId(); + assertEquals(separated(set3), cctx(set3).cacheId() != cctx(set4).cacheId()); + + assertTrue(cctx(set1).cacheId() != cctx(set3).cacheId()); + assertTrue(cctx(set1).groupId() == cctx(set3).groupId()); colCfg.setGroupName("gtp2"); IgniteSet set5 = ignite.set("set5", colCfg); IgniteSet set6 = ignite.set("set6", colCfg); - assert cctx(set5).cacheId() == cctx(set6).cacheId(); - assert cctx(set1).groupId() != cctx(set5).groupId(); + assertEquals(separated(set5), cctx(set5).cacheId() != cctx(set6).cacheId()); + + assertTrue(cctx(set1).groupId() != cctx(set5).groupId()); + + Stream.of(set1, set2, set3, set4, set5, set6).forEach(IgniteSet::close); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java index a9abfd3..f7e12dd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java @@ -145,4 +145,15 @@ public abstract class IgniteCollectionAbstractTest extends GridCommonAbstractTes else return GridTestUtils.getFieldValue(set, GridCacheSetImpl.class, "ctx"); } + + /** + * @param set Ignite set. + * @return {@code True} If a separated cache is used to store items. + */ + protected boolean separated(IgniteSet set) { + if (set instanceof GridCacheSetProxy) + set = ((GridCacheSetProxy)set).delegate(); + + return GridTestUtils.getFieldValue(set, "separated"); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureWithJobTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureWithJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureWithJobTest.java index 4ae50c1..2b99a91 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureWithJobTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureWithJobTest.java @@ -21,7 +21,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteSet; +import org.apache.ignite.IgniteQueue; import org.apache.ignite.configuration.CollectionConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; @@ -81,13 +81,13 @@ public class IgniteDataStructureWithJobTest extends GridCommonAbstractTest { while (System.currentTimeMillis() < endTime) { try { - ignite.compute().broadcast(new IgniteClosure<IgniteSet, Integer>() { - @Override public Integer apply(IgniteSet set) { - assertNotNull(set); + ignite.compute().broadcast(new IgniteClosure<IgniteQueue, Integer>() { + @Override public Integer apply(IgniteQueue queue) { + assertNotNull(queue); return 1; } - }, ignite.set("set", new CollectionConfiguration())); + }, ignite.queue("queue", 0, new CollectionConfiguration())); } catch (IgniteException ignore) { // No-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java index 40a8952..2c6d187 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java @@ -173,11 +173,29 @@ public class IgniteAtomicLongChangingTopologySelfTest extends GridCommonAbstract * @throws Exception If failed. */ public void testClientSetCreateCloseFailover() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-9015"); + + checkClientSetCreateCloseFailover(false); + } + + /** + * @throws Exception If failed. + */ + public void testClientCollocatedSetCreateCloseFailover() throws Exception { + checkClientSetCreateCloseFailover(true); + } + + /** + * @param collocated Collocated flag. + * @throws Exception If failed. + */ + private void checkClientSetCreateCloseFailover(boolean collocated) throws Exception { testFailoverWithClient(new IgniteInClosure<Ignite>() { @Override public void apply(Ignite ignite) { for (int i = 0; i < 100; i++) { CollectionConfiguration colCfg = new CollectionConfiguration(); + colCfg.setCollocated(collocated); colCfg.setBackups(1); colCfg.setCacheMode(PARTITIONED); colCfg.setAtomicityMode(i % 2 == 0 ? TRANSACTIONAL : ATOMIC); http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java index 963c1d9..61a076e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java @@ -57,6 +57,8 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.http.GridEmbeddedHttpServer; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -848,6 +850,33 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest { } /** + * + */ + public void testIsOldestNodeVersionAtLeast() { + IgniteProductVersion v240 = IgniteProductVersion.fromString("2.4.0"); + IgniteProductVersion v241 = IgniteProductVersion.fromString("2.4.1"); + IgniteProductVersion v250 = IgniteProductVersion.fromString("2.5.0"); + IgniteProductVersion v250ts = IgniteProductVersion.fromString("2.5.0-b1-3"); + + TcpDiscoveryNode node240 = new TcpDiscoveryNode(); + node240.version(v240); + + TcpDiscoveryNode node241 = new TcpDiscoveryNode(); + node241.version(v241); + + TcpDiscoveryNode node250 = new TcpDiscoveryNode(); + node250.version(v250); + + TcpDiscoveryNode node250ts = new TcpDiscoveryNode(); + node250ts.version(v250ts); + + assertTrue(U.isOldestNodeVersionAtLeast(v240, Arrays.asList(node240, node241, node250, node250ts))); + assertFalse(U.isOldestNodeVersionAtLeast(v241, Arrays.asList(node240, node241, node250, node250ts))); + assertTrue(U.isOldestNodeVersionAtLeast(v250, Arrays.asList(node250, node250ts))); + assertTrue(U.isOldestNodeVersionAtLeast(v250ts, Arrays.asList(node250, node250ts))); + } + + /** * Test enum. */ private enum TestEnum {