This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 24c541dfe9 IGNITE-22861 Sql. Make mapping parameters respect mapOnBackups flag (#4195) 24c541dfe9 is described below commit 24c541dfe9242a053ac6f681d486690764867f07 Author: Andrew V. Mashenkov <amashen...@users.noreply.github.com> AuthorDate: Mon Aug 19 18:00:32 2024 +0300 IGNITE-22861 Sql. Make mapping parameters respect mapOnBackups flag (#4195) --- .../sql/engine/exec/mapping/MappingParameters.java | 5 ++- .../exec/mapping/largecluster/AbstractTarget.java | 2 +- .../mapping/largecluster/LargeClusterFactory.java | 36 ++++++++++++++++------ .../mapping/smallcluster/SmallClusterFactory.java | 34 +++++++++++++++----- .../mapping/ExecutionTargetFactorySelfTest.java | 35 +++++++++++++-------- .../exec/mapping/MappingServiceImplTest.java | 24 +++++++++++++++ 6 files changed, 105 insertions(+), 31 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java index 1f586c4541..1e94ff3b89 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java @@ -25,6 +25,9 @@ public class MappingParameters { /** Empty mapping parameters. */ public static final MappingParameters EMPTY = new MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, false); + /** Allow map on backups. 
*/ + public static final MappingParameters MAP_ON_BACKUPS = new MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, true); + private final boolean mapOnBackups; private final Object[] dynamicParameters; @@ -38,7 +41,7 @@ public class MappingParameters { */ public static MappingParameters create(Object[] dynamicParameters, boolean mapOnBackups) { if (dynamicParameters.length == 0) { - return EMPTY; + return mapOnBackups ? MAP_ON_BACKUPS : EMPTY; } else { return new MappingParameters(dynamicParameters, mapOnBackups); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java index 6d8c14c32e..dbbb862e37 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java @@ -39,7 +39,7 @@ abstract class AbstractTarget implements ExecutionTarget { final BitSet nodes; AbstractTarget(BitSet nodes) { - assert !nodes.isEmpty(); + assert !nodes.isEmpty() : "Empty target is not allowed"; this.nodes = nodes; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java index 0cf12b78ba..ab81bece8e 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java @@ -51,7 +51,16 @@ public class LargeClusterFactory implements ExecutionTargetFactory { @Override public ExecutionTarget allOf(List<String> nodes) { - return new 
AllOfTarget(nodeListToMap(nodes)); + BitSet nodesSet = new BitSet(nodeNameToId.size()); + + for (String name : nodes) { + int id = nodeNameToId.getOrDefault(name, -1); + assert id >= 0 : "invalid node"; + + nodesSet.set(id); + } + + return new AllOfTarget(nodesSet); } @Override @@ -72,15 +81,20 @@ public class LargeClusterFactory implements ExecutionTargetFactory { int idx = 0; boolean finalised = true; for (TokenizedAssignments assignment : assignments) { - finalised = finalised && assignment.nodes().size() < 2; - - BitSet nodes = new BitSet(assignment.nodes().size()); + BitSet nodes = new BitSet(nodeNameToId.size()); for (Assignment a : assignment.nodes()) { int node = nodeNameToId.getOrDefault(a.consistentId(), -1); - assert node >= 0 : "invalid node"; - nodes.set(node); + + // TODO Ignore unknown node until IGNITE-22969 + if (node != -1) { + nodes.set(node); + } } + assert !nodes.isEmpty() : "No partition node found"; + + finalised = finalised && nodes.cardinality() < 2; + partitionNodes[idx] = nodes; enlistmentConsistencyTokens[idx] = assignment.token(); idx++; @@ -108,15 +122,17 @@ public class LargeClusterFactory implements ExecutionTargetFactory { } private BitSet nodeListToMap(List<String> nodes) { - BitSet nodesMap = new BitSet(nodes.size()); + BitSet nodesSet = new BitSet(nodeNameToId.size()); for (String name : nodes) { int id = nodeNameToId.getOrDefault(name, -1); - assert id >= 0 : "invalid node"; - nodesMap.set(id); + // TODO Ignore unknown node until IGNITE-22969 + if (id != -1) { + nodesSet.set(id); + } } - return nodesMap; + return nodesSet; } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java index 732940c6d2..e569438e40 100644 --- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster; +import static org.apache.ignite.internal.util.IgniteUtils.isPow2; + import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.objects.Object2LongMap; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; @@ -54,7 +56,15 @@ public class SmallClusterFactory implements ExecutionTargetFactory { @Override public ExecutionTarget allOf(List<String> nodes) { - return new AllOfTarget(nodeListToMap(nodes)); + long nodesMap = 0; + + for (String name : nodes) { + long node = nodeNameToId.getOrDefault(name, -1); + assert node >= 0 : "invalid node"; + nodesMap |= node; + } + + return new AllOfTarget(nodesMap); } @Override @@ -75,14 +85,21 @@ public class SmallClusterFactory implements ExecutionTargetFactory { int idx = 0; boolean finalised = true; for (TokenizedAssignments assignment : assignments) { - finalised = finalised && assignment.nodes().size() < 2; - + long currentPartitionNodes = 0L; for (Assignment a : assignment.nodes()) { long node = nodeNameToId.getOrDefault(a.consistentId(), -1); - assert node >= 0 : "invalid node"; - partitionNodes[idx] |= node; + + // TODO Ignore unknown node until IGNITE-22969 + if (node != -1) { + currentPartitionNodes |= node; + } } + assert currentPartitionNodes != 0L : "No partition node found"; + + finalised = finalised && isPow2(currentPartitionNodes); + + partitionNodes[idx] = currentPartitionNodes; enlistmentConsistencyTokens[idx] = assignment.token(); idx++; @@ -114,8 +131,11 @@ public class SmallClusterFactory implements ExecutionTargetFactory { for (String name : nodes) { long node = nodeNameToId.getOrDefault(name, -1); - assert node >= 0 : "invalid node"; - nodesMap |= 
node; + + // TODO Ignore unknown node until IGNITE-22969 + if (node != -1) { + nodesMap |= node; + } } return nodesMap; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java index be662b48d0..2846d0f420 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.affinity.TokenizedAssignments; import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl; import org.apache.ignite.internal.sql.engine.exec.mapping.largecluster.LargeClusterFactory; import org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster.SmallClusterFactory; +import org.apache.ignite.internal.util.CollectionUtils; import org.hamcrest.Matcher; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -53,7 +54,6 @@ public class ExecutionTargetFactorySelfTest { private static final List<String> NODE_SET2 = List.of("node2", "node3", "node5"); private static final List<String> NODE_SUBSET = List.of("node2", "node5"); private static final List<String> SINGLE_NODE_SET = List.of("node4"); - private static final List<String> INVALID_NODE_SET = List.of("node0"); private static List<ExecutionTargetFactory> clusterFactory() { return List.of( @@ -73,7 +73,6 @@ public class ExecutionTargetFactorySelfTest { @ParameterizedTest @MethodSource("clusterFactory") void targetsResolution(ExecutionTargetFactory f) { - assertThat(f.resolveNodes(f.allOf(NODE_SET)), equalTo(NODE_SET)); assertThat(f.resolveNodes(f.allOf(NODE_SET)), equalTo(NODE_SET)); assertThat(f.resolveNodes(f.someOf(NODE_SET)), hasItems(in(NODE_SET))); 
assertThat(f.resolveNodes(f.oneOf(NODE_SET)), containsSingleFrom(NODE_SET)); @@ -82,16 +81,28 @@ public class ExecutionTargetFactorySelfTest { @ParameterizedTest @MethodSource("clusterFactory") - void targetValidation(ExecutionTargetFactory f) { - assertThrows(AssertionError.class, () -> f.allOf(List.of()), null); - assertThrows(AssertionError.class, () -> f.someOf(List.of()), null); - assertThrows(AssertionError.class, () -> f.oneOf(List.of()), null); - assertThrows(AssertionError.class, () -> f.partitioned(List.of()), null); - - assertThrows(Throwable.class, () -> f.allOf(INVALID_NODE_SET), "invalid node"); - assertThrows(Throwable.class, () -> f.someOf(INVALID_NODE_SET), "invalid node"); - assertThrows(Throwable.class, () -> f.oneOf(INVALID_NODE_SET), "invalid node"); - assertThrows(Throwable.class, () -> f.partitioned(assignmentFromPrimaries(INVALID_NODE_SET)), "invalid node"); + void emptyTargets(ExecutionTargetFactory f) { + assertThrows(AssertionError.class, () -> f.allOf(List.of()), "Empty target is not allowed"); + assertThrows(AssertionError.class, () -> f.someOf(List.of()), "Empty target is not allowed"); + assertThrows(AssertionError.class, () -> f.oneOf(List.of()), "Empty target is not allowed"); + assertThrows(AssertionError.class, () -> f.partitioned(List.of()), "Empty target is not allowed"); + } + + @ParameterizedTest + @MethodSource("clusterFactory") + void invalidTargets(ExecutionTargetFactory f) { + List<String> invalidNodeSet = List.of("node100"); + List<String> partiallyInvalidNodeSet = CollectionUtils.concat(SINGLE_NODE_SET, invalidNodeSet); + + assertThrows(AssertionError.class, () -> f.allOf(invalidNodeSet), "invalid node"); + assertThrows(AssertionError.class, () -> f.someOf(invalidNodeSet), "Empty target is not allowed"); + assertThrows(AssertionError.class, () -> f.oneOf(invalidNodeSet), "Empty target is not allowed"); + assertThrows(AssertionError.class, () -> f.partitioned(assignmentFromPrimaries(invalidNodeSet)), "No partition node 
found"); + + assertThrows(Throwable.class, () -> f.allOf(partiallyInvalidNodeSet), "invalid node"); + assertThat(f.resolveNodes(f.someOf(partiallyInvalidNodeSet)), equalTo(SINGLE_NODE_SET)); + assertThat(f.resolveNodes(f.oneOf(partiallyInvalidNodeSet)), equalTo(SINGLE_NODE_SET)); + assertThat(f.resolveNodes(f.partitioned(assignment(partiallyInvalidNodeSet, partiallyInvalidNodeSet))), equalTo(SINGLE_NODE_SET)); } @ParameterizedTest diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java index 727eb7b023..f12044316f 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java @@ -116,6 +116,30 @@ public class MappingServiceImplTest extends BaseIgniteAbstractTest { } } + @Test + public void cacheOnStableTopology() { + String localNodeName = "NODE"; + List<String> nodeNames = List.of(localNodeName, "NODE1"); + + // Initialize mapping service. 
+ ExecutionTargetProvider targetProvider = Mockito.spy(createTargetProvider(nodeNames)); + MappingServiceImpl mappingService = new MappingServiceImpl( + localNodeName, CLOCK_SERVICE, targetProvider, CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run + ); + mappingService.onNodeJoined(Mockito.mock(LogicalNode.class), + new LogicalTopologySnapshot(1, logicalNodes(nodeNames.toArray(new String[0])))); + + + List<MappedFragment> defaultMapping = await(mappingService.map(PLAN, PARAMS)); + List<MappedFragment> mappingOnBackups = await(mappingService.map(PLAN, MappingParameters.MAP_ON_BACKUPS)); + + verify(targetProvider, times(2)).forTable(any(), any(), any(), anyBoolean()); + + assertSame(defaultMapping, await(mappingService.map(PLAN, PARAMS))); + assertSame(mappingOnBackups, await(mappingService.map(PLAN, MappingParameters.MAP_ON_BACKUPS))); + assertNotSame(defaultMapping, mappingOnBackups); + } + @Test public void serviceInitializationTest() { String localNodeName = "NODE0";