This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 24c541dfe9 IGNITE-22861 Sql. Make mapping parameters respect mapOnBackups flag (#4195)
24c541dfe9 is described below

commit 24c541dfe9242a053ac6f681d486690764867f07
Author: Andrew V. Mashenkov <amashen...@users.noreply.github.com>
AuthorDate: Mon Aug 19 18:00:32 2024 +0300

    IGNITE-22861 Sql. Make mapping parameters respect mapOnBackups flag (#4195)
---
 .../sql/engine/exec/mapping/MappingParameters.java |  5 ++-
 .../exec/mapping/largecluster/AbstractTarget.java  |  2 +-
 .../mapping/largecluster/LargeClusterFactory.java  | 36 ++++++++++++++++------
 .../mapping/smallcluster/SmallClusterFactory.java  | 34 +++++++++++++++-----
 .../mapping/ExecutionTargetFactorySelfTest.java    | 35 +++++++++++++--------
 .../exec/mapping/MappingServiceImplTest.java       | 24 +++++++++++++++
 6 files changed, 105 insertions(+), 31 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
index 1f586c4541..1e94ff3b89 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
@@ -25,6 +25,9 @@ public class MappingParameters {
     /** Empty mapping parameters. */
     public static final MappingParameters EMPTY = new MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, false);
 
+    /** Allow map on backups. */
+    public static final MappingParameters MAP_ON_BACKUPS = new MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, true);
+
     private final boolean mapOnBackups;
     private final Object[] dynamicParameters;
 
@@ -38,7 +41,7 @@ public class MappingParameters {
      */
     public static MappingParameters create(Object[] dynamicParameters, boolean mapOnBackups) {
         if (dynamicParameters.length == 0) {
-            return EMPTY;
+            return mapOnBackups ? MAP_ON_BACKUPS : EMPTY;
         } else {
             return new MappingParameters(dynamicParameters, mapOnBackups);
         }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
index 6d8c14c32e..dbbb862e37 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
@@ -39,7 +39,7 @@ abstract class AbstractTarget implements ExecutionTarget {
     final BitSet nodes;
 
     AbstractTarget(BitSet nodes) {
-        assert !nodes.isEmpty();
+        assert !nodes.isEmpty() : "Empty target is not allowed";
 
         this.nodes = nodes;
     }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
index 0cf12b78ba..ab81bece8e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
@@ -51,7 +51,16 @@ public class LargeClusterFactory implements ExecutionTargetFactory {
 
     @Override
     public ExecutionTarget allOf(List<String> nodes) {
-        return new AllOfTarget(nodeListToMap(nodes));
+        BitSet nodesSet = new BitSet(nodeNameToId.size());
+
+        for (String name : nodes) {
+            int id = nodeNameToId.getOrDefault(name, -1);
+            assert id >= 0 : "invalid node";
+
+            nodesSet.set(id);
+        }
+
+        return new AllOfTarget(nodesSet);
     }
 
     @Override
@@ -72,15 +81,20 @@ public class LargeClusterFactory implements ExecutionTargetFactory {
         int idx = 0;
         boolean finalised = true;
         for (TokenizedAssignments assignment : assignments) {
-            finalised = finalised && assignment.nodes().size() < 2;
-
-            BitSet nodes = new BitSet(assignment.nodes().size());
+            BitSet nodes = new BitSet(nodeNameToId.size());
             for (Assignment a : assignment.nodes()) {
                 int node = nodeNameToId.getOrDefault(a.consistentId(), -1);
-                assert node >= 0 : "invalid node";
-                nodes.set(node);
+
+                // TODO Ignore unknown node until IGNITE-22969
+                if (node != -1) {
+                    nodes.set(node);
+                }
             }
 
+            assert !nodes.isEmpty() : "No partition node found";
+
+            finalised = finalised && nodes.cardinality() < 2;
+
             partitionNodes[idx] = nodes;
             enlistmentConsistencyTokens[idx] = assignment.token();
             idx++;
@@ -108,15 +122,17 @@ public class LargeClusterFactory implements ExecutionTargetFactory {
     }
 
     private BitSet nodeListToMap(List<String> nodes) {
-        BitSet nodesMap = new BitSet(nodes.size());
+        BitSet nodesSet = new BitSet(nodeNameToId.size());
 
         for (String name : nodes) {
             int id = nodeNameToId.getOrDefault(name, -1);
-            assert id >= 0 : "invalid node";
 
-            nodesMap.set(id);
+            // TODO Ignore unknown node until IGNITE-22969
+            if (id != -1) {
+                nodesSet.set(id);
+            }
         }
 
-        return nodesMap;
+        return nodesSet;
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
index 732940c6d2..e569438e40 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster;
 
+import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
+
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
@@ -54,7 +56,15 @@ public class SmallClusterFactory implements ExecutionTargetFactory {
 
     @Override
     public ExecutionTarget allOf(List<String> nodes) {
-        return new AllOfTarget(nodeListToMap(nodes));
+        long nodesMap = 0;
+
+        for (String name : nodes) {
+            long node = nodeNameToId.getOrDefault(name, -1);
+            assert node >= 0 : "invalid node";
+            nodesMap |= node;
+        }
+
+        return new AllOfTarget(nodesMap);
     }
 
     @Override
@@ -75,14 +85,21 @@ public class SmallClusterFactory implements ExecutionTargetFactory {
         int idx = 0;
         boolean finalised = true;
         for (TokenizedAssignments assignment : assignments) {
-            finalised = finalised && assignment.nodes().size() < 2;
-
+            long currentPartitionNodes = 0L;
             for (Assignment a : assignment.nodes()) {
                 long node = nodeNameToId.getOrDefault(a.consistentId(), -1);
-                assert node >= 0 : "invalid node";
-                partitionNodes[idx] |= node;
+
+                // TODO Ignore unknown node until IGNITE-22969
+                if (node != -1) {
+                    currentPartitionNodes |= node;
+                }
             }
 
+            assert currentPartitionNodes != 0L : "No partition node found";
+
+            finalised = finalised && isPow2(currentPartitionNodes);
+
+            partitionNodes[idx] = currentPartitionNodes;
             enlistmentConsistencyTokens[idx] = assignment.token();
 
             idx++;
@@ -114,8 +131,11 @@ public class SmallClusterFactory implements ExecutionTargetFactory {
 
         for (String name : nodes) {
             long node = nodeNameToId.getOrDefault(name, -1);
-            assert node >= 0 : "invalid node";
-            nodesMap |= node;
+
+            // TODO Ignore unknown node until IGNITE-22969
+            if (node != -1) {
+                nodesMap |= node;
+            }
         }
 
         return nodesMap;
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
index be662b48d0..2846d0f420 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
 import org.apache.ignite.internal.sql.engine.exec.mapping.largecluster.LargeClusterFactory;
 import org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster.SmallClusterFactory;
+import org.apache.ignite.internal.util.CollectionUtils;
 import org.hamcrest.Matcher;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -53,7 +54,6 @@ public class ExecutionTargetFactorySelfTest {
     private static final List<String> NODE_SET2 = List.of("node2", "node3", "node5");
     private static final List<String> NODE_SUBSET = List.of("node2", "node5");
     private static final List<String> SINGLE_NODE_SET = List.of("node4");
-    private static final List<String> INVALID_NODE_SET = List.of("node0");
 
     private static List<ExecutionTargetFactory> clusterFactory() {
         return List.of(
@@ -73,7 +73,6 @@ public class ExecutionTargetFactorySelfTest {
     @ParameterizedTest
     @MethodSource("clusterFactory")
     void targetsResolution(ExecutionTargetFactory f) {
-        assertThat(f.resolveNodes(f.allOf(NODE_SET)), equalTo(NODE_SET));
         assertThat(f.resolveNodes(f.allOf(NODE_SET)), equalTo(NODE_SET));
         assertThat(f.resolveNodes(f.someOf(NODE_SET)), hasItems(in(NODE_SET)));
         assertThat(f.resolveNodes(f.oneOf(NODE_SET)), containsSingleFrom(NODE_SET));
@@ -82,16 +81,28 @@ public class ExecutionTargetFactorySelfTest {
 
     @ParameterizedTest
     @MethodSource("clusterFactory")
-    void targetValidation(ExecutionTargetFactory f) {
-        assertThrows(AssertionError.class, () -> f.allOf(List.of()), null);
-        assertThrows(AssertionError.class, () -> f.someOf(List.of()), null);
-        assertThrows(AssertionError.class, () -> f.oneOf(List.of()), null);
-        assertThrows(AssertionError.class, () -> f.partitioned(List.of()), null);
-
-        assertThrows(Throwable.class, () -> f.allOf(INVALID_NODE_SET), "invalid node");
-        assertThrows(Throwable.class, () -> f.someOf(INVALID_NODE_SET), "invalid node");
-        assertThrows(Throwable.class, () -> f.oneOf(INVALID_NODE_SET), "invalid node");
-        assertThrows(Throwable.class, () -> f.partitioned(assignmentFromPrimaries(INVALID_NODE_SET)), "invalid node");
+    void emptyTargets(ExecutionTargetFactory f) {
+        assertThrows(AssertionError.class, () -> f.allOf(List.of()), "Empty target is not allowed");
+        assertThrows(AssertionError.class, () -> f.someOf(List.of()), "Empty target is not allowed");
+        assertThrows(AssertionError.class, () -> f.oneOf(List.of()), "Empty target is not allowed");
+        assertThrows(AssertionError.class, () -> f.partitioned(List.of()), "Empty target is not allowed");
+    }
+
+    @ParameterizedTest
+    @MethodSource("clusterFactory")
+    void invalidTargets(ExecutionTargetFactory f) {
+        List<String> invalidNodeSet = List.of("node100");
+        List<String> partiallyInvalidNodeSet = CollectionUtils.concat(SINGLE_NODE_SET, invalidNodeSet);
+
+        assertThrows(AssertionError.class, () -> f.allOf(invalidNodeSet), "invalid node");
+        assertThrows(AssertionError.class, () -> f.someOf(invalidNodeSet), "Empty target is not allowed");
+        assertThrows(AssertionError.class, () -> f.oneOf(invalidNodeSet), "Empty target is not allowed");
+        assertThrows(AssertionError.class, () -> f.partitioned(assignmentFromPrimaries(invalidNodeSet)), "No partition node found");
+
+        assertThrows(Throwable.class, () -> f.allOf(partiallyInvalidNodeSet), "invalid node");
+        assertThat(f.resolveNodes(f.someOf(partiallyInvalidNodeSet)), equalTo(SINGLE_NODE_SET));
+        assertThat(f.resolveNodes(f.oneOf(partiallyInvalidNodeSet)), equalTo(SINGLE_NODE_SET));
+        assertThat(f.resolveNodes(f.partitioned(assignment(partiallyInvalidNodeSet, partiallyInvalidNodeSet))), equalTo(SINGLE_NODE_SET));
     }
 
     @ParameterizedTest
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index 727eb7b023..f12044316f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -116,6 +116,30 @@ public class MappingServiceImplTest extends BaseIgniteAbstractTest {
         }
     }
 
+    @Test
+    public void cacheOnStableTopology() {
+        String localNodeName = "NODE";
+        List<String> nodeNames = List.of(localNodeName, "NODE1");
+
+        // Initialize mapping service.
+        ExecutionTargetProvider targetProvider = Mockito.spy(createTargetProvider(nodeNames));
+        MappingServiceImpl mappingService = new MappingServiceImpl(
+                localNodeName, CLOCK_SERVICE, targetProvider, CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run
+        );
+        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
+                new LogicalTopologySnapshot(1, logicalNodes(nodeNames.toArray(new String[0]))));
+
+
+        List<MappedFragment> defaultMapping = await(mappingService.map(PLAN, PARAMS));
+        List<MappedFragment> mappingOnBackups = await(mappingService.map(PLAN, MappingParameters.MAP_ON_BACKUPS));
+
+        verify(targetProvider, times(2)).forTable(any(), any(), any(), anyBoolean());
+
+        assertSame(defaultMapping, await(mappingService.map(PLAN, PARAMS)));
+        assertSame(mappingOnBackups, await(mappingService.map(PLAN, MappingParameters.MAP_ON_BACKUPS)));
+        assertNotSame(defaultMapping, mappingOnBackups);
+    }
+
     @Test
     public void serviceInitializationTest() {
         String localNodeName = "NODE0";

Reply via email to