Repository: ignite
Updated Branches:
  refs/heads/master 0ee363f53 -> 46d72fcde


IGNITE-7823 Separate cache for non collocated IgniteSet.

Signed-off-by: Anton Vinogradov <a...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46d72fcd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46d72fcd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46d72fcd

Branch: refs/heads/master
Commit: 46d72fcde83abacb55828b001023e31392b28621
Parents: 0ee363f
Author: pereslegin-pa <xxt...@gmail.com>
Authored: Wed Jul 25 13:55:55 2018 +0300
Committer: Anton Vinogradov <a...@apache.org>
Committed: Wed Jul 25 13:55:55 2018 +0300

----------------------------------------------------------------------
 .../CacheDataStructuresManager.java             |  27 ++--
 .../datastructures/DataStructuresProcessor.java |  55 ++++++--
 .../datastructures/GridCacheSetHeader.java      |  31 ++++-
 .../datastructures/GridCacheSetImpl.java        | 128 ++++++++++++-------
 .../datastructures/GridCacheSetItemKey.java     |   5 +-
 .../datastructures/GridCacheSetProxy.java       |  18 +++
 .../ignite/internal/util/IgniteUtils.java       |  16 +++
 .../GridCacheSetAbstractSelfTest.java           |  50 ++++++--
 .../IgniteCollectionAbstractTest.java           |  11 ++
 .../IgniteDataStructureWithJobTest.java         |  10 +-
 ...gniteAtomicLongChangingTopologySelfTest.java |  18 +++
 .../internal/util/IgniteUtilsSelfTest.java      |  29 +++++
 12 files changed, 309 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 4771582..ccfdc15 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -72,7 +72,6 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
 
 /**
@@ -237,10 +236,7 @@ public class CacheDataStructuresManager extends 
GridCacheManagerAdapter {
     {
         waitInitialization();
 
-        // Non collocated mode enabled only for PARTITIONED cache.
-        final boolean colloc0 = create && 
(cctx.cache().configuration().getCacheMode() != PARTITIONED || colloc);
-
-        return queue0(name, cap, colloc0, create);
+        return queue0(name, cap, colloc, create);
     }
 
     /**
@@ -389,32 +385,31 @@ public class CacheDataStructuresManager extends 
GridCacheManagerAdapter {
      * @param name Set name.
      * @param colloc Collocated flag.
      * @param create Create flag.
+     * @param separated Separated cache flag.
      * @return Set.
      * @throws IgniteCheckedException If failed.
      */
     @Nullable public <T> IgniteSet<T> set(final String name,
         boolean colloc,
-        final boolean create)
-        throws IgniteCheckedException
+        boolean create,
+        boolean separated) throws IgniteCheckedException
     {
-        // Non collocated mode enabled only for PARTITIONED cache.
-        final boolean colloc0 =
-            create && (cctx.cache().configuration().getCacheMode() != 
PARTITIONED || colloc);
-
-        return set0(name, colloc0, create);
+        return set0(name, colloc, create, separated);
     }
 
     /**
      * @param name Name of set.
      * @param collocated Collocation flag.
      * @param create If {@code true} set will be created in case it is not in 
cache.
+     * @param separated Separated cache flag.
      * @return Set.
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
     @Nullable private <T> IgniteSet<T> set0(String name,
         boolean collocated,
-        boolean create)
+        boolean create,
+        boolean separated)
         throws IgniteCheckedException
     {
         cctx.gate().enter();
@@ -427,7 +422,7 @@ public class CacheDataStructuresManager extends 
GridCacheManagerAdapter {
             IgniteInternalCache cache = cctx.cache().withNoRetries();
 
             if (create) {
-                hdr = new GridCacheSetHeader(IgniteUuid.randomUuid(), 
collocated);
+                hdr = new GridCacheSetHeader(IgniteUuid.randomUuid(), 
collocated, separated);
 
                 GridCacheSetHeader old = 
(GridCacheSetHeader)cache.getAndPutIfAbsent(key, hdr);
 
@@ -612,6 +607,10 @@ public class CacheDataStructuresManager extends 
GridCacheManagerAdapter {
      * @param rmv {@code True} if item was removed.
      */
     private void onSetItemUpdated(SetItemKey key, boolean rmv) {
+        // Items stored in a separate cache don't have identifier.
+        if (key.setId() == null)
+            return;
+
         GridConcurrentHashSet<SetItemKey> set = setDataMap.get(key.setId());
 
         if (set == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 2149ff1..8f6876c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.internal.GPR;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -115,6 +116,10 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter implemen
     /** Atomics system cache name. */
     public static final String ATOMICS_CACHE_NAME = "ignite-sys-atomic-cache";
 
+    /** Non collocated IgniteSet will use separate cache if all nodes in 
cluster is not older then specified version. */
+    private static final IgniteProductVersion 
SEPARATE_CACHE_PER_NON_COLLOCATED_SET_SINCE =
+        IgniteProductVersion.fromString("2.7.0");
+
     /** Initial capacity. */
     private static final int INITIAL_CAPACITY = 10;
 
@@ -846,9 +851,20 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter implemen
 
         return getCollection(new IgniteClosureX<GridCacheContext, 
IgniteQueue<T>>() {
             @Override public IgniteQueue<T> applyx(GridCacheContext ctx) 
throws IgniteCheckedException {
-                return ctx.dataStructures().queue(name, cap0, create && 
cfg.isCollocated(), create);
+                return ctx.dataStructures().queue(name, cap0, 
isCollocated(cfg), create);
             }
-        }, cfg, name, grpName, QUEUE, create);
+        }, cfg, name, grpName, QUEUE, create, false);
+    }
+
+    /**
+     * Non-collocated mode only makes sense for and is only supported for 
PARTITIONED caches, so
+     * collocated mode should be enabled for non-partitioned cache by default.
+     *
+     * @param cfg Collection configuration.
+     * @return {@code True} If collocated mode should be enabled.
+     */
+    private boolean isCollocated(CollectionConfiguration cfg) {
+        return cfg != null && (cfg.isCollocated() || cfg.getCacheMode() != 
PARTITIONED);
     }
 
     /**
@@ -911,19 +927,33 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter implemen
     }
 
     /**
+     * Get compatible with collection configuration data structure cache.
+     *
      * @param cfg Collection configuration.
-     * @return Cache name.
      * @param grpName Group name.
+     * @param dsType Data structure type.
+     * @param dsName Data structure name.
+     * @param separated Separated cache flag.
+     * @return Data structure cache.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable private IgniteInternalCache 
compatibleCache(CollectionConfiguration cfg, String grpName)
-        throws IgniteCheckedException
-    {
+    private IgniteInternalCache compatibleCache(CollectionConfiguration cfg,
+        String grpName,
+        DataStructureType dsType,
+        String dsName,
+        boolean separated
+    ) throws IgniteCheckedException {
         String cacheName = DS_CACHE_NAME_PREFIX + cfg.getAtomicityMode() + "_" 
+ cfg.getCacheMode() + "_" +
             cfg.getBackups() + "@" + grpName;
 
         IgniteInternalCache cache = ctx.cache().cache(cacheName);
 
+        if (separated && (cache == null || !cache.containsKey(new 
GridCacheSetHeaderKey(dsName)))) {
+            cacheName += "#" + dsType.name() + "_" + dsName;
+
+            cache = ctx.cache().cache(cacheName);
+        }
+
         if (cache == null) {
             ctx.cache().dynamicStartCache(cacheConfiguration(cfg, cacheName, 
grpName),
                 cacheName,
@@ -990,6 +1020,7 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter implemen
      * @param grpName Cache group name.
      * @param type Data structure type.
      * @param create Create flag.
+     * @param separated Separated cache flag.
      * @return Collection instance.
      * @throws IgniteCheckedException If failed.
      */
@@ -998,7 +1029,8 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter implemen
         String name,
         @Nullable String grpName,
         final DataStructureType type,
-        boolean create)
+        boolean create,
+        boolean separated)
         throws IgniteCheckedException
     {
         awaitInitialization();
@@ -1051,7 +1083,7 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter implemen
         final IgniteInternalCache cache;
 
         if (create) {
-            cache = compatibleCache(cfg, grpName);
+            cache = compatibleCache(cfg, grpName, type, name, separated);
 
             DistributedCollectionMetadata newVal = new 
DistributedCollectionMetadata(type, cfg, cache.name());
 
@@ -1521,12 +1553,15 @@ public final class DataStructuresProcessor extends 
GridProcessorAdapter implemen
         A.notNull(name, "name");
 
         final boolean create = cfg != null;
+        final boolean collocated = isCollocated(cfg);
+        final boolean separated = !collocated &&
+            
U.isOldestNodeVersionAtLeast(SEPARATE_CACHE_PER_NON_COLLOCATED_SET_SINCE,  
ctx.grid().cluster().nodes());
 
         return getCollection(new CX1<GridCacheContext, IgniteSet<T>>() {
             @Override public IgniteSet<T> applyx(GridCacheContext cctx) throws 
IgniteCheckedException {
-                return cctx.dataStructures().set(name, create && 
cfg.isCollocated(), create);
+                return cctx.dataStructures().set(name, collocated, create, 
separated);
             }
-        }, cfg, name, grpName, SET, create);
+        }, cfg, name, grpName, SET, create, separated);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetHeader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetHeader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetHeader.java
index c650b21..5693bda 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetHeader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetHeader.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.datastructures;
 
+import java.io.EOFException;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -33,12 +34,15 @@ public class GridCacheSetHeader implements 
GridCacheInternal, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** */
+    /** Set unique ID. */
     private IgniteUuid id;
 
-    /** */
+    /** Collocation flag. */
     private boolean collocated;
 
+    /** Separated cache flag. */
+    private boolean separated;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -49,10 +53,14 @@ public class GridCacheSetHeader implements 
GridCacheInternal, Externalizable {
     /**
      * @param id Set UUID.
      * @param collocated Collocation flag.
+     * @param separated Separated cache flag.
      */
-    public GridCacheSetHeader(IgniteUuid id, boolean collocated) {
+    public GridCacheSetHeader(IgniteUuid id, boolean collocated, boolean 
separated) {
+        assert !(separated && collocated);
+
         this.id = id;
         this.collocated = collocated;
+        this.separated = separated;
     }
 
     /**
@@ -69,16 +77,31 @@ public class GridCacheSetHeader implements 
GridCacheInternal, Externalizable {
         return collocated;
     }
 
+    /**
+     * @return Separated cache flag.
+     */
+    public boolean separated() {
+        return separated;
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeGridUuid(out, id);
         out.writeBoolean(collocated);
+        out.writeBoolean(separated);
     }
 
     /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+    @Override public void readExternal(ObjectInput in) throws IOException {
         id = U.readGridUuid(in);
         collocated = in.readBoolean();
+
+        try {
+            separated = in.readBoolean();
+        }
+        catch (EOFException ignore) {
+            // Ignore exception for backward compatibility, since header may 
not contain a "separated" flag.
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index 0e3e102..ba65d9e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -35,10 +35,11 @@ import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSet;
+import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
-import 
org.apache.ignite.internal.processors.cache.CacheWeakQueryIteratorsHolder;
+import 
org.apache.ignite.internal.processors.cache.CacheWeakQueryIteratorsHolder.WeakReferenceCloseableIterator;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
@@ -50,6 +51,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteRunnable;
@@ -83,18 +85,18 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
     /** Collocation flag. */
     private final boolean collocated;
 
+    /** Separated cache flag. */
+    private final boolean separated;
+
     /** Set header partition. */
     private final int hdrPart;
 
     /** Set header key. */
-    protected final GridCacheSetHeaderKey setKey;
+    private final GridCacheSetHeaderKey setKey;
 
     /** Removed flag. */
     private volatile boolean rmvd;
 
-    /** */
-    private final boolean binaryMarsh;
-
     /** Access to affinityRun() and affinityCall() functions. */
     private final IgniteCompute compute;
 
@@ -107,18 +109,14 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
     public GridCacheSetImpl(GridCacheContext ctx, String name, 
GridCacheSetHeader hdr) {
         this.ctx = ctx;
         this.name = name;
-        id = hdr.id();
-        collocated = hdr.collocated();
-        binaryMarsh = ctx.binaryMarshaller();
-        compute = ctx.kernalContext().grid().compute();
-
-        cache = ctx.cache();
-
-        setKey = new GridCacheSetHeaderKey(name);
-
-        log = ctx.logger(GridCacheSetImpl.class);
-
-        hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name));
+        this.collocated = hdr.collocated();
+        this.id = hdr.id();
+        this.compute = ctx.kernalContext().grid().compute();
+        this.cache = ctx.cache();
+        this.setKey = new GridCacheSetHeaderKey(name);
+        this.log = ctx.logger(GridCacheSetImpl.class);
+        this.hdrPart = ctx.affinity().partition(setKey);
+        this.separated = hdr.separated();
     }
 
     /** {@inheritDoc} */
@@ -141,7 +139,7 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    public boolean checkHeader() throws IgniteCheckedException {
+    boolean checkHeader() throws IgniteCheckedException {
         IgniteInternalCache<GridCacheSetHeaderKey, GridCacheSetHeader> cache0 
= ctx.cache();
 
         GridCacheSetHeader hdr = cache0.get(new GridCacheSetHeaderKey(name));
@@ -155,6 +153,11 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
         try {
             onAccess();
 
+            if (separated) {
+                // Non collocated IgniteSet uses a separate cache which 
contains additional header element.
+                return cache.sizeAsync(new CachePeekMode[] {}).get() - 1;
+            }
+
             if (ctx.isLocal() || ctx.isReplicated()) {
                 GridConcurrentHashSet<SetItemKey> set = 
ctx.dataStructures().setData(id);
 
@@ -378,7 +381,7 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
     }
 
     /** {@inheritDoc} */
-    public void affinityRun(IgniteRunnable job) {
+    @Override public void affinityRun(IgniteRunnable job) {
         if (!collocated)
             throw new IgniteException("Failed to execute affinityRun() for 
non-collocated set: " + name() +
                 ". This operation is supported only for collocated sets.");
@@ -387,7 +390,7 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
     }
 
     /** {@inheritDoc} */
-    public <R> R affinityCall(IgniteCallable<R> job) {
+    @Override public <R> R affinityCall(IgniteCallable<R> job) {
         if (!collocated)
             throw new IgniteException("Failed to execute affinityCall() for 
non-collocated set: " + name() +
                 ". This operation is supported only for collocated sets.");
@@ -408,29 +411,12 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
         }
     }
 
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
+    /**
+     * @return Closeable iterator.
+     */
     private GridCloseableIterator<T> iterator0() {
         try {
-            CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
-                new GridSetQueryPredicate<>(id, collocated), null, false, 
false);
-
-            Collection<ClusterNode> nodes = 
dataNodes(ctx.affinity().affinityTopologyVersion());
-
-            qry.projection(ctx.grid().cluster().forNodes(nodes));
-
-            CacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute();
-
-            CacheWeakQueryIteratorsHolder.WeakReferenceCloseableIterator it =
-                ctx.itHolder().iterator(fut, new CacheIteratorConverter<T, 
Map.Entry<T, ?>>() {
-                    @Override protected T convert(Map.Entry<T, ?> e) {
-                        return e.getKey();
-                    }
-
-                    @Override protected void remove(T item) {
-                        GridCacheSetImpl.this.remove(item);
-                    }
-                });
+            WeakReferenceCloseableIterator<T> it = separated ? 
separatedCacheIterator() : sharedCacheIterator();
 
             if (rmvd) {
                 ctx.itHolder().removeIterator(it);
@@ -446,6 +432,54 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
     }
 
     /**
+     * @return Shared cache iterator.
+     */
+    @SuppressWarnings("unchecked")
+    private WeakReferenceCloseableIterator<T> sharedCacheIterator() throws 
IgniteCheckedException {
+        CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
+            new GridSetQueryPredicate<>(id, collocated), null, false, false);
+
+        Collection<ClusterNode> nodes = 
dataNodes(ctx.affinity().affinityTopologyVersion());
+
+        qry.projection(ctx.grid().cluster().forNodes(nodes));
+
+        CacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute();
+
+        return ctx.itHolder().iterator(fut, new CacheIteratorConverter<T, 
Map.Entry<T, ?>>() {
+            @Override protected T convert(Map.Entry<T, ?> e) {
+                return e.getKey();
+            }
+
+            @Override protected void remove(T item) {
+                GridCacheSetImpl.this.remove(item);
+            }
+        });
+    }
+
+    /**
+     * @return Separated cache iterator.
+     */
+    @SuppressWarnings("unchecked")
+    private WeakReferenceCloseableIterator<T> separatedCacheIterator() throws 
IgniteCheckedException {
+        GridCloseableIterator iter =
+            (GridCloseableIterator)cache.scanIterator(false, new 
IgniteBiPredicate<Object, Object>() {
+                @Override public boolean apply(Object k, Object v) {
+                    return k.getClass() == GridCacheSetItemKey.class;
+                }
+            });
+
+        return ctx.itHolder().iterator(iter, new CacheIteratorConverter<T, 
Map.Entry<T, ?>>() {
+            @Override protected T convert(Map.Entry<T, ?> e) {
+                return (T)((SetItemKey)e.getKey()).item();
+            }
+
+            @Override protected void remove(T item) {
+                GridCacheSetImpl.this.remove(item);
+            }
+        });
+    }
+
+    /**
      * @param call Callable.
      * @return Callable result.
      */
@@ -529,7 +563,7 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
      */
     private void checkRemoved() {
         if (rmvd)
-            throw new IllegalStateException("Set has been removed from cache: 
" + this);
+            throw new IllegalStateException("Set has been removed: " + this);
     }
 
     /**
@@ -556,11 +590,19 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements Ignite
     }
 
     /**
+     * @return {@code True} If a separated cache is used to store items.
+     */
+    boolean separated() {
+        return separated;
+    }
+
+    /**
      * @param item Set item.
      * @return Item key.
      */
     private SetItemKey itemKey(Object item) {
-        return collocated ? new CollocatedSetItemKey(name, id, item) : new 
GridCacheSetItemKey(id, item);
+        return collocated ? new CollocatedSetItemKey(name, id, item) :
+            new GridCacheSetItemKey(separated ? null : id, item);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
index 4280891..95bee8f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.Objects;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -68,7 +69,7 @@ public class GridCacheSetItemKey implements SetItemKey, 
Externalizable {
 
     /** {@inheritDoc} */
     @Override public int hashCode() {
-        int res = setId.hashCode();
+        int res = setId == null ? 0 : setId.hashCode();
 
         res = 31 * res + item.hashCode();
 
@@ -85,7 +86,7 @@ public class GridCacheSetItemKey implements SetItemKey, 
Externalizable {
 
         GridCacheSetItemKey that = (GridCacheSetItemKey)o;
 
-        return setId.equals(that.setId) && item.equals(that.item);
+        return Objects.equals(this.setId, that.setId) && 
item.equals(that.item);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
index 39d6f18..729f6eb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
@@ -28,12 +28,16 @@ import java.util.Iterator;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteSet;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheGateway;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.jetbrains.annotations.NotNull;
 
@@ -352,14 +356,28 @@ public class GridCacheSetProxy<T> implements 
IgniteSet<T>, Externalizable {
 
     /** {@inheritDoc} */
     @Override public void close() {
+        IgniteFuture<Boolean> destroyFut = null;
+
         gate.enter();
 
         try {
             delegate.close();
+
+            if (delegate.separated()) {
+                IgniteInternalFuture<Boolean> fut = 
cctx.kernalContext().cache().dynamicDestroyCache(
+                    cctx.cache().name(), false, true, false);
+
+                ((GridFutureAdapter)fut).ignoreInterrupts();
+
+                destroyFut = new IgniteFutureImpl<>(fut);
+            }
         }
         finally {
             gate.leave();
         }
+
+        if (destroyFut != null)
+            destroyFut.get();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index b336f91..3dfa8c1 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -10435,6 +10435,22 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Check that node Ignite product version is not less then specified.
+     *
+     * @param ver Target Ignite product version.
+     * @param nodes Cluster nodes.
+     * @return {@code True} if ignite product version of all nodes is not less 
then {@code ver}.
+     */
+    public static boolean isOldestNodeVersionAtLeast(IgniteProductVersion ver, 
Iterable<ClusterNode> nodes) {
+        for (ClusterNode node : nodes) {
+            if (node.version().compareToIgnoreTimestamp(ver) < 0)
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param addr pointer in memory
      * @param len how much byte to read (should divide 8)
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
index 837cc3a..9a707eb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
 import junit.framework.AssertionFailedError;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -804,9 +805,20 @@ public abstract class GridCacheSetAbstractSelfTest extends 
IgniteCollectionAbstr
 
         GridCacheContext cctx = GridTestUtils.getFieldValue(set0, "cctx");
 
+        boolean separated = separated(set0);
+
+        if (separated)
+            awaitPartitionMapExchange();
+
         for (int i = 0; i < gridCount(); i++) {
             GridCacheAdapter cache = 
grid(i).context().cache().internalCache(cctx.name());
 
+            if (separated) {
+                assertNull("Cache " + cctx.name() + " was not destroyed.", 
cache);
+
+                continue;
+            }
+
             for (Object e : cache.localEntries(new 
CachePeekMode[]{CachePeekMode.ALL})) {
                 cnt++;
 
@@ -1003,14 +1015,26 @@ public abstract class GridCacheSetAbstractSelfTest 
extends IgniteCollectionAbstr
     }
 
     /**
-     * Test that sets within the same group and compatible configurations are 
stored in the same cache.
-     *
-     * @throws Exception If failed.
+     * Test that non collocated sets are stored in a separated cache.
+     */
+    public void testCacheReuse()  {
+        testCacheReuse(false);
+    }
+
+    /**
+     * Test that collocated sets within the same group and compatible 
configurations are stored in the same cache.
+     */
+    public void testCacheReuseCollocated() {
+        testCacheReuse(true);
+    }
+
+    /**
+     * @param collocated Collocation flag.
      */
-    public void testCacheReuse() throws Exception {
+    private void testCacheReuse(boolean collocated) {
         Ignite ignite = grid(0);
 
-        CollectionConfiguration colCfg = collectionConfiguration();
+        CollectionConfiguration colCfg = 
collectionConfiguration().setCollocated(collocated);
 
         colCfg.setAtomicityMode(ATOMIC);
         colCfg.setGroupName("grp1");
@@ -1018,24 +1042,28 @@ public abstract class GridCacheSetAbstractSelfTest 
extends IgniteCollectionAbstr
         IgniteSet set1 = ignite.set("set1", colCfg);
         IgniteSet set2 = ignite.set("set2", colCfg);
 
-        assert cctx(set1).cacheId() == cctx(set2).cacheId();
+        assertEquals(separated(set1), cctx(set1).cacheId() != 
cctx(set2).cacheId());
 
         colCfg.setAtomicityMode(TRANSACTIONAL);
 
         IgniteSet set3 = ignite.set("set3", colCfg);
         IgniteSet set4 = ignite.set("set4", colCfg);
 
-        assert cctx(set3).cacheId() == cctx(set4).cacheId();
-        assert cctx(set1).cacheId() != cctx(set3).cacheId();
-        assert cctx(set1).groupId() == cctx(set3).groupId();
+        assertEquals(separated(set3), cctx(set3).cacheId() != 
cctx(set4).cacheId());
+
+        assertTrue(cctx(set1).cacheId() != cctx(set3).cacheId());
+        assertTrue(cctx(set1).groupId() == cctx(set3).groupId());
 
         colCfg.setGroupName("gtp2");
 
         IgniteSet set5 = ignite.set("set5", colCfg);
         IgniteSet set6 = ignite.set("set6", colCfg);
 
-        assert cctx(set5).cacheId() == cctx(set6).cacheId();
-        assert cctx(set1).groupId() != cctx(set5).groupId();
+        assertEquals(separated(set5), cctx(set5).cacheId() != 
cctx(set6).cacheId());
+
+        assertTrue(cctx(set1).groupId() != cctx(set5).groupId());
+
+        Stream.of(set1, set2, set3, set4, set5, 
set6).forEach(IgniteSet::close);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java
index a9abfd3..f7e12dd 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java
@@ -145,4 +145,15 @@ public abstract class IgniteCollectionAbstractTest extends 
GridCommonAbstractTes
         else
             return GridTestUtils.getFieldValue(set, GridCacheSetImpl.class, 
"ctx");
     }
+
+    /**
+     * @param set Ignite set.
+     * @return {@code True} If a separated cache is used to store items.
+     */
+    protected boolean separated(IgniteSet set) {
+        if (set instanceof GridCacheSetProxy)
+            set = ((GridCacheSetProxy)set).delegate();
+
+        return GridTestUtils.getFieldValue(set, "separated");
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureWithJobTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureWithJobTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureWithJobTest.java
index 4ae50c1..2b99a91 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureWithJobTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureWithJobTest.java
@@ -21,7 +21,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteSet;
+import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.configuration.CollectionConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -81,13 +81,13 @@ public class IgniteDataStructureWithJobTest extends 
GridCommonAbstractTest {
 
             while (System.currentTimeMillis() < endTime) {
                 try {
-                    ignite.compute().broadcast(new IgniteClosure<IgniteSet, 
Integer>() {
-                        @Override public Integer apply(IgniteSet set) {
-                            assertNotNull(set);
+                    ignite.compute().broadcast(new IgniteClosure<IgniteQueue, 
Integer>() {
+                        @Override public Integer apply(IgniteQueue queue) {
+                            assertNotNull(queue);
 
                             return 1;
                         }
-                    }, ignite.set("set", new CollectionConfiguration()));
+                    }, ignite.queue("queue", 0, new 
CollectionConfiguration()));
                 }
                 catch (IgniteException ignore) {
                     // No-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
index 40a8952..2c6d187 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteAtomicLongChangingTopologySelfTest.java
@@ -173,11 +173,29 @@ public class IgniteAtomicLongChangingTopologySelfTest 
extends GridCommonAbstract
      * @throws Exception If failed.
      */
     public void testClientSetCreateCloseFailover() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-9015";);
+
+        checkClientSetCreateCloseFailover(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientCollocatedSetCreateCloseFailover() throws Exception {
+        checkClientSetCreateCloseFailover(true);
+    }
+
+    /**
+     * @param collocated Collocated flag.
+     * @throws Exception If failed.
+     */
+    private void checkClientSetCreateCloseFailover(boolean collocated) throws 
Exception {
         testFailoverWithClient(new IgniteInClosure<Ignite>() {
             @Override public void apply(Ignite ignite) {
                 for (int i = 0; i < 100; i++) {
                     CollectionConfiguration colCfg = new 
CollectionConfiguration();
 
+                    colCfg.setCollocated(collocated);
                     colCfg.setBackups(1);
                     colCfg.setCacheMode(PARTITIONED);
                     colCfg.setAtomicityMode(i % 2 == 0 ? TRANSACTIONAL : 
ATOMIC);

http://git-wip-us.apache.org/repos/asf/ignite/blob/46d72fcd/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 963c1d9..61a076e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -57,6 +57,8 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.http.GridEmbeddedHttpServer;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -848,6 +850,33 @@ public class IgniteUtilsSelfTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     *
+     */
+    public void testIsOldestNodeVersionAtLeast() {
+        IgniteProductVersion v240 = IgniteProductVersion.fromString("2.4.0");
+        IgniteProductVersion v241 = IgniteProductVersion.fromString("2.4.1");
+        IgniteProductVersion v250 = IgniteProductVersion.fromString("2.5.0");
+        IgniteProductVersion v250ts = 
IgniteProductVersion.fromString("2.5.0-b1-3");
+
+        TcpDiscoveryNode node240 = new TcpDiscoveryNode();
+        node240.version(v240);
+
+        TcpDiscoveryNode node241 = new TcpDiscoveryNode();
+        node241.version(v241);
+
+        TcpDiscoveryNode node250 = new TcpDiscoveryNode();
+        node250.version(v250);
+
+        TcpDiscoveryNode node250ts = new TcpDiscoveryNode();
+        node250ts.version(v250ts);
+
+        assertTrue(U.isOldestNodeVersionAtLeast(v240, Arrays.asList(node240, 
node241, node250, node250ts)));
+        assertFalse(U.isOldestNodeVersionAtLeast(v241, Arrays.asList(node240, 
node241, node250, node250ts)));
+        assertTrue(U.isOldestNodeVersionAtLeast(v250, Arrays.asList(node250, 
node250ts)));
+        assertTrue(U.isOldestNodeVersionAtLeast(v250ts, Arrays.asList(node250, 
node250ts)));
+    }
+
+    /**
      * Test enum.
      */
     private enum TestEnum {

Reply via email to