This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7bfa2a0fe8 IGNITE-20503 Sql. Support big clusters by mapping service 
(#3966)
7bfa2a0fe8 is described below

commit 7bfa2a0fe812f67c0a14a5e5110836e373f2cd60
Author: Andrew V. Mashenkov <amashen...@users.noreply.github.com>
AuthorDate: Thu Jun 27 16:45:56 2024 +0300

    IGNITE-20503 Sql. Support big clusters by mapping service (#3966)
---
 .../sql/engine/exec/mapping/ExecutionTarget.java   |  17 +-
 .../sql/engine/exec/mapping/FragmentMapper.java    |   6 +-
 .../sql/engine/exec/mapping/MappingContext.java    |   7 +-
 .../AbstractTarget.java                            | 151 +++++++----
 .../AllOfTarget.java                               |  10 +-
 .../LargeClusterFactory.java}                      |  56 ++--
 .../OneOfTarget.java                               |  30 +--
 .../PartitionedTarget.java                         |  46 ++--
 .../SomeOfTarget.java                              |  25 +-
 .../exec/mapping/smallcluster/AbstractTarget.java  |  42 ++-
 .../exec/mapping/smallcluster/AllOfTarget.java     |   5 -
 .../exec/mapping/smallcluster/OneOfTarget.java     |   8 +-
 .../mapping/smallcluster/PartitionedTarget.java    |   7 +-
 .../mapping/smallcluster/SmallClusterFactory.java  |  19 +-
 .../exec/mapping/smallcluster/SomeOfTarget.java    |   5 -
 .../sql/engine/benchmarks/MappingBenchmark.java    | 180 +++++++++++++
 .../mapping/ExecutionTargetFactorySelfTest.java    | 284 +++++++++++++++++++++
 17 files changed, 700 insertions(+), 198 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTarget.java
index 47f629fc2c..b988eace26 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTarget.java
@@ -35,9 +35,7 @@ public interface ExecutionTarget {
      * <p>Colocation is a process of finding intersection of the given two 
targets. For example,
      * lets assume that we have two targets T1 and T2. T1 may be execute on 
one of the nodes
      * [N1, N2, N3]. T2 may be executed on one of the nodes [N2, N3, N4, N5]. 
The result of
-     * colocation of T1 and T2 will be target OneOf[N2, N3]. Please note, that 
result of this
-     * example requires finalisation, since we've got two nodes [N2, N3], but 
target should
-     * be executed on only one of them.
+     * colocation of T1 and T2 will be target OneOf[N2, N3].
      *
      * @param other A target to colocate with.
      * @return A colocated target.
@@ -47,22 +45,11 @@ public interface ExecutionTarget {
 
     /**
      * Removes options from current target which are not colocated with other 
target.
-     * 
+     *
      * <p>If target has several options, remove those are not presented in 
given target to improve colocation.
      *
      * @param other Target with which we need to colocate current target.
      * @return Returns new target in case current has been adjusted, return 
{@code this} instance otherwise.
      */
     ExecutionTarget trimTo(ExecutionTarget other);
-
-    /**
-     * Finalises target by choosing exactly one node for targets with multiple 
options.
-     *
-     * <p>Some targets may have several options, so we have to pick one in 
order to get
-     * correct results. Call to this methods resolves this ambiguity by 
truncating all
-     * but one option. Which exactly option will be left is implementation 
defined.
-     *
-     * @return Finalised target.
-     */
-    ExecutionTarget finalise();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
index 271a7055bb..d252b7a118 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
@@ -804,10 +804,8 @@ class FragmentMapper {
 
         @Override
         public List<ColocationGroup> createColocationGroups() {
-            ExecutionTarget finalised = target.finalise();
-
-            List<String> nodes = 
context.targetFactory().resolveNodes(finalised);
-            Int2ObjectMap<NodeWithConsistencyToken> assignments = 
context.targetFactory().resolveAssignments(finalised);
+            List<String> nodes = context.targetFactory().resolveNodes(target);
+            Int2ObjectMap<NodeWithConsistencyToken> assignments = 
context.targetFactory().resolveAssignments(target);
 
             return List.of(
                     new ColocationGroup(
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
index 4d559694aa..b230df95ab 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.exec.mapping;
 
 import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.largecluster.LargeClusterFactory;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster.SmallClusterFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 
@@ -37,11 +38,7 @@ class MappingContext {
         this.localNode = localNode;
         this.nodes = nodes;
 
-        if (nodes.size() > 64) {
-            throw new 
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-20503";);
-        }
-
-        this.targetFactory = new SmallClusterFactory(nodes);
+        this.targetFactory = nodes.size() > 64 ? new 
LargeClusterFactory(nodes) : new SmallClusterFactory(nodes);
     }
 
     public RelOptCluster cluster() {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
similarity index 57%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
index 293d697774..6d8c14c32e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster;
-
-import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
+package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
 import java.util.List;
+import org.apache.calcite.util.BitSets;
 import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
@@ -35,25 +36,30 @@ import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
  * colocation method.
  */
 abstract class AbstractTarget implements ExecutionTarget {
-    final long nodes;
+    final BitSet nodes;
+
+    AbstractTarget(BitSet nodes) {
+        assert !nodes.isEmpty();
 
-    AbstractTarget(long nodes) {
         this.nodes = nodes;
     }
 
     List<String> nodes(List<String> nodeNames) {
-        if (isPow2(nodes)) {
-            int idx = Long.numberOfTrailingZeros(nodes);
+        int cardinality = nodes.cardinality();
+
+        if (cardinality == 1) {
+            int idx = nodes.nextSetBit(0);
 
             return List.of(nodeNames.get(idx));
         }
 
-        int count = Long.bitCount(nodes);
-        List<String> result = new ArrayList<>(count);
+        List<String> result = new ArrayList<>(cardinality);
 
-        for (int bit = 1, idx = 0; bit <= nodes; bit <<= 1, idx++) {
-            if ((nodes & bit) != 0) {
-                result.add(nodeNames.get(idx));
+        for (int idx = nodes.nextSetBit(0); idx >= 0; idx = 
nodes.nextSetBit(idx + 1)) {
+            result.add(nodeNames.get(idx));
+
+            if (idx == Integer.MAX_VALUE) {
+                break;  // or (i+1) would overflow
             }
         }
 
@@ -70,11 +76,11 @@ abstract class AbstractTarget implements ExecutionTarget {
         Int2ObjectMap<NodeWithConsistencyToken> result = new 
Int2ObjectOpenHashMap<>(partitionedTarget.partitionsNodes.length);
 
         for (int partNo = 0; partNo < 
partitionedTarget.partitionsNodes.length; partNo++) {
-            long partitionNodes = partitionedTarget.partitionsNodes[partNo];
+            BitSet partitionNodes = partitionedTarget.partitionsNodes[partNo];
 
-            assert isPow2(partitionNodes);
+            assert partitionNodes.cardinality() == 1;
 
-            int idx = Long.numberOfTrailingZeros(partitionNodes);
+            int idx = partitionNodes.nextSetBit(0);
 
             result.put(partNo, new NodeWithConsistencyToken(
                     nodeNames.get(idx),
@@ -85,7 +91,16 @@ abstract class AbstractTarget implements ExecutionTarget {
         return result;
     }
 
-    abstract boolean finalised();
+    /**
+     * Finalises target by choosing exactly one node for targets with multiple 
options.
+     *
+     * <p>Some targets may have several options, so we have to pick one in 
order to get
+     * correct results. Call to this methods resolves this ambiguity by 
truncating all
+     * but one option. Which exactly option will be left is implementation 
defined.
+     *
+     * @return Finalised target.
+     */
+    abstract ExecutionTarget finalise();
 
     abstract ExecutionTarget colocate(AllOfTarget other) throws 
ColocationMappingException;
 
@@ -96,7 +111,7 @@ abstract class AbstractTarget implements ExecutionTarget {
     abstract ExecutionTarget colocate(SomeOfTarget other) throws 
ColocationMappingException;
 
     static ExecutionTarget colocate(AllOfTarget allOf, AllOfTarget otherAllOf) 
throws ColocationMappingException {
-        if (allOf.nodes != otherAllOf.nodes) {
+        if (!allOf.nodes.equals(otherAllOf.nodes) || 
otherAllOf.nodes.cardinality() == 0) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
@@ -104,11 +119,10 @@ abstract class AbstractTarget implements ExecutionTarget {
     }
 
     static ExecutionTarget colocate(AllOfTarget allOf, OneOfTarget oneOf) 
throws ColocationMappingException {
-        if ((allOf.nodes & oneOf.nodes) == 0) {
-            throw new ColocationMappingException("Targets are not colocated");
-        }
+        int target = allOf.nodes.nextSetBit(0);
 
-        if (!isPow2(allOf.nodes)) {
+        // When colocated, AllOfTarget must contains a single node that 
matches one of OneOfTarget nodes.
+        if (target == -1 || allOf.nodes.nextSetBit(target + 1) != -1 || 
!oneOf.nodes.get(target)) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
@@ -120,9 +134,7 @@ abstract class AbstractTarget implements ExecutionTarget {
     }
 
     static ExecutionTarget colocate(AllOfTarget allOf, SomeOfTarget someOf) 
throws ColocationMappingException {
-        long newNodes = allOf.nodes & someOf.nodes;
-
-        if (allOf.nodes != newNodes) {
+        if (!BitSets.contains(someOf.nodes, allOf.nodes) || 
allOf.nodes.isEmpty()) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
@@ -130,9 +142,10 @@ abstract class AbstractTarget implements ExecutionTarget {
     }
 
     static ExecutionTarget colocate(OneOfTarget oneOf, OneOfTarget 
anotherOneOf) throws ColocationMappingException {
-        long newNodes = oneOf.nodes & anotherOneOf.nodes;
+        BitSet newNodes = (BitSet) oneOf.nodes.clone();
+        newNodes.and(anotherOneOf.nodes);
 
-        if (newNodes == 0) {
+        if (newNodes.isEmpty()) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
@@ -140,24 +153,45 @@ abstract class AbstractTarget implements ExecutionTarget {
     }
 
     static ExecutionTarget colocate(OneOfTarget oneOf, PartitionedTarget 
partitioned) throws ColocationMappingException {
-        if ((oneOf.nodes & partitioned.nodes) == 0) {
-            throw new ColocationMappingException("Targets are not colocated");
+        if (partitioned.nodes.cardinality() == 1 && 
oneOf.nodes.get(partitioned.nodes.nextSetBit(0))) {
+            return partitioned; // All partitions on single node.
         }
 
-        if (!isPow2((partitioned.nodes))) {
-            throw new ColocationMappingException("Targets are not colocated");
+        boolean changed = false;
+        BitSet newNodes = (BitSet) oneOf.nodes.clone();
+        for (int partNo = 0; partNo < partitioned.partitionsNodes.length; 
partNo++) {
+            if (newNodes.equals(partitioned.partitionsNodes[partNo])) {
+                continue;
+            }
+
+            changed = true;
+
+            newNodes.and(partitioned.partitionsNodes[partNo]);
+
+            if (newNodes.isEmpty()) {
+                throw new ColocationMappingException("Targets are not 
colocated");
+            }
         }
 
-        return partitioned;
+        if (!changed) {
+            return partitioned;
+        }
+
+        BitSet[] newPartitionsNodes = new 
BitSet[partitioned.partitionsNodes.length];
+        Arrays.fill(newPartitionsNodes, newNodes);
+        boolean finalised = newNodes.cardinality() == 1;
+
+        return new PartitionedTarget(finalised, newPartitionsNodes, 
partitioned.enlistmentConsistencyTokens);
     }
 
     static ExecutionTarget colocate(OneOfTarget oneOf, SomeOfTarget someOf) 
throws ColocationMappingException {
-        long newNodes = oneOf.nodes & someOf.nodes;
-
-        if (newNodes == 0) {
+        if (!oneOf.nodes.intersects(someOf.nodes)) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
+        BitSet newNodes = (BitSet) oneOf.nodes.clone();
+        newNodes.and(someOf.nodes);
+
         return new OneOfTarget(newNodes);
     }
 
@@ -166,54 +200,71 @@ abstract class AbstractTarget implements ExecutionTarget {
             throw new ColocationMappingException("Partitioned targets with mot 
matching numbers of partitioned are not colocated");
         }
 
+        boolean changed = false;
         boolean finalised = true;
-        long[] newPartitionsNodes = new 
long[partitioned.partitionsNodes.length];
+        BitSet[] newPartitionsNodes = new 
BitSet[partitioned.partitionsNodes.length];
         for (int partNo = 0; partNo < partitioned.partitionsNodes.length; 
partNo++) {
-            long newNodes = partitioned.partitionsNodes[partNo] & 
otherPartitioned.partitionsNodes[partNo];
+            if 
(partitioned.partitionsNodes[partNo].equals(otherPartitioned.partitionsNodes[partNo]))
 {
+                newPartitionsNodes[partNo] = 
partitioned.partitionsNodes[partNo];
 
-            if (newNodes == 0) {
-                throw new ColocationMappingException("Targets are not 
colocated");
+                continue;
             }
 
-            if (partitioned.enlistmentConsistencyTokens[partNo] != 
otherPartitioned.enlistmentConsistencyTokens[partNo]) {
-                throw new ColocationMappingException("Partitioned targets have 
different terms");
+            changed = true;
+            BitSet newNodes = (BitSet) 
partitioned.partitionsNodes[partNo].clone();
+            newNodes.and(otherPartitioned.partitionsNodes[partNo]);
+
+            if (newNodes.isEmpty()) {
+                throw new ColocationMappingException("Targets are not 
colocated");
             }
 
             newPartitionsNodes[partNo] = newNodes;
-            finalised = finalised && isPow2(newNodes);
+            finalised = finalised && newNodes.cardinality() == 1;
         }
 
-        return new PartitionedTarget(finalised, newPartitionsNodes, 
partitioned.enlistmentConsistencyTokens);
+        if (!Arrays.equals(partitioned.enlistmentConsistencyTokens, 
otherPartitioned.enlistmentConsistencyTokens)) {
+            throw new ColocationMappingException("Partitioned targets have 
different terms");
+        }
+
+        if (changed) {
+            return new PartitionedTarget(finalised, newPartitionsNodes, 
partitioned.enlistmentConsistencyTokens);
+        }
+
+        return partitioned;
     }
 
     static ExecutionTarget colocate(PartitionedTarget partitioned, 
SomeOfTarget someOf) throws ColocationMappingException {
         boolean finalised = true;
-        long[] newPartitionsNodes = new 
long[partitioned.partitionsNodes.length];
+        BitSet[] newPartitionsNodes = new 
BitSet[partitioned.partitionsNodes.length];
         for (int partNo = 0; partNo < partitioned.partitionsNodes.length; 
partNo++) {
-            long newNodes = partitioned.partitionsNodes[partNo] & someOf.nodes;
+            BitSet newNodes = (BitSet) 
partitioned.partitionsNodes[partNo].clone();
+            newNodes.and(someOf.nodes);
 
-            if (newNodes == 0) {
+            if (newNodes.isEmpty()) {
                 throw new ColocationMappingException("Targets are not 
colocated");
             }
 
             newPartitionsNodes[partNo] = newNodes;
-            finalised = finalised && isPow2(newNodes);
+            finalised = finalised && newNodes.cardinality() == 1;
         }
 
         return new PartitionedTarget(finalised, newPartitionsNodes, 
partitioned.enlistmentConsistencyTokens);
     }
 
     static ExecutionTarget colocate(SomeOfTarget someOf, SomeOfTarget 
otherSomeOf) throws ColocationMappingException {
-        long newNodes = someOf.nodes & otherSomeOf.nodes;
+        BitSet newNodes = (BitSet) someOf.nodes.clone();
+        newNodes.and(otherSomeOf.nodes);
 
-        if (newNodes == 0) {
+        if (newNodes.isEmpty()) {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
         return new SomeOfTarget(newNodes);
     }
 
-    static long pickOne(long nodes) {
-        return Long.lowestOneBit(nodes);
+    static BitSet pickOne(BitSet nodes) {
+        int node = nodes.nextSetBit(0);
+
+        return node == -1 ? BitSets.of() : BitSets.of(node);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AllOfTarget.java
similarity index 93%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AllOfTarget.java
index d2bae3afcf..70763f7972 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AllOfTarget.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster;
+package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster;
 
+import java.util.BitSet;
 import java.util.List;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
@@ -28,15 +29,10 @@ import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory
  * <p>See javadoc of {@link ExecutionTargetFactory#allOf(List)} for details.
  */
 class AllOfTarget extends AbstractTarget {
-    AllOfTarget(long nodes) {
+    AllOfTarget(BitSet nodes) {
         super(nodes);
     }
 
-    @Override
-    boolean finalised() {
-        return true;
-    }
-
     @Override
     public ExecutionTarget finalise() {
         return this;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
similarity index 68%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
index e369c311d3..0cf12b78ba 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster;
+package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.objects.Object2LongMap;
-import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.util.BitSet;
 import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
@@ -31,24 +31,21 @@ import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory
 /**
  * A factory that able to create targets for cluster with up to 64 nodes.
  */
-public class SmallClusterFactory implements ExecutionTargetFactory {
+public class LargeClusterFactory implements ExecutionTargetFactory {
     private final List<String> nodes;
-    private final Object2LongMap<String> nodeNameToId;
+    private final Object2IntMap<String> nodeNameToId;
 
     /** Constructor. */
-    public SmallClusterFactory(List<String> nodes) {
-        if (nodes.size() > 64) {
-            throw new IllegalArgumentException("Supported up to 64 nodes, but 
was " + nodes.size());
-        }
-
-        // to make mapping stable
-        this.nodes = nodes.stream().sorted().collect(Collectors.toList());
+    public LargeClusterFactory(List<String> nodes) {
+        this.nodes = nodes;
 
-        nodeNameToId = new Object2LongOpenHashMap<>(nodes.size());
+        nodeNameToId = new Object2IntOpenHashMap<>(nodes.size());
+        nodeNameToId.defaultReturnValue(-1);
 
         int idx = 0;
-        for (String name : this.nodes) {
-            nodeNameToId.putIfAbsent(name, 1L << idx++);
+        for (String name : nodes) {
+            int ret = nodeNameToId.putIfAbsent(name, idx++);
+            assert ret == -1 : "invalid node";
         }
     }
 
@@ -69,7 +66,7 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
 
     @Override
     public ExecutionTarget partitioned(List<TokenizedAssignments> assignments) 
{
-        long[] partitionNodes = new long[assignments.size()];
+        BitSet[] partitionNodes = new BitSet[assignments.size()];
         long[] enlistmentConsistencyTokens = new long[assignments.size()];
 
         int idx = 0;
@@ -77,11 +74,15 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
         for (TokenizedAssignments assignment : assignments) {
             finalised = finalised && assignment.nodes().size() < 2;
 
+            BitSet nodes = new BitSet(assignment.nodes().size());
             for (Assignment a : assignment.nodes()) {
-                partitionNodes[idx] |= 
nodeNameToId.getOrDefault(a.consistentId(), 0);
-                enlistmentConsistencyTokens[idx] = assignment.token();
+                int node = nodeNameToId.getOrDefault(a.consistentId(), -1);
+                assert node >= 0 : "invalid node";
+                nodes.set(node);
             }
 
+            partitionNodes[idx] = nodes;
+            enlistmentConsistencyTokens[idx] = assignment.token();
             idx++;
         }
 
@@ -90,27 +91,30 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
 
     @Override
     public List<String> resolveNodes(ExecutionTarget target) {
-        target = target.finalise();
-
         assert target instanceof AbstractTarget : target == null ? "<null>" : 
target.getClass().getCanonicalName();
 
+        target = ((AbstractTarget) target).finalise();
+
         return ((AbstractTarget) target).nodes(nodes);
     }
 
     @Override
     public Int2ObjectMap<NodeWithConsistencyToken> 
resolveAssignments(ExecutionTarget target) {
-        target = target.finalise();
-
         assert target instanceof AbstractTarget : target == null ? "<null>" : 
target.getClass().getCanonicalName();
 
+        target = ((AbstractTarget) target).finalise();
+
         return ((AbstractTarget) target).assignments(nodes);
     }
 
-    private long nodeListToMap(List<String> nodes) {
-        long nodesMap = 0;
+    private BitSet nodeListToMap(List<String> nodes) {
+        BitSet nodesMap = new BitSet(nodes.size());
 
         for (String name : nodes) {
-            nodesMap |= nodeNameToId.getOrDefault(name, 0);
+            int id = nodeNameToId.getOrDefault(name, -1);
+            assert id >= 0 : "invalid node";
+
+            nodesMap.set(id);
         }
 
         return nodesMap;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/OneOfTarget.java
similarity index 83%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/OneOfTarget.java
index 3098c7dd97..4c9c8de635 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/OneOfTarget.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster;
-
-import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
+package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster;
 
+import java.util.BitSet;
 import java.util.List;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
@@ -30,19 +29,13 @@ import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory
  * <p>See javadoc of {@link ExecutionTargetFactory#oneOf(List)} for details.
  */
 class OneOfTarget extends AbstractTarget {
-
-    OneOfTarget(long nodes) {
+    OneOfTarget(BitSet nodes) {
         super(nodes);
     }
 
-    @Override
-    boolean finalised() {
-        return isPow2(nodes);
-    }
-
     @Override
     public ExecutionTarget finalise() {
-        if (finalised()) {
+        if (nodes.cardinality() == 1) {
             return this;
         }
 
@@ -60,15 +53,20 @@ class OneOfTarget extends AbstractTarget {
     public ExecutionTarget trimTo(ExecutionTarget other) {
         assert other instanceof AbstractTarget : other == null ? "<null>" : 
other.getClass().getCanonicalName();
 
-        long otherNodes = ((AbstractTarget) other).nodes;
+        if (nodes.cardinality() == 1) {
+            return this;
+        }
 
-        long newNodes = nodes & otherNodes;
+        BitSet otherNodes = ((AbstractTarget) other).nodes;
 
-        if (newNodes == nodes || newNodes == 0) {
-            return this;
+        if (!nodes.equals(otherNodes) && nodes.intersects(otherNodes)) {
+            BitSet newNodes = (BitSet) nodes.clone();
+            newNodes.and(otherNodes);
+
+            return new OneOfTarget(newNodes);
         }
 
-        return new OneOfTarget(newNodes);
+        return this;
     }
 
     @Override
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/PartitionedTarget.java
similarity index 75%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/PartitionedTarget.java
index 9a0680eb3e..60f238a167 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/PartitionedTarget.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster;
+package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster;
 
-import it.unimi.dsi.fastutil.ints.Int2LongFunction;
+import it.unimi.dsi.fastutil.ints.Int2ObjectFunction;
+import java.util.BitSet;
 import java.util.List;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
@@ -30,10 +31,10 @@ import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory
  */
 class PartitionedTarget extends AbstractTarget {
     private final boolean finalised;
-    final long[] partitionsNodes;
+    final BitSet[] partitionsNodes;
     final long[] enlistmentConsistencyTokens;
 
-    PartitionedTarget(boolean finalised, long[] partitionsNodes, long[] 
enlistmentConsistencyTokens) {
+    PartitionedTarget(boolean finalised, BitSet[] partitionsNodes, long[] 
enlistmentConsistencyTokens) {
         super(computeNodes(partitionsNodes));
 
         this.finalised = finalised;
@@ -41,18 +42,13 @@ class PartitionedTarget extends AbstractTarget {
         this.enlistmentConsistencyTokens = enlistmentConsistencyTokens;
     }
 
-    @Override
-    boolean finalised() {
-        return finalised;
-    }
-
     @Override
     public ExecutionTarget finalise() {
         if (finalised) {
             return this;
         }
 
-        long[] newPartitionsNodes = new long[partitionsNodes.length];
+        BitSet[] newPartitionsNodes = new BitSet[partitionsNodes.length];
 
         for (int partNo = 0; partNo < partitionsNodes.length; partNo++) {
             newPartitionsNodes[partNo] = pickOne(partitionsNodes[partNo]);
@@ -76,18 +72,18 @@ class PartitionedTarget extends AbstractTarget {
             return this;
         }
 
-        long[] newPartitionsNodes = new long[partitionsNodes.length];
+        BitSet[] newPartitionsNodes = new BitSet[partitionsNodes.length];
         boolean changed = false;
-        Int2LongFunction partitionNodesResolver = partitionNodeResolver(other);
+        Int2ObjectFunction<BitSet> partitionNodesResolver = 
partitionNodeResolver(other);
 
         for (int i = 0; i < partitionsNodes.length; i++) {
-            long newNodes = partitionsNodes[i] & partitionNodesResolver.get(i);
+            BitSet newNodes = partitionsNodes[i];
+            BitSet otherNodes = partitionNodesResolver.get(i);
 
-            if (newNodes == 0) {
-                newNodes = partitionsNodes[i];
-            }
+            if (!newNodes.equals(otherNodes) && 
newNodes.intersects(otherNodes)) {
+                newNodes = (BitSet) newNodes.clone();
+                newNodes.and(otherNodes);
 
-            if (newNodes != partitionsNodes[i]) {
                 changed = true;
             }
 
@@ -101,16 +97,16 @@ class PartitionedTarget extends AbstractTarget {
         return this;
     }
 
-    private Int2LongFunction partitionNodeResolver(ExecutionTarget other) {
-        Int2LongFunction partitionNodesResolver;
+    private Int2ObjectFunction<BitSet> partitionNodeResolver(ExecutionTarget 
other) {
+        Int2ObjectFunction<BitSet> partitionNodesResolver;
 
-        if (other instanceof PartitionedTarget 
+        if (other instanceof PartitionedTarget
                 && ((PartitionedTarget) other).partitionsNodes.length == 
partitionsNodes.length) {
             PartitionedTarget otherPartitioned = (PartitionedTarget) other;
 
             partitionNodesResolver = partId -> 
otherPartitioned.partitionsNodes[partId];
         } else {
-            long otherNodes = ((AbstractTarget) other).nodes;
+            BitSet otherNodes = ((AbstractTarget) other).nodes;
 
             partitionNodesResolver = partId -> otherNodes;
         }
@@ -138,10 +134,10 @@ class PartitionedTarget extends AbstractTarget {
         return colocate(this, other);
     }
 
-    private static long computeNodes(long[] partitionsNodes) {
-        long nodes = 0;
-        for (long nodesOfPartition : partitionsNodes) {
-            nodes |= nodesOfPartition;
+    private static BitSet computeNodes(BitSet[] partitionsNodes) {
+        BitSet nodes = new BitSet();
+        for (BitSet nodesOfPartition : partitionsNodes) {
+            nodes.or(nodesOfPartition);
         }
 
         return nodes;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/SomeOfTarget.java
similarity index 84%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/SomeOfTarget.java
index 8810657f31..531412681e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/SomeOfTarget.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster;
+package org.apache.ignite.internal.sql.engine.exec.mapping.largecluster;
 
+import java.util.BitSet;
 import java.util.List;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
@@ -28,15 +29,10 @@ import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory
  * <p>See javadoc of {@link ExecutionTargetFactory#someOf(List)} for details.
  */
 class SomeOfTarget extends AbstractTarget {
-    SomeOfTarget(long nodes) {
+    SomeOfTarget(BitSet nodes) {
         super(nodes);
     }
 
-    @Override
-    boolean finalised() {
-        return true;
-    }
-
     @Override
     public ExecutionTarget finalise() {
         return this;
@@ -53,15 +49,20 @@ class SomeOfTarget extends AbstractTarget {
     public ExecutionTarget trimTo(ExecutionTarget other) {
         assert other instanceof AbstractTarget : other == null ? "<null>" : 
other.getClass().getCanonicalName();
 
-        long otherNodes = ((AbstractTarget) other).nodes;
+        if (nodes.cardinality() == 1) {
+            return this;
+        }
+
+        BitSet otherNodes = ((AbstractTarget) other).nodes;
 
-        long newNodes = nodes & otherNodes;
+        if (!nodes.equals(otherNodes) && nodes.intersects(otherNodes)) {
+            BitSet newNodes = (BitSet) nodes.clone();
+            newNodes.and(otherNodes);
 
-        if (newNodes == nodes || newNodes == 0) {
-            return this;
+            return new SomeOfTarget(newNodes);
         }
 
-        return new SomeOfTarget(newNodes);
+        return this;
     }
 
     @Override
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
index 293d697774..1783adaa92 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
@@ -23,6 +23,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
@@ -38,6 +39,8 @@ abstract class AbstractTarget implements ExecutionTarget {
     final long nodes;
 
     AbstractTarget(long nodes) {
+        assert nodes != 0 : "Empty target is not allowed";
+
         this.nodes = nodes;
     }
 
@@ -85,7 +88,16 @@ abstract class AbstractTarget implements ExecutionTarget {
         return result;
     }
 
-    abstract boolean finalised();
+    /**
+     * Finalises target by choosing exactly one node for targets with multiple 
options.
+     *
+     * <p>Some targets may have several options, so we have to pick one in 
order to get
+     * correct results. Call to this methods resolves this ambiguity by 
truncating all
+     * but one option. Which exactly option will be left is implementation 
defined.
+     *
+     * @return Finalised target.
+     */
+    abstract ExecutionTarget finalise();
 
     abstract ExecutionTarget colocate(AllOfTarget other) throws 
ColocationMappingException;
 
@@ -144,11 +156,24 @@ abstract class AbstractTarget implements ExecutionTarget {
             throw new ColocationMappingException("Targets are not colocated");
         }
 
-        if (!isPow2((partitioned.nodes))) {
-            throw new ColocationMappingException("Targets are not colocated");
+        if (isPow2((partitioned.nodes))) {
+            return partitioned; // All partitions on single node.
+        }
+
+        long colocatedNodes = oneOf.nodes;
+        for (int partNo = 0; partNo < partitioned.partitionsNodes.length; 
partNo++) {
+            colocatedNodes &= partitioned.partitionsNodes[partNo];
+
+            if (colocatedNodes == 0) {
+                throw new ColocationMappingException("Targets are not 
colocated");
+            }
         }
 
-        return partitioned;
+        boolean finalised = isPow2(colocatedNodes);
+        long[] newNodes = new long[partitioned.partitionsNodes.length];
+        Arrays.fill(newNodes, colocatedNodes);
+
+        return new PartitionedTarget(finalised, newNodes, 
partitioned.enlistmentConsistencyTokens);
     }
 
     static ExecutionTarget colocate(OneOfTarget oneOf, SomeOfTarget someOf) 
throws ColocationMappingException {
@@ -175,14 +200,15 @@ abstract class AbstractTarget implements ExecutionTarget {
                 throw new ColocationMappingException("Targets are not 
colocated");
             }
 
-            if (partitioned.enlistmentConsistencyTokens[partNo] != 
otherPartitioned.enlistmentConsistencyTokens[partNo]) {
-                throw new ColocationMappingException("Partitioned targets have 
different terms");
-            }
-
             newPartitionsNodes[partNo] = newNodes;
             finalised = finalised && isPow2(newNodes);
         }
 
+
+        if (!Arrays.equals(partitioned.enlistmentConsistencyTokens, 
otherPartitioned.enlistmentConsistencyTokens)) {
+            throw new ColocationMappingException("Partitioned targets have 
different terms");
+        }
+
         return new PartitionedTarget(finalised, newPartitionsNodes, 
partitioned.enlistmentConsistencyTokens);
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
index d2bae3afcf..4b30173ca3 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AllOfTarget.java
@@ -32,11 +32,6 @@ class AllOfTarget extends AbstractTarget {
         super(nodes);
     }
 
-    @Override
-    boolean finalised() {
-        return true;
-    }
-
     @Override
     public ExecutionTarget finalise() {
         return this;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
index 3098c7dd97..29df4f102c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/OneOfTarget.java
@@ -30,19 +30,13 @@ import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory
  * <p>See javadoc of {@link ExecutionTargetFactory#oneOf(List)} for details.
  */
 class OneOfTarget extends AbstractTarget {
-
     OneOfTarget(long nodes) {
         super(nodes);
     }
 
-    @Override
-    boolean finalised() {
-        return isPow2(nodes);
-    }
-
     @Override
     public ExecutionTarget finalise() {
-        if (finalised()) {
+        if (isPow2(nodes)) {
             return this;
         }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
index 9a0680eb3e..2e7fd14536 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
@@ -42,12 +42,7 @@ class PartitionedTarget extends AbstractTarget {
     }
 
     @Override
-    boolean finalised() {
-        return finalised;
-    }
-
-    @Override
-    public ExecutionTarget finalise() {
+    ExecutionTarget finalise() {
         if (finalised) {
             return this;
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
index e369c311d3..732940c6d2 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
@@ -78,10 +78,13 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
             finalised = finalised && assignment.nodes().size() < 2;
 
             for (Assignment a : assignment.nodes()) {
-                partitionNodes[idx] |= 
nodeNameToId.getOrDefault(a.consistentId(), 0);
-                enlistmentConsistencyTokens[idx] = assignment.token();
+                long node = nodeNameToId.getOrDefault(a.consistentId(), -1);
+                assert node >= 0 : "invalid node";
+                partitionNodes[idx] |= node;
             }
 
+            enlistmentConsistencyTokens[idx] = assignment.token();
+
             idx++;
         }
 
@@ -90,19 +93,19 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
 
     @Override
     public List<String> resolveNodes(ExecutionTarget target) {
-        target = target.finalise();
-
         assert target instanceof AbstractTarget : target == null ? "<null>" : 
target.getClass().getCanonicalName();
 
+        target = ((AbstractTarget) target).finalise();
+
         return ((AbstractTarget) target).nodes(nodes);
     }
 
     @Override
     public Int2ObjectMap<NodeWithConsistencyToken> 
resolveAssignments(ExecutionTarget target) {
-        target = target.finalise();
-
         assert target instanceof AbstractTarget : target == null ? "<null>" : 
target.getClass().getCanonicalName();
 
+        target = ((AbstractTarget) target).finalise();
+
         return ((AbstractTarget) target).assignments(nodes);
     }
 
@@ -110,7 +113,9 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
         long nodesMap = 0;
 
         for (String name : nodes) {
-            nodesMap |= nodeNameToId.getOrDefault(name, 0);
+            long node = nodeNameToId.getOrDefault(name, -1);
+            assert node >= 0 : "invalid node";
+            nodesMap |= node;
         }
 
         return nodesMap;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
index 8810657f31..3aa5a3300c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SomeOfTarget.java
@@ -32,11 +32,6 @@ class SomeOfTarget extends AbstractTarget {
         super(nodes);
     }
 
-    @Override
-    boolean finalised() {
-        return true;
-    }
-
     @Override
     public ExecutionTarget finalise() {
         return this;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/MappingBenchmark.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/MappingBenchmark.java
new file mode 100644
index 0000000000..f473849a19
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/MappingBenchmark.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.benchmarks;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
+import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
+import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.largecluster.LargeClusterFactory;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster.SmallClusterFactory;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Mapping part benchmark.
+ *
+ * <p>Note: seems more accurate results can be obtained after boosting 
disabling:
+ * linux: echo "1" | sudo tee /sys/devices/system/cpu/intel_pstate/no_turbo
+ */
+@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Fork(1)
+@Threads(1)
+@State(Scope.Benchmark)
+public class MappingBenchmark {
+    private ExecutionTarget oneOfSmall;
+    private ExecutionTarget someOfSmall;
+    private ExecutionTarget part1Small;
+    private ExecutionTarget part2Small;
+    private ExecutionTarget part1SmallMore;
+    private ExecutionTarget part2SmallMore;
+    private ExecutionTarget allOfSmall;
+
+    private ExecutionTarget oneOfBig;
+    private ExecutionTarget someOfBig;
+    private ExecutionTarget part1Big;
+    private ExecutionTarget part2Big;
+    private ExecutionTarget part1BigMore;
+    private ExecutionTarget part2BigMore;
+    private ExecutionTarget allOfBig;
+
+    private ExecutionTargetFactory targetFactorySmall;
+    private ExecutionTargetFactory targetFactoryLarge;
+
+    /** Prepare the plan of the query. */
+    @Setup
+    public void setUp() {
+        List<String> nodes = List.of("n1", "n2", "n3", "n4", "n5");
+        List<String> oneOfNodes = List.of("n1", "n3");
+        List<TokenizedAssignments> partNodes =
+                List.of(tokenizedAssignment(1, "n3"));
+        List<TokenizedAssignments> partNodesMore =
+                List.of(tokenizedAssignment(1, "n3", "n4"),
+                        tokenizedAssignment(2, "n4", "n5"),
+                        tokenizedAssignment(3, "n5", "n3"));
+        List<TokenizedAssignments> partNodesMore2 =
+                List.of(tokenizedAssignment(1, "n5", "n3"),
+                        tokenizedAssignment(2, "n3", "n4"),
+                        tokenizedAssignment(3, "n4", "n5"));
+
+        targetFactorySmall = new SmallClusterFactory(nodes);
+        targetFactoryLarge = new LargeClusterFactory(nodes);
+
+        oneOfSmall = targetFactorySmall.oneOf(oneOfNodes);
+        someOfSmall = targetFactorySmall.someOf(nodes);
+        allOfSmall = targetFactorySmall.allOf(List.of("n3"));
+        part1Small = targetFactorySmall.partitioned(partNodes);
+        part2Small = targetFactorySmall.partitioned(partNodes);
+        part1SmallMore = targetFactorySmall.partitioned(partNodesMore);
+        part2SmallMore = targetFactorySmall.partitioned(partNodesMore2);
+
+        oneOfBig = targetFactoryLarge.oneOf(oneOfNodes);
+        someOfBig = targetFactoryLarge.someOf(nodes);
+        allOfBig = targetFactoryLarge.allOf(List.of("n3"));
+        part1Big = targetFactoryLarge.partitioned(partNodes);
+        part2Big = targetFactoryLarge.partitioned(partNodes);
+        part1BigMore = targetFactoryLarge.partitioned(partNodesMore);
+        part2BigMore = targetFactoryLarge.partitioned(partNodesMore2);
+    }
+
+    /** Measure small cluster mapping implementation. */
+    @Benchmark
+    public void benchSmall(Blackhole bh) throws ColocationMappingException {
+        finalise(targetFactorySmall, oneOfSmall.colocateWith(oneOfSmall), bh);
+        finalise(targetFactorySmall, oneOfSmall.colocateWith(someOfSmall), bh);
+        finalise(targetFactorySmall, oneOfSmall.colocateWith(allOfSmall), bh);
+        finalise(targetFactorySmall, oneOfSmall.colocateWith(part1Small), bh);
+
+        finalise(targetFactorySmall, someOfSmall.colocateWith(someOfSmall), 
bh);
+        finalise(targetFactorySmall, someOfSmall.colocateWith(part1Small), bh);
+        finalise(targetFactorySmall, someOfSmall.colocateWith(allOfSmall), bh);
+
+        finalise(targetFactorySmall, part1Small.colocateWith(part2Small), bh);
+    }
+
+    /** Measure small cluster mapping partitioned only implementation. */
+    @Benchmark
+    public void benchSmallPartitionedOnly(Blackhole bh) throws 
ColocationMappingException {
+        finalise(targetFactorySmall, 
part1SmallMore.colocateWith(part2SmallMore), bh);
+    }
+
+    /** Measure huge cluster mapping implementation. */
+    @Benchmark
+    public void benchBig(Blackhole bh) throws ColocationMappingException {
+        finalise(targetFactoryLarge, oneOfBig.colocateWith(oneOfBig), bh);
+        finalise(targetFactoryLarge, oneOfBig.colocateWith(someOfBig), bh);
+        finalise(targetFactoryLarge, oneOfBig.colocateWith(allOfBig), bh);
+        finalise(targetFactoryLarge, oneOfBig.colocateWith(part1Big), bh);
+
+        finalise(targetFactoryLarge, someOfBig.colocateWith(someOfBig), bh);
+        finalise(targetFactoryLarge, someOfBig.colocateWith(part1Big), bh);
+        finalise(targetFactoryLarge, someOfBig.colocateWith(allOfBig), bh);
+
+        finalise(targetFactoryLarge, part1Big.colocateWith(part2Big), bh);
+    }
+
+    /** Measure huge cluster mapping partitioned only implementation. */
+    @Benchmark
+    public void benchBigPartitionedOnly(Blackhole bh) throws 
ColocationMappingException {
+        finalise(targetFactoryLarge, part1BigMore.colocateWith(part2BigMore), 
bh);
+    }
+
+    /**
+     * Runs the benchmark.
+     *
+     * @param args args
+     * @throws Exception if something goes wrong
+     */
+    public static void main(String[] args) throws Exception {
+        Options build = new OptionsBuilder()
+                .include(MappingBenchmark.class.getName())
+                .build();
+
+        new Runner(build).run();
+    }
+
+    private static TokenizedAssignmentsImpl tokenizedAssignment(int token, 
String... peer) {
+        return new 
TokenizedAssignmentsImpl(Arrays.stream(peer).map(Assignment::forPeer).collect(Collectors.toSet()),
 token);
+    }
+
+    private static void finalise(ExecutionTargetFactory factory, 
ExecutionTarget target, Blackhole bh) {
+        bh.consume(factory.resolveNodes(target));
+    }
+}
\ No newline at end of file
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
new file mode 100644
index 0000000000..be662b48d0
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.iterableWithSize;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
+import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.largecluster.LargeClusterFactory;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster.SmallClusterFactory;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Test class to verify {@link ExecutionTargetFactory} implementations.
+ *
+ * @see SmallClusterFactory
+ * @see LargeClusterFactory
+ */
+public class ExecutionTargetFactorySelfTest {
+    private static final List<String> ALL_NODES = List.of("node1", "node2", 
"node3", "node4", "node5");
+    private static final List<String> NODE_SET = List.of("node2", "node4", 
"node5");
+    private static final List<String> NODE_SET2 = List.of("node2", "node3", 
"node5");
+    private static final List<String> NODE_SUBSET = List.of("node2", "node5");
+    private static final List<String> SINGLE_NODE_SET = List.of("node4");
+    private static final List<String> INVALID_NODE_SET = List.of("node0");
+
+    private static List<ExecutionTargetFactory> clusterFactory() {
+        return List.of(
+                new SmallClusterFactory(ALL_NODES),
+                new LargeClusterFactory(ALL_NODES)
+        );
+    }
+
+    @SuppressWarnings({"ResultOfObjectAllocationIgnored", 
"ThrowableNotThrown"})
+    @Test
+    void smallClusterFactory() {
+        List<String> nodes = IntStream.range(0, 65).mapToObj(i -> 
"node").collect(Collectors.toList());
+
+        assertThrows(IllegalArgumentException.class, () -> new 
SmallClusterFactory(nodes), "Supported up to 64 nodes");
+    }
+
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void targetsResolution(ExecutionTargetFactory f) {
+        assertThat(f.resolveNodes(f.allOf(NODE_SET)), equalTo(NODE_SET));
+        assertThat(f.resolveNodes(f.allOf(NODE_SET)), equalTo(NODE_SET));
+        assertThat(f.resolveNodes(f.someOf(NODE_SET)), hasItems(in(NODE_SET)));
+        assertThat(f.resolveNodes(f.oneOf(NODE_SET)), 
containsSingleFrom(NODE_SET));
+        
assertThat(f.resolveNodes(f.partitioned(assignmentFromPrimaries(NODE_SET))), 
equalTo(NODE_SET));
+    }
+
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void targetValidation(ExecutionTargetFactory f) {
+        assertThrows(AssertionError.class, () -> f.allOf(List.of()), null);
+        assertThrows(AssertionError.class, () -> f.someOf(List.of()), null);
+        assertThrows(AssertionError.class, () -> f.oneOf(List.of()), null);
+        assertThrows(AssertionError.class, () -> f.partitioned(List.of()), 
null);
+
+        assertThrows(Throwable.class, () -> f.allOf(INVALID_NODE_SET), 
"invalid node");
+        assertThrows(Throwable.class, () -> f.someOf(INVALID_NODE_SET), 
"invalid node");
+        assertThrows(Throwable.class, () -> f.oneOf(INVALID_NODE_SET), 
"invalid node");
+        assertThrows(Throwable.class, () -> 
f.partitioned(assignmentFromPrimaries(INVALID_NODE_SET)), "invalid node");
+    }
+
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void allOfTargets(ExecutionTargetFactory f) throws Exception {
+        // Self colocation
+        assertColocated(f, f.allOf(NODE_SET), f.allOf(NODE_SET), 
equalTo(NODE_SET));
+        assertNotColocated(f.allOf(NODE_SET), f.allOf(NODE_SUBSET));
+        assertNotColocated(f.allOf(NODE_SUBSET), f.allOf(NODE_SET));
+
+        // Colocation with SomeOf
+        assertColocated(f, f.allOf(NODE_SET), f.someOf(NODE_SET), 
equalTo(NODE_SET));
+        assertNotColocated(f.allOf(NODE_SET), f.someOf(NODE_SUBSET));
+        assertColocated(f, f.allOf(NODE_SUBSET), f.someOf(NODE_SET), 
equalTo(NODE_SUBSET));
+
+        // Colocation with OneOf
+        assertNotColocated(f.allOf(NODE_SET), f.oneOf(NODE_SET));
+        assertNotColocated(f.allOf(NODE_SET), f.oneOf(NODE_SUBSET));
+        assertNotColocated(f.allOf(NODE_SUBSET), f.oneOf(NODE_SET));
+        assertColocated(f, f.allOf(SINGLE_NODE_SET), f.oneOf(NODE_SET), 
equalTo(SINGLE_NODE_SET));
+
+        // Colocation with Partitioned
+        assertNotColocated(f.allOf(NODE_SET), 
f.partitioned(assignmentFromPrimaries(NODE_SET)),
+                "AllOf target and Partitioned can't be colocated");
+        assertNotColocated(f.allOf(SINGLE_NODE_SET), 
f.partitioned(assignmentFromPrimaries(NODE_SET)),
+                "AllOf target and Partitioned can't be colocated");
+        assertNotColocated(f.allOf(SINGLE_NODE_SET), 
f.partitioned(assignmentFromPrimaries(SINGLE_NODE_SET)),
+                "AllOf target and Partitioned can't be colocated");
+    }
+
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void someOfTargets(ExecutionTargetFactory f) throws Exception {
+        // Self colocation
+        assertColocated(f, f.someOf(NODE_SET), f.someOf(NODE_SET), 
equalTo(NODE_SET));
+        assertColocated(f, f.someOf(NODE_SET), f.someOf(NODE_SUBSET), 
equalTo(NODE_SUBSET));
+        assertColocated(f, f.someOf(NODE_SUBSET), f.someOf(NODE_SET), 
equalTo(NODE_SUBSET));
+        assertNotColocated(f.someOf(SINGLE_NODE_SET), f.someOf(NODE_SET2)); // 
Disjoint sets.
+
+        // Colocation with AllOf
+        assertColocated(f, f.someOf(NODE_SET), f.allOf(NODE_SET), 
equalTo(NODE_SET));
+        assertColocated(f, f.someOf(NODE_SET), f.allOf(NODE_SUBSET), 
equalTo(NODE_SUBSET));
+        assertNotColocated(f.someOf(NODE_SUBSET), f.allOf(NODE_SET));
+
+        // Colocation with OneOf
+        assertColocated(f, f.someOf(NODE_SET), f.oneOf(NODE_SET), 
containsSingleFrom(NODE_SET));
+        assertColocated(f, f.someOf(NODE_SUBSET), f.oneOf(NODE_SET), 
containsSingleFrom(NODE_SUBSET));
+        assertColocated(f, f.someOf(NODE_SET), f.oneOf(NODE_SUBSET), 
containsSingleFrom(NODE_SUBSET));
+        assertNotColocated(f.someOf(SINGLE_NODE_SET), f.oneOf(NODE_SET2)); // 
Disjoint sets.
+
+        // Colocation with Partitioned
+        assertColocated(f, f.someOf(NODE_SET), 
f.partitioned(assignmentFromPrimaries(NODE_SET)), equalTo(NODE_SET));
+        assertColocated(f, f.someOf(NODE_SET), 
f.partitioned(assignmentFromPrimaries(NODE_SUBSET)), equalTo(NODE_SUBSET));
+        assertNotColocated(f.someOf(NODE_SUBSET), 
f.partitioned(assignmentFromPrimaries(NODE_SET)));
+    }
+
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void oneOfTargets(ExecutionTargetFactory f) throws Exception {
+        // Self colocation
+        assertColocated(f, f.oneOf(NODE_SET), f.oneOf(NODE_SET), 
containsSingleFrom(NODE_SET));
+        assertColocated(f, f.oneOf(NODE_SET), f.oneOf(shuffle(NODE_SET)), 
containsSingleFrom(NODE_SET));
+        assertColocated(f, f.oneOf(NODE_SUBSET), f.oneOf(NODE_SET), 
containsSingleFrom(NODE_SUBSET));
+        assertColocated(f, f.oneOf(NODE_SET), f.oneOf(NODE_SUBSET), 
containsSingleFrom(NODE_SUBSET));
+        assertNotColocated(f.oneOf(NODE_SET2), f.oneOf(SINGLE_NODE_SET)); // 
Disjoint sets.
+
+        // Colocation with AllOf
+        assertNotColocated(f.oneOf(NODE_SET), f.allOf(NODE_SET));
+        assertColocated(f, f.oneOf(NODE_SET), f.allOf(SINGLE_NODE_SET), 
equalTo(SINGLE_NODE_SET));
+        assertNotColocated(f.oneOf(NODE_SUBSET), f.allOf(NODE_SET));
+        assertNotColocated(f.oneOf(NODE_SET2), f.allOf(SINGLE_NODE_SET)); // 
Disjoint sets.
+
+        // Colocation with someOf
+        assertColocated(f, f.oneOf(NODE_SET), f.someOf(NODE_SET), 
containsSingleFrom(NODE_SET));
+        assertColocated(f, f.oneOf(NODE_SUBSET), f.someOf(NODE_SET), 
containsSingleFrom(NODE_SUBSET));
+        assertColocated(f, f.oneOf(NODE_SET), f.someOf(NODE_SUBSET), 
containsSingleFrom(NODE_SUBSET));
+        assertNotColocated(f.oneOf(SINGLE_NODE_SET), f.someOf(NODE_SET2)); // 
Disjoint sets.
+
+        // Colocation with Partitioned
+        assertNotColocated(f.oneOf(NODE_SET), 
f.partitioned(assignmentFromPrimaries(NODE_SET)));
+        assertColocated(f, f.oneOf(NODE_SET), 
f.partitioned(assignmentFromPrimaries(SINGLE_NODE_SET)), 
equalTo(SINGLE_NODE_SET));
+        assertNotColocated(f.oneOf(NODE_SET2), 
f.partitioned(assignmentFromPrimaries(SINGLE_NODE_SET))); // Disjoint sets.
+    }
+
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void partitionedPrimaryTargets(ExecutionTargetFactory f) throws Exception {
+        // Self colocation
+        assertColocated(f, f.partitioned(assignmentFromPrimaries(NODE_SET)), 
f.partitioned(assignmentFromPrimaries(NODE_SET)),
+                equalTo(NODE_SET));
+        assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SET)), 
f.partitioned(shuffle(assignmentFromPrimaries(NODE_SET))));
+        
assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SUBSET)), 
f.partitioned(assignmentFromPrimaries(NODE_SET)),
+                "Partitioned targets with mot matching numbers of partitioned 
are not colocated");
+        assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SET)), 
f.partitioned(assignmentFromPrimaries(NODE_SUBSET)),
+                "Partitioned targets with mot matching numbers of partitioned 
are not colocated");
+
+        assertNotColocated(f.partitioned(singleWithToken("node1", 1)), 
f.partitioned(singleWithToken("node1", 2)),
+                "Partitioned targets have different terms");
+
+        // Colocation with AllOf
+        assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SET)), 
f.allOf(NODE_SET),
+                "AllOf target and Partitioned can't be colocated");
+        assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SET)), 
f.allOf(SINGLE_NODE_SET),
+                "AllOf target and Partitioned can't be colocated");
+
+        // Colocation with someOf
+        assertColocated(f, f.partitioned(assignmentFromPrimaries(NODE_SET)), 
f.someOf(NODE_SET), equalTo(NODE_SET));
+        assertColocated(f, 
f.partitioned(assignmentFromPrimaries(NODE_SUBSET)), f.someOf(NODE_SET), 
equalTo(NODE_SUBSET));
+        assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SET)), 
f.someOf(NODE_SUBSET));
+        
assertNotColocated(f.partitioned(assignmentFromPrimaries(SINGLE_NODE_SET)), 
f.someOf(NODE_SET2)); // Disjoint sets.
+
+        // Colocation with oneOf
+        assertNotColocated(f.partitioned(assignmentFromPrimaries(NODE_SET)), 
f.oneOf(NODE_SET));
+        assertColocated(f, 
f.partitioned(assignmentFromPrimaries(SINGLE_NODE_SET)), f.oneOf(NODE_SET), 
equalTo(SINGLE_NODE_SET));
+        
assertNotColocated(f.partitioned(assignmentFromPrimaries(SINGLE_NODE_SET)), 
f.oneOf(NODE_SET2)); // Disjoint
+    }
+
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void partitionedWithBackupsTargets(ExecutionTargetFactory f) throws 
Exception {
+        // Self colocation
+        assertColocated(f, f.partitioned(assignment(NODE_SET, NODE_SET2)), 
f.partitioned(assignment(NODE_SET, NODE_SET2)),
+                hasItems(in(NODE_SUBSET)));
+        assertColocated(f, f.partitioned(assignment(NODE_SET, NODE_SET2)), 
f.partitioned(assignment(NODE_SET2, NODE_SET)),
+                hasItems(in(NODE_SUBSET)));
+
+        // Colocation with AllOf
+        assertNotColocated(f.partitioned(assignment(NODE_SET, NODE_SET2)), 
f.allOf(NODE_SUBSET),
+                "AllOf target and Partitioned can't be colocated");
+        assertNotColocated(f.partitioned(assignment(NODE_SET, NODE_SET2)), 
f.allOf(SINGLE_NODE_SET),
+                "AllOf target and Partitioned can't be colocated");
+
+        // Colocation with someOf
+        assertColocated(f, f.partitioned(assignment(NODE_SET, NODE_SET2)), 
f.someOf(NODE_SET), hasItems((in(NODE_SET))));
+        assertNotColocated(f.partitioned(assignment(NODE_SET, NODE_SET2)), 
f.someOf(SINGLE_NODE_SET)); // Disjoint sets.
+
+        // Colocation with oneOf
+        assertColocated(f, f.partitioned(assignment(NODE_SET, NODE_SET2)), 
f.someOf(NODE_SET), hasItems((in(NODE_SUBSET))));
+        assertColocated(f, f.partitioned(assignment(NODE_SET, NODE_SET2)), 
f.oneOf(NODE_SET), containsSingleFrom(NODE_SUBSET));
+        assertNotColocated(f.partitioned(assignment(NODE_SET, NODE_SET2)), 
f.oneOf(SINGLE_NODE_SET)); // Disjoint
+    }
+
+    private static List<TokenizedAssignments> 
assignmentFromPrimaries(List<String> nodes) {
+        return nodes.stream()
+                .map(n -> new 
TokenizedAssignmentsImpl(Set.of(Assignment.forPeer(n)), 1))
+                .collect(Collectors.toList());
+    }
+
+    private static List<TokenizedAssignments> assignment(List<String> 
part1Nodes, List<String> part2Nodes) {
+        Set<Assignment> part1 = 
part1Nodes.stream().map(Assignment::forPeer).collect(Collectors.toSet());
+        Set<Assignment> part2 = 
part2Nodes.stream().map(Assignment::forPeer).collect(Collectors.toSet());
+
+        return List.of(
+                new TokenizedAssignmentsImpl(part1, 1),
+                new TokenizedAssignmentsImpl(part2, 2)
+        );
+    }
+
+    private static List<TokenizedAssignments> singleWithToken(String name, int 
token) {
+        return List.of(new 
TokenizedAssignmentsImpl(Set.of(Assignment.forPeer(name)), token));
+    }
+
+    private static <T> ArrayList<T> shuffle(List<T> nodeSetWithTokens) {
+        ArrayList<T> shuffled = new ArrayList<>(nodeSetWithTokens);
+        Collections.reverse(shuffled);
+        return shuffled;
+    }
+
+    private static Matcher<Iterable<String>> containsSingleFrom(List<String> 
items) {
+        return allOf(iterableWithSize(1), hasItems(in(items)));
+    }
+
+    private static void assertColocated(
+            ExecutionTargetFactory factory,
+            ExecutionTarget target,
+            ExecutionTarget other,
+            Matcher<Iterable<String>> matcher
+    ) throws ColocationMappingException {
+        assertThat(factory.resolveNodes(target.colocateWith(other)), matcher);
+    }
+
+    private static void assertNotColocated(ExecutionTarget target, 
ExecutionTarget other) {
+        assertNotColocated(target, other, "Targets are not colocated");
+    }
+
+    @SuppressWarnings("ThrowableNotThrown")
+    private static void assertNotColocated(ExecutionTarget target, 
ExecutionTarget other, String errorMessageFragment) {
+        assertThrows(ColocationMappingException.class, () -> 
target.colocateWith(other), errorMessageFragment);
+    }
+}

Reply via email to