This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 24c541dfe9 IGNITE-22861 Sql. Make mapping parameters respect mapOnBackups flag (#4195)
24c541dfe9 is described below
commit 24c541dfe9242a053ac6f681d486690764867f07
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Mon Aug 19 18:00:32 2024 +0300
IGNITE-22861 Sql. Make mapping parameters respect mapOnBackups flag (#4195)
---
.../sql/engine/exec/mapping/MappingParameters.java | 5 ++-
.../exec/mapping/largecluster/AbstractTarget.java | 2 +-
.../mapping/largecluster/LargeClusterFactory.java | 36 ++++++++++++++++------
.../mapping/smallcluster/SmallClusterFactory.java | 34 +++++++++++++++-----
.../mapping/ExecutionTargetFactorySelfTest.java | 35 +++++++++++++--------
.../exec/mapping/MappingServiceImplTest.java | 24 +++++++++++++++
6 files changed, 105 insertions(+), 31 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
index 1f586c4541..1e94ff3b89 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingParameters.java
@@ -25,6 +25,9 @@ public class MappingParameters {
/** Empty mapping parameters. */
public static final MappingParameters EMPTY = new
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, false);
+ /** Empty mapping parameters that allow mapping on backups. */
+ public static final MappingParameters MAP_ON_BACKUPS = new
MappingParameters(ArrayUtils.OBJECT_EMPTY_ARRAY, true);
+
private final boolean mapOnBackups;
private final Object[] dynamicParameters;
@@ -38,7 +41,7 @@ public class MappingParameters {
*/
public static MappingParameters create(Object[] dynamicParameters, boolean
mapOnBackups) {
if (dynamicParameters.length == 0) {
- return EMPTY;
+ return mapOnBackups ? MAP_ON_BACKUPS : EMPTY;
} else {
return new MappingParameters(dynamicParameters, mapOnBackups);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
index 6d8c14c32e..dbbb862e37 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/AbstractTarget.java
@@ -39,7 +39,7 @@ abstract class AbstractTarget implements ExecutionTarget {
final BitSet nodes;
AbstractTarget(BitSet nodes) {
- assert !nodes.isEmpty();
+ assert !nodes.isEmpty() : "Empty target is not allowed";
this.nodes = nodes;
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
index 0cf12b78ba..ab81bece8e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
@@ -51,7 +51,16 @@ public class LargeClusterFactory implements
ExecutionTargetFactory {
@Override
public ExecutionTarget allOf(List<String> nodes) {
- return new AllOfTarget(nodeListToMap(nodes));
+ BitSet nodesSet = new BitSet(nodeNameToId.size());
+
+ for (String name : nodes) {
+ int id = nodeNameToId.getOrDefault(name, -1);
+ assert id >= 0 : "invalid node";
+
+ nodesSet.set(id);
+ }
+
+ return new AllOfTarget(nodesSet);
}
@Override
@@ -72,15 +81,20 @@ public class LargeClusterFactory implements
ExecutionTargetFactory {
int idx = 0;
boolean finalised = true;
for (TokenizedAssignments assignment : assignments) {
- finalised = finalised && assignment.nodes().size() < 2;
-
- BitSet nodes = new BitSet(assignment.nodes().size());
+ BitSet nodes = new BitSet(nodeNameToId.size());
for (Assignment a : assignment.nodes()) {
int node = nodeNameToId.getOrDefault(a.consistentId(), -1);
- assert node >= 0 : "invalid node";
- nodes.set(node);
+
+ // TODO Ignore unknown node until IGNITE-22969
+ if (node != -1) {
+ nodes.set(node);
+ }
}
+ assert !nodes.isEmpty() : "No partition node found";
+
+ finalised = finalised && nodes.cardinality() < 2;
+
partitionNodes[idx] = nodes;
enlistmentConsistencyTokens[idx] = assignment.token();
idx++;
@@ -108,15 +122,17 @@ public class LargeClusterFactory implements
ExecutionTargetFactory {
}
private BitSet nodeListToMap(List<String> nodes) {
- BitSet nodesMap = new BitSet(nodes.size());
+ BitSet nodesSet = new BitSet(nodeNameToId.size());
for (String name : nodes) {
int id = nodeNameToId.getOrDefault(name, -1);
- assert id >= 0 : "invalid node";
- nodesMap.set(id);
+ // TODO Ignore unknown node until IGNITE-22969
+ if (id != -1) {
+ nodesSet.set(id);
+ }
}
- return nodesMap;
+ return nodesSet;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
index 732940c6d2..e569438e40 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster;
+import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
+
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
@@ -54,7 +56,15 @@ public class SmallClusterFactory implements
ExecutionTargetFactory {
@Override
public ExecutionTarget allOf(List<String> nodes) {
- return new AllOfTarget(nodeListToMap(nodes));
+ long nodesMap = 0;
+
+ for (String name : nodes) {
+ long node = nodeNameToId.getOrDefault(name, -1);
+ assert node >= 0 : "invalid node";
+ nodesMap |= node;
+ }
+
+ return new AllOfTarget(nodesMap);
}
@Override
@@ -75,14 +85,21 @@ public class SmallClusterFactory implements
ExecutionTargetFactory {
int idx = 0;
boolean finalised = true;
for (TokenizedAssignments assignment : assignments) {
- finalised = finalised && assignment.nodes().size() < 2;
-
+ long currentPartitionNodes = 0L;
for (Assignment a : assignment.nodes()) {
long node = nodeNameToId.getOrDefault(a.consistentId(), -1);
- assert node >= 0 : "invalid node";
- partitionNodes[idx] |= node;
+
+ // TODO Ignore unknown node until IGNITE-22969
+ if (node != -1) {
+ currentPartitionNodes |= node;
+ }
}
+ assert currentPartitionNodes != 0L : "No partition node found";
+
+ finalised = finalised && isPow2(currentPartitionNodes);
+
+ partitionNodes[idx] = currentPartitionNodes;
enlistmentConsistencyTokens[idx] = assignment.token();
idx++;
@@ -114,8 +131,11 @@ public class SmallClusterFactory implements
ExecutionTargetFactory {
for (String name : nodes) {
long node = nodeNameToId.getOrDefault(name, -1);
- assert node >= 0 : "invalid node";
- nodesMap |= node;
+
+ // TODO Ignore unknown node until IGNITE-22969
+ if (node != -1) {
+ nodesMap |= node;
+ }
}
return nodesMap;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
index be662b48d0..2846d0f420 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
@@ -36,6 +36,7 @@ import
org.apache.ignite.internal.affinity.TokenizedAssignments;
import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
import
org.apache.ignite.internal.sql.engine.exec.mapping.largecluster.LargeClusterFactory;
import
org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster.SmallClusterFactory;
+import org.apache.ignite.internal.util.CollectionUtils;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -53,7 +54,6 @@ public class ExecutionTargetFactorySelfTest {
private static final List<String> NODE_SET2 = List.of("node2", "node3",
"node5");
private static final List<String> NODE_SUBSET = List.of("node2", "node5");
private static final List<String> SINGLE_NODE_SET = List.of("node4");
- private static final List<String> INVALID_NODE_SET = List.of("node0");
private static List<ExecutionTargetFactory> clusterFactory() {
return List.of(
@@ -73,7 +73,6 @@ public class ExecutionTargetFactorySelfTest {
@ParameterizedTest
@MethodSource("clusterFactory")
void targetsResolution(ExecutionTargetFactory f) {
- assertThat(f.resolveNodes(f.allOf(NODE_SET)), equalTo(NODE_SET));
assertThat(f.resolveNodes(f.allOf(NODE_SET)), equalTo(NODE_SET));
assertThat(f.resolveNodes(f.someOf(NODE_SET)), hasItems(in(NODE_SET)));
assertThat(f.resolveNodes(f.oneOf(NODE_SET)),
containsSingleFrom(NODE_SET));
@@ -82,16 +81,28 @@ public class ExecutionTargetFactorySelfTest {
@ParameterizedTest
@MethodSource("clusterFactory")
- void targetValidation(ExecutionTargetFactory f) {
- assertThrows(AssertionError.class, () -> f.allOf(List.of()), null);
- assertThrows(AssertionError.class, () -> f.someOf(List.of()), null);
- assertThrows(AssertionError.class, () -> f.oneOf(List.of()), null);
- assertThrows(AssertionError.class, () -> f.partitioned(List.of()),
null);
-
- assertThrows(Throwable.class, () -> f.allOf(INVALID_NODE_SET),
"invalid node");
- assertThrows(Throwable.class, () -> f.someOf(INVALID_NODE_SET),
"invalid node");
- assertThrows(Throwable.class, () -> f.oneOf(INVALID_NODE_SET),
"invalid node");
- assertThrows(Throwable.class, () ->
f.partitioned(assignmentFromPrimaries(INVALID_NODE_SET)), "invalid node");
+ void emptyTargets(ExecutionTargetFactory f) {
+ assertThrows(AssertionError.class, () -> f.allOf(List.of()), "Empty
target is not allowed");
+ assertThrows(AssertionError.class, () -> f.someOf(List.of()), "Empty
target is not allowed");
+ assertThrows(AssertionError.class, () -> f.oneOf(List.of()), "Empty
target is not allowed");
+ assertThrows(AssertionError.class, () -> f.partitioned(List.of()),
"Empty target is not allowed");
+ }
+
+ @ParameterizedTest
+ @MethodSource("clusterFactory")
+ void invalidTargets(ExecutionTargetFactory f) {
+ List<String> invalidNodeSet = List.of("node100");
+ List<String> partiallyInvalidNodeSet =
CollectionUtils.concat(SINGLE_NODE_SET, invalidNodeSet);
+
+ assertThrows(AssertionError.class, () -> f.allOf(invalidNodeSet),
"invalid node");
+ assertThrows(AssertionError.class, () -> f.someOf(invalidNodeSet),
"Empty target is not allowed");
+ assertThrows(AssertionError.class, () -> f.oneOf(invalidNodeSet),
"Empty target is not allowed");
+ assertThrows(AssertionError.class, () ->
f.partitioned(assignmentFromPrimaries(invalidNodeSet)), "No partition node
found");
+
+ assertThrows(Throwable.class, () -> f.allOf(partiallyInvalidNodeSet),
"invalid node");
+ assertThat(f.resolveNodes(f.someOf(partiallyInvalidNodeSet)),
equalTo(SINGLE_NODE_SET));
+ assertThat(f.resolveNodes(f.oneOf(partiallyInvalidNodeSet)),
equalTo(SINGLE_NODE_SET));
+
assertThat(f.resolveNodes(f.partitioned(assignment(partiallyInvalidNodeSet,
partiallyInvalidNodeSet))), equalTo(SINGLE_NODE_SET));
}
@ParameterizedTest
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index 727eb7b023..f12044316f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -116,6 +116,30 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
}
}
+ @Test
+ public void cacheOnStableTopology() {
+ String localNodeName = "NODE";
+ List<String> nodeNames = List.of(localNodeName, "NODE1");
+
+ // Initialize mapping service.
+ ExecutionTargetProvider targetProvider =
Mockito.spy(createTargetProvider(nodeNames));
+ MappingServiceImpl mappingService = new MappingServiceImpl(
+ localNodeName, CLOCK_SERVICE, targetProvider,
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run
+ );
+ mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
+ new LogicalTopologySnapshot(1,
logicalNodes(nodeNames.toArray(new String[0]))));
+
+
+ List<MappedFragment> defaultMapping = await(mappingService.map(PLAN,
PARAMS));
+ List<MappedFragment> mappingOnBackups = await(mappingService.map(PLAN,
MappingParameters.MAP_ON_BACKUPS));
+
+ verify(targetProvider, times(2)).forTable(any(), any(), any(),
anyBoolean());
+
+ assertSame(defaultMapping, await(mappingService.map(PLAN, PARAMS)));
+ assertSame(mappingOnBackups, await(mappingService.map(PLAN,
MappingParameters.MAP_ON_BACKUPS)));
+ assertNotSame(defaultMapping, mappingOnBackups);
+ }
+
@Test
public void serviceInitializationTest() {
String localNodeName = "NODE0";