This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a404b155c8 IGNITE-22969 Sql. Gather execution nodes information from
placement driver (#4629)
a404b155c8 is described below
commit a404b155c8d21d487dc9a235236db515e27d6d50
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Thu Nov 7 10:08:22 2024 +0300
IGNITE-22969 Sql. Gather execution nodes information from placement driver
(#4629)
---
.../internal/sql/engine/SqlQueryProcessor.java | 10 +-
...der.java => ExecutionDistributionProvider.java} | 20 +-
.../ExecutionDistributionProviderImpl.java} | 59 +---
.../sql/engine/exec/mapping/MappingContext.java | 11 +-
.../engine/exec/mapping/MappingServiceImpl.java | 388 +++++++++++----------
.../mapping/largecluster/LargeClusterFactory.java | 14 +-
.../mapping/smallcluster/SmallClusterFactory.java | 14 +-
.../sql/engine/exec/ExecutionServiceImplTest.java | 46 +--
.../mapping/ExecutionTargetFactorySelfTest.java | 17 +-
.../engine/exec/mapping/FragmentMappingTest.java | 7 +-
.../exec/mapping/MappingServiceImplTest.java | 248 ++++++-------
.../sql/engine/exec/mapping/MappingTestRunner.java | 22 +-
.../exec/mapping/MappingTestRunnerSelfTest.java | 2 +-
.../sql/engine/framework/TestBuilders.java | 85 ++---
14 files changed, 435 insertions(+), 508 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index cfca789ce0..0c7f9047e8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -69,6 +69,7 @@ import
org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
import org.apache.ignite.internal.sql.engine.exec.fsm.ExecutionPhase;
import org.apache.ignite.internal.sql.engine.exec.fsm.QueryExecutor;
+import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionDistributionProviderImpl;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
@@ -297,22 +298,21 @@ public class SqlQueryProcessor implements QueryProcessor,
SystemViewProvider {
view -> () -> systemViewManager.scanView(view.name())
);
- var executionTargetProvider = new
ExecutionTargetProviderImpl(placementDriver, systemViewManager);
-
var partitionPruner = new PartitionPrunerImpl();
var mappingService = new MappingServiceImpl(
nodeName,
clockService,
- executionTargetProvider,
CACHE_FACTORY,
clusterCfg.planner().estimatedNumberOfQueries().value(),
partitionPruner,
- taskExecutor
+ () -> logicalTopologyService.localLogicalTopology().version(),
+ new ExecutionDistributionProviderImpl(placementDriver,
systemViewManager)
);
- logicalTopologyService.addEventListener(mappingService);
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
mappingService::onPrimaryReplicaExpired);
+ // Need to be implemented after
https://issues.apache.org/jira/browse/IGNITE-23519 Add an event for lease
Assignments
+ // placementDriver.listen(PrimaryReplicaEvent.ASSIGNMENTS_CHANGED,
mappingService::onPrimaryReplicaAssignment);
var executionSrvc = registerService(ExecutionServiceImpl.create(
clusterSrvc.topologyService(),
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProvider.java
similarity index 73%
rename from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
rename to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProvider.java
index d131664cf1..2bccadca63 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProvider.java
@@ -17,38 +17,34 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
-/**
- * An integration point that helps the mapper to acquire an execution target
of particular
- * relation from fragment.
- */
-public interface ExecutionTargetProvider {
+/** Execution nodes information provider. */
+public interface ExecutionDistributionProvider {
/**
* Returns an execution target for a given table.
*
* @param operationTime Time of the operation to get consistent results
among different calls.
- * @param factory A factory to create target for given table.
* @param table A table to create execution target for.
* @param includeBackups Flags denotes whether to include non-primary
replicas into target.
* @return A future representing the result.
*/
- CompletableFuture<ExecutionTarget> forTable(
+ CompletableFuture<List<TokenizedAssignments>> forTable(
HybridTimestamp operationTime,
- ExecutionTargetFactory factory,
IgniteTable table,
boolean includeBackups
);
/**
- * Returns an execution target for a given view.
+ * Returns a distribution for a given view.
*
- * @param factory A factory to create target for given table.
* @param view A view to create execution target for.
- * @return A future representing the result.
+ * @return Distribution for a given view.
*/
- CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory
factory, IgniteSystemView view);
+ List<String> forSystemView(IgniteSystemView view);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
similarity index 79%
rename from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
rename to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
index 86d0650641..82c5b1cc89 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
@@ -15,14 +15,11 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine;
+package org.apache.ignite.internal.sql.engine.exec.mapping;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT;
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
@@ -44,59 +41,39 @@ import
org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
-import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
-import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
-import org.apache.ignite.lang.ErrorGroups.Sql;
-import org.apache.ignite.sql.SqlException;
-
-/**
- * Implementation of {@link ExecutionTargetProvider} which takes assignments
from {@link PlacementDriver} and {@link SystemViewManager}.
- */
-public class ExecutionTargetProviderImpl implements ExecutionTargetProvider {
- private static final IgniteLogger LOG =
Loggers.forClass(ExecutionTargetProviderImpl.class);
+/** Execution nodes information provider. */
+public class ExecutionDistributionProviderImpl implements
ExecutionDistributionProvider {
+ private static final IgniteLogger LOG =
Loggers.forClass(ExecutionDistributionProviderImpl.class);
private final PlacementDriver placementDriver;
private final SystemViewManager systemViewManager;
- ExecutionTargetProviderImpl(
- PlacementDriver placementDriver, SystemViewManager
systemViewManager
- ) {
+ /**
+ * Constructor.
+ *
+ * @param placementDriver Placement driver.
+ * @param systemViewManager Manager for system views.
+ */
+ public ExecutionDistributionProviderImpl(PlacementDriver placementDriver,
SystemViewManager systemViewManager) {
this.placementDriver = placementDriver;
this.systemViewManager = systemViewManager;
}
@Override
- public CompletableFuture<ExecutionTarget> forTable(
+ public List<String> forSystemView(IgniteSystemView view) {
+ return systemViewManager.owningNodes(view.name());
+ }
+
+ @Override
+ public CompletableFuture<List<TokenizedAssignments>> forTable(
HybridTimestamp operationTime,
- ExecutionTargetFactory factory,
IgniteTable table,
boolean includeBackups
) {
- return collectAssignments(table, operationTime, includeBackups)
- .thenApply(factory::partitioned);
- }
-
- @Override
- public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
- List<String> nodes = systemViewManager.owningNodes(view.name());
-
- if (nullOrEmpty(nodes)) {
- return failedFuture(
- new SqlException(Sql.MAPPING_ERR, format("The view with
name '{}' could not be found on"
- + " any active nodes in the cluster", view.name()))
- );
- }
-
- return completedFuture(
- view.distribution() == IgniteDistributions.single()
- ? factory.oneOf(nodes)
- : factory.allOf(nodes)
- );
+ return collectAssignments(table, operationTime, includeBackups);
}
// need to be refactored after TODO:
https://issues.apache.org/jira/browse/IGNITE-20925
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
index b230df95ab..cd7f745465 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
@@ -21,7 +21,6 @@ import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import
org.apache.ignite.internal.sql.engine.exec.mapping.largecluster.LargeClusterFactory;
import
org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster.SmallClusterFactory;
-import org.apache.ignite.internal.sql.engine.util.Commons;
/**
* A context that encloses information necessary during mapping.
@@ -29,23 +28,19 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
class MappingContext {
private final String localNode;
private final List<String> nodes;
+ private final RelOptCluster cluster;
private final ExecutionTargetFactory targetFactory;
- private RelOptCluster cluster;
-
- MappingContext(String localNode, List<String> nodes) {
+ MappingContext(String localNode, List<String> nodes, RelOptCluster
cluster) {
this.localNode = localNode;
this.nodes = nodes;
+ this.cluster = cluster;
this.targetFactory = nodes.size() > 64 ? new
LargeClusterFactory(nodes) : new SmallClusterFactory(nodes);
}
public RelOptCluster cluster() {
- if (cluster == null) {
- cluster = Commons.cluster();
- }
-
return cluster;
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index bcdf364850..34c5cd4f2c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -17,9 +17,13 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
-import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static
org.apache.ignite.internal.util.CollectionUtils.toIntMapCollector;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntObjectPair;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@@ -32,18 +36,16 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
-import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.hlc.ClockService;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.TablePartitionId;
-import
org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl.LogicalTopologyHolder.TopologySnapshot;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.PlanId;
@@ -51,64 +53,58 @@ import
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruner;
import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
import org.apache.ignite.internal.sql.engine.rel.IgniteSender;
import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.cache.Cache;
import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.apache.ignite.sql.SqlException;
/**
* An implementation of {@link MappingService}.
*
- * <p>This particular implementation keeps track of changes in logical cluster
topology.
- * Always uses latest topology snapshot to map query.
+ * <p>This particular implementation keeps track of changes according to
{@link PlacementDriver} assignments.
+ * Use distribution information to map a query.
*/
-public class MappingServiceImpl implements MappingService,
LogicalTopologyEventListener {
- private final LogicalTopologyHolder topologyHolder = new
LogicalTopologyHolder();
- private final CompletableFuture<Void> initialTopologyFuture = new
CompletableFuture<>();
-
+public class MappingServiceImpl implements MappingService {
private final String localNodeName;
private final ClockService clock;
- private final ExecutionTargetProvider targetProvider;
private final Cache<PlanId, FragmentsTemplate> templatesCache;
private final Cache<MappingsCacheKey, MappingsCacheValue> mappingsCache;
- private final Executor taskExecutor;
private final PartitionPruner partitionPruner;
+ private final Supplier<Long> logicalTopologyVerSupplier;
+ private final ExecutionDistributionProvider distributionProvider;
/**
* Constructor.
*
* @param localNodeName Name of the current Ignite node.
* @param clock Clock service to get actual time.
- * @param targetProvider Execution target provider.
* @param cacheFactory A factory to create cache of fragments.
* @param cacheSize Size of the cache of query plans. Should be non
negative.
* @param partitionPruner Partition pruner.
- * @param taskExecutor Mapper service task executor.
+ * @param logicalTopologyVerSupplier Logical topology version supplier.
+ * @param distributionProvider Execution distribution provider.
*/
public MappingServiceImpl(
String localNodeName,
ClockService clock,
- ExecutionTargetProvider targetProvider,
CacheFactory cacheFactory,
int cacheSize,
PartitionPruner partitionPruner,
- Executor taskExecutor
+ Supplier<Long> logicalTopologyVerSupplier,
+ ExecutionDistributionProvider distributionProvider
) {
this.localNodeName = localNodeName;
this.clock = clock;
- this.targetProvider = targetProvider;
this.templatesCache = cacheFactory.create(cacheSize);
this.mappingsCache = cacheFactory.create(cacheSize);
- this.taskExecutor = taskExecutor;
this.partitionPruner = partitionPruner;
- }
-
- @Override
- public CompletableFuture<List<MappedFragment>> map(MultiStepPlan
multiStepPlan, MappingParameters parameters) {
- if (initialTopologyFuture.isDone()) {
- return map0(multiStepPlan, parameters);
- }
-
- return initialTopologyFuture.thenComposeAsync(ignore ->
map0(multiStepPlan, parameters), taskExecutor);
+ this.logicalTopologyVerSupplier = logicalTopologyVerSupplier;
+ this.distributionProvider = distributionProvider;
}
/** Called when the primary replica has expired. */
@@ -124,13 +120,12 @@ public class MappingServiceImpl implements
MappingService, LogicalTopologyEventL
return CompletableFutures.falseCompletedFuture();
}
- private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan
multiStepPlan, MappingParameters parameters) {
- TopologySnapshot topology = topologyHolder.topology();
- MappingContext context = new MappingContext(localNodeName,
topology.nodes());
-
- FragmentsTemplate template = getOrCreateTemplate(multiStepPlan,
context);
+ @Override
+ public CompletableFuture<List<MappedFragment>> map(MultiStepPlan
multiStepPlan, MappingParameters parameters) {
+ FragmentsTemplate template = getOrCreateTemplate(multiStepPlan);
boolean mapOnBackups = parameters.mapOnBackups();
+
MappingsCacheValue cacheValue = mappingsCache.compute(
new MappingsCacheKey(multiStepPlan.id(), mapOnBackups),
(key, val) -> {
@@ -145,203 +140,213 @@ public class MappingServiceImpl implements
MappingService, LogicalTopologyEventL
}
}
- long topVer = topologyAware ? topology.version() :
Long.MAX_VALUE;
+ long topVer = topologyAware ?
logicalTopologyVerSupplier.get() : Long.MAX_VALUE;
- return new MappingsCacheValue(topVer, tableIds,
mapFragments(context, template, key.mapOnBackups));
+ return new MappingsCacheValue(topVer, tableIds,
mapFragments(template, mapOnBackups));
}
- if (val.topVer < topology.version()) {
- return new MappingsCacheValue(topology.version(),
val.tableIds, mapFragments(context, template, key.mapOnBackups));
+ long topologyVer = logicalTopologyVerSupplier.get();
+
+ if (val.topologyVersion < topologyVer) {
+ return new MappingsCacheValue(topologyVer,
val.tableIds, mapFragments(template, mapOnBackups));
}
return val;
- }
- );
+ });
- return cacheValue.mappedFragments.thenApply(mappedFragments ->
applyPartitionPruning(mappedFragments.fragments, parameters));
+ return cacheValue.mappedFragments.thenApply(frags ->
applyPartitionPruning(frags.fragments, parameters));
}
- private CompletableFuture<MappedFragments> mapFragments(
- MappingContext context,
- FragmentsTemplate template,
+ CompletableFuture<DistributionHolder> composeDistributions(
+ Set<IgniteSystemView> views,
+ Set<IgniteTable> tables,
boolean mapOnBackups
) {
- IdGenerator idGenerator = new IdGenerator(template.nextId);
- List<Fragment> fragments = new ArrayList<>(template.fragments);
- HybridTimestamp mappingTime = clock.now();
-
- List<CompletableFuture<IntObjectPair<ExecutionTarget>>> targets =
- fragments.stream().flatMap(fragment -> Stream.concat(
- fragment.tables().values().stream()
- .map(table ->
targetProvider.forTable(mappingTime, context.targetFactory(), table,
mapOnBackups)
- .thenApply(target ->
IntObjectPair.of(table.id(), target))
- ),
- fragment.systemViews().stream()
- .map(view ->
targetProvider.forSystemView(context.targetFactory(), view)
- .thenApply(target ->
IntObjectPair.of(view.id(), target))
- )
- ))
- .collect(Collectors.toList());
-
- return allOf(targets.toArray(new CompletableFuture[0]))
- .thenApply(ignored -> {
- Int2ObjectMap<ExecutionTarget> targetsById = new
Int2ObjectOpenHashMap<>();
-
- for (CompletableFuture<IntObjectPair<ExecutionTarget>> fut
: targets) {
- // this is a safe join, because we have waited for all
futures to be complete
- IntObjectPair<ExecutionTarget> pair = fut.join();
-
- targetsById.put(pair.firstInt(), pair.second());
- }
+ if (tables.isEmpty() && views.isEmpty()) {
+ DistributionHolder holder = new
DistributionHolder(Set.of(localNodeName), Int2ObjectMaps.emptyMap(),
Int2ObjectMaps.emptyMap());
- FragmentMapper mapper = new
FragmentMapper(template.cluster.getMetadataQuery(), context, targetsById);
+ return completedFuture(holder);
+ } else {
+ Int2ObjectMap<CompletableFuture<List<TokenizedAssignments>>>
tablesAssignments = new Int2ObjectOpenHashMap<>(tables.size());
+ Set<String> allNodes = new HashSet<>();
- List<FragmentMapping> mappings = mapper.map(fragments,
idGenerator);
+ allNodes.add(localNodeName);
- Long2ObjectMap<ColocationGroup> groupsBySourceId = new
Long2ObjectOpenHashMap<>();
- Long2ObjectMap<List<String>> allSourcesByExchangeId = new
Long2ObjectOpenHashMap<>();
+ for (IgniteTable tbl : tables) {
+ CompletableFuture<List<TokenizedAssignments>> assignments =
distributionProvider
+ .forTable(clock.now(), tbl, mapOnBackups);
- for (FragmentMapping mapping : mappings) {
- Fragment fragment = mapping.fragment();
+ tablesAssignments.put(tbl.id(), assignments);
+ }
- for (ColocationGroup group : mapping.groups()) {
- for (long sourceId : group.sourceIds()) {
- groupsBySourceId.put(sourceId, group);
- }
- }
+ return
CompletableFuture.allOf(tablesAssignments.values().toArray(new
CompletableFuture[0]))
+ .thenApply(ignore -> {
+ Int2ObjectMap<List<TokenizedAssignments>>
assignmentsPerTable = new Int2ObjectOpenHashMap<>(tables.size());
- if (!fragment.rootFragment()) {
- IgniteSender sender = (IgniteSender)
fragment.root();
+ tablesAssignments.keySet().forEach(k -> {
+ // this is a safe join, because we have waited for
all futures to be completed
+ List<TokenizedAssignments> assignments =
tablesAssignments.get(k).join();
- List<String> nodeNames = mapping.groups().stream()
- .flatMap(g -> g.nodeNames().stream())
- .distinct().collect(Collectors.toList());
+ assignments.stream().flatMap(i ->
i.nodes().stream()).map(Assignment::consistentId).forEach(allNodes::add);
- allSourcesByExchangeId.put(sender.exchangeId(),
nodeNames);
- }
- }
+ assignmentsPerTable.put(k, assignments);
+ });
- List<MappedFragment> mappedFragmentsList = new
ArrayList<>(mappings.size());
- Set<String> targetNodes = new HashSet<>();
- for (FragmentMapping mapping : mappings) {
- Fragment fragment = mapping.fragment();
+ return assignmentsPerTable;
+ })
+ .thenApply(assignmentsPerTable -> {
+ Int2ObjectMap<List<String>> nodesPerView =
views.stream()
+
.collect(toIntMapCollector(IgniteDataSource::id,
distributionProvider::forSystemView));
- ColocationGroup targetGroup = null;
- if (!fragment.rootFragment()) {
- IgniteSender sender = (IgniteSender)
fragment.root();
+
nodesPerView.values().stream().flatMap(List::stream).forEach(allNodes::add);
- targetGroup =
groupsBySourceId.get(sender.exchangeId());
- }
+ return new DistributionHolder(allNodes,
assignmentsPerTable, nodesPerView);
+ });
+ }
+ }
- Long2ObjectMap<List<String>> sourcesByExchangeId =
null;
- for (IgniteReceiver receiver : fragment.remotes()) {
- if (sourcesByExchangeId == null) {
- sourcesByExchangeId = new
Long2ObjectOpenHashMap<>();
- }
+ private CompletableFuture<MappedFragments> mapFragments(
+ FragmentsTemplate template,
+ boolean mapOnBackups
+ ) {
+ Set<IgniteSystemView> views =
template.fragments.stream().flatMap(fragment -> fragment.systemViews().stream())
+ .collect(Collectors.toSet());
- long exchangeId = receiver.exchangeId();
+ Set<IgniteTable> tables = template.fragments.stream().flatMap(fragment
-> fragment.tables().values().stream())
+ .collect(Collectors.toSet());
- sourcesByExchangeId.put(exchangeId,
allSourcesByExchangeId.get(exchangeId));
- }
+ CompletableFuture<DistributionHolder> res =
composeDistributions(views, tables, mapOnBackups);
- MappedFragment mappedFragment = new MappedFragment(
- fragment,
- mapping.groups(),
- sourcesByExchangeId,
- targetGroup,
- null
- );
+ return res.thenApply(assignments -> {
+ Int2ObjectMap<ExecutionTarget> targetsById = new
Int2ObjectOpenHashMap<>();
- mappedFragmentsList.add(mappedFragment);
+ MappingContext context = new MappingContext(localNodeName, new
ArrayList<>(assignments.nodes()), template.cluster);
- targetNodes.addAll(mappedFragment.nodes());
- }
+ ExecutionTargetFactory targetFactory = context.targetFactory();
- return new MappedFragments(mappedFragmentsList,
targetNodes);
- });
- }
+ List<IntObjectPair<ExecutionTarget>> allTargets =
prepareTargets(template, assignments, targetFactory);
- @Override
- public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot
newTopology) {
- topologyHolder.update(newTopology);
- }
+ for (IntObjectPair<ExecutionTarget> pair : allTargets) {
+ targetsById.put(pair.firstInt(), pair.second());
+ }
- @Override
- public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot
newTopology) {
- topologyHolder.update(newTopology);
+ FragmentMapper mapper = new
FragmentMapper(context.cluster().getMetadataQuery(), context, targetsById);
- mappingsCache.removeIfValue(value ->
- !value.mappedFragments.isDone() // Invalidate non-completed
mappings to reduce the chance of getting stale value
- ||
value.mappedFragments.join().nodes.contains(leftNode.name()));
- }
+ IdGenerator idGenerator = new IdGenerator(template.nextId);
- @Override
- public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
- topologyHolder.update(newTopology);
- }
+ List<Fragment> fragments = new ArrayList<>(template.fragments);
- private List<MappedFragment> applyPartitionPruning(List<MappedFragment>
mappedFragments, MappingParameters parameters) {
- return partitionPruner.apply(mappedFragments,
parameters.dynamicParameters());
- }
+ List<FragmentMapping> mappings = mapper.map(fragments,
idGenerator);
- /**
- * Holder for topology snapshots that guarantees monotonically increasing
versions.
- */
- class LogicalTopologyHolder {
- private volatile TopologySnapshot topology = new
TopologySnapshot(Long.MIN_VALUE, List.of());
+ Long2ObjectMap<ColocationGroup> groupsBySourceId = new
Long2ObjectOpenHashMap<>();
+ Long2ObjectMap<List<String>> allSourcesByExchangeId = new
Long2ObjectOpenHashMap<>();
+
+ for (FragmentMapping mapping : mappings) {
+ Fragment fragment = mapping.fragment();
- void update(LogicalTopologySnapshot topologySnapshot) {
- synchronized (this) {
- if (topology.version() < topologySnapshot.version()) {
- topology = new
TopologySnapshot(topologySnapshot.version(), deriveNodeNames(topologySnapshot));
+ for (ColocationGroup group : mapping.groups()) {
+ for (long sourceId : group.sourceIds()) {
+ groupsBySourceId.put(sourceId, group);
+ }
}
- if (initialTopologyFuture.isDone() ||
!topology.nodes().contains(localNodeName)) {
- return;
+ if (!fragment.rootFragment()) {
+ IgniteSender sender = (IgniteSender) fragment.root();
+
+ List<String> nodeNames = mapping.groups().stream()
+ .flatMap(g -> g.nodeNames().stream())
+ .distinct().collect(Collectors.toList());
+
+ allSourcesByExchangeId.put(sender.exchangeId(), nodeNames);
}
}
- initialTopologyFuture.complete(null);
- }
+ List<MappedFragment> mappedFragmentsList = new
ArrayList<>(mappings.size());
+ Set<String> targetNodes = new HashSet<>();
+ for (FragmentMapping mapping : mappings) {
+ Fragment fragment = mapping.fragment();
- TopologySnapshot topology() {
- return topology;
- }
+ ColocationGroup targetGroup = null;
+ if (!fragment.rootFragment()) {
+ IgniteSender sender = (IgniteSender) fragment.root();
- private List<String> deriveNodeNames(LogicalTopologySnapshot topology)
{
- return topology.nodes().stream()
- .map(LogicalNode::name)
- .collect(Collectors.toUnmodifiableList());
- }
+ targetGroup = groupsBySourceId.get(sender.exchangeId());
+ }
- class TopologySnapshot {
- private final List<String> nodes;
- private final long version;
+ Long2ObjectMap<List<String>> sourcesByExchangeId = null;
+ for (IgniteReceiver receiver : fragment.remotes()) {
+ if (sourcesByExchangeId == null) {
+ sourcesByExchangeId = new Long2ObjectOpenHashMap<>();
+ }
- TopologySnapshot(long version, List<String> nodes) {
- this.version = version;
- this.nodes = nodes;
- }
+ long exchangeId = receiver.exchangeId();
- public List<String> nodes() {
- return nodes;
- }
+ sourcesByExchangeId.put(exchangeId,
allSourcesByExchangeId.get(exchangeId));
+ }
+
+ MappedFragment mappedFragment = new MappedFragment(
+ fragment,
+ mapping.groups(),
+ sourcesByExchangeId,
+ targetGroup,
+ null
+ );
- public long version() {
- return version;
+ mappedFragmentsList.add(mappedFragment);
+
+ targetNodes.addAll(mappedFragment.nodes());
}
+
+ return new MappedFragments(mappedFragmentsList, targetNodes);
+ });
+ }
+
+ private static List<IntObjectPair<ExecutionTarget>> prepareTargets(
+ FragmentsTemplate template,
+ DistributionHolder distr,
+ ExecutionTargetFactory targetFactory
+ ) {
+ Stream<IntObjectPair<ExecutionTarget>> tableTargets =
template.fragments.stream().flatMap(fragment ->
+ fragment.tables().values().stream()
+ .map(table -> IntObjectPair.of(table.id(),
+ buildTargetforTable(targetFactory,
distr.tableAssignments(table.id())))));
+
+ Stream<IntObjectPair<ExecutionTarget>> viewTargets =
template.fragments.stream().flatMap(fragment -> fragment.systemViews().stream()
+ .map(view -> IntObjectPair.of(view.id(),
buildTargetForSystemView(targetFactory, view, distr.viewNodes(view.id())))));
+
+ return Stream.concat(tableTargets,
viewTargets).collect(Collectors.toList());
+ }
+
+ private static ExecutionTarget
buildTargetForSystemView(ExecutionTargetFactory factory, IgniteSystemView view,
List<String> nodes) {
+ if (nullOrEmpty(nodes)) {
+ throw new SqlException(Sql.MAPPING_ERR, format("The view with name
'{}' could not be found on"
+ + " any active nodes in the cluster", view.name()));
}
+
+ return view.distribution() == IgniteDistributions.single()
+ ? factory.oneOf(nodes)
+ : factory.allOf(nodes);
+ }
+
+ private static ExecutionTarget buildTargetforTable(ExecutionTargetFactory
factory, List<TokenizedAssignments> assignments) {
+ return factory.partitioned(assignments);
}
- private FragmentsTemplate getOrCreateTemplate(MultiStepPlan plan,
MappingContext context) {
+ private List<MappedFragment> applyPartitionPruning(List<MappedFragment>
mappedFragments, MappingParameters parameters) {
+ return partitionPruner.apply(mappedFragments,
parameters.dynamicParameters());
+ }
+
+ private FragmentsTemplate getOrCreateTemplate(MultiStepPlan plan) {
// QuerySplitter is deterministic, thus we can cache result in order
to reuse it next time
return templatesCache.get(plan.id(), key -> {
IdGenerator idGenerator = new IdGenerator(0);
- List<Fragment> fragments = new QuerySplitter(idGenerator,
context.cluster()).split(plan.root());
+ RelOptCluster cluster = Commons.cluster();
+
+ List<Fragment> fragments = new QuerySplitter(idGenerator,
cluster).split(plan.root());
return new FragmentsTemplate(
- idGenerator.nextId(), context.cluster(), fragments
+ idGenerator.nextId(), cluster, fragments
);
});
}
@@ -370,12 +375,12 @@ public class MappingServiceImpl implements
MappingService, LogicalTopologyEventL
}
private static class MappingsCacheValue {
- private final long topVer;
+ private final long topologyVersion;
private final IntSet tableIds;
private final CompletableFuture<MappedFragments> mappedFragments;
- MappingsCacheValue(long topVer, IntSet tableIds,
CompletableFuture<MappedFragments> mappedFragments) {
- this.topVer = topVer;
+ MappingsCacheValue(long topologyVersion, IntSet tableIds,
CompletableFuture<MappedFragments> mappedFragments) {
+ this.topologyVersion = topologyVersion;
this.tableIds = tableIds;
this.mappedFragments = mappedFragments;
}
@@ -409,4 +414,31 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
return Objects.hash(planId, mapOnBackups);
}
}
+
+ private static class DistributionHolder {
+ private final Set<String> nodes;
+ private final Int2ObjectMap<List<TokenizedAssignments>>
assignmentsPerTable;
+ private final Int2ObjectMap<List<String>> nodesPerView;
+
+ DistributionHolder(
+ Set<String> nodes,
+ Int2ObjectMap<List<TokenizedAssignments>> assignmentsPerTable,
+ Int2ObjectMap<List<String>> nodesPerView) {
+ this.nodes = nodes;
+ this.assignmentsPerTable = assignmentsPerTable;
+ this.nodesPerView = nodesPerView;
+ }
+
+ Set<String> nodes() {
+ return nodes;
+ }
+
+ List<TokenizedAssignments> tableAssignments(int tableId) {
+ return assignmentsPerTable.get(tableId);
+ }
+
+ List<String> viewNodes(int viewId) {
+ return nodesPerView.get(viewId);
+ }
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
index 5453f2ab7d..d5d5be9877 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
@@ -85,10 +85,9 @@ public class LargeClusterFactory implements
ExecutionTargetFactory {
for (Assignment a : assignment.nodes()) {
int node = nodeNameToId.getOrDefault(a.consistentId(), -1);
- // TODO Ignore unknown node until IGNITE-22969
- if (node != -1) {
- nodes.set(node);
- }
+ assert node >= 0 : "invalid node";
+
+ nodes.set(node);
}
assert !nodes.isEmpty() : "No partition node found";
@@ -127,10 +126,9 @@ public class LargeClusterFactory implements
ExecutionTargetFactory {
for (String name : nodes) {
int id = nodeNameToId.getOrDefault(name, -1);
- // TODO Ignore unknown node until IGNITE-22969
- if (id != -1) {
- nodesSet.set(id);
- }
+ assert id >= 0 : "invalid node";
+
+ nodesSet.set(id);
}
return nodesSet;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
index 66a173828f..fde0fe0fac 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
@@ -89,10 +89,9 @@ public class SmallClusterFactory implements
ExecutionTargetFactory {
for (Assignment a : assignment.nodes()) {
long node = nodeNameToId.getOrDefault(a.consistentId(), -1);
- // TODO Ignore unknown node until IGNITE-22969
- if (node != -1) {
- currentPartitionNodes |= node;
- }
+ assert node >= 0 : "invalid node";
+
+ currentPartitionNodes |= node;
}
assert currentPartitionNodes != 0L : "No partition node found";
@@ -132,10 +131,9 @@ public class SmallClusterFactory implements
ExecutionTargetFactory {
for (String name : nodes) {
long node = nodeNameToId.getOrDefault(name, -1);
- // TODO Ignore unknown node until IGNITE-22969
- if (node != -1) {
- nodesMap |= node;
- }
+ assert node >= 0 : "invalid node";
+
+ nodesMap |= node;
}
return nodesMap;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 75f8516704..2e05f72ddd 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.sql.engine.exec;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
@@ -78,14 +77,11 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.catalog.CatalogCommand;
-import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.RunnableX;
@@ -106,10 +102,8 @@ import
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestC
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry;
import
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
-import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
-import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
-import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
+import
org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImplTest.TestExecutionDistributionProvider;
import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
import org.apache.ignite.internal.sql.engine.exec.rel.Node;
@@ -138,7 +132,6 @@ import
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverte
import org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruner;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
-import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.sql.ParserService;
@@ -1139,15 +1132,9 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
ExecutionDependencyResolver dependencyResolver = new
ExecutionDependencyResolverImpl(executableTableRegistry, null);
- var mappingService = createMappingService(nodeName, clockService,
taskExecutor, mappingCacheFactory);
+ var mappingService = createMappingService(nodeName, clockService,
mappingCacheFactory, nodeNames);
var tableFunctionRegistry = new TableFunctionRegistryImpl();
- List<LogicalNode> logicalNodes = nodeNames.stream()
- .map(name -> new LogicalNode(randomUUID(), name,
NetworkAddress.from("127.0.0.1:10000")))
- .collect(Collectors.toList());
-
- mappingService.onTopologyLeap(new LogicalTopologySnapshot(1,
logicalNodes));
-
var executionService = new ExecutionServiceImpl<>(
messageService,
topologyService,
@@ -1173,32 +1160,15 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
private MappingServiceImpl createMappingService(
String nodeName,
ClockService clock,
- QueryTaskExecutor taskExecutor,
- CacheFactory cacheFactory
+ CacheFactory cacheFactory,
+ List<String> logicalNodes
) {
- var targetProvider = new ExecutionTargetProvider() {
- @Override
- public CompletableFuture<ExecutionTarget> forTable(
- HybridTimestamp operationTime,
- ExecutionTargetFactory factory,
- IgniteTable table,
- boolean includeBackups
- ) {
- if (mappingException != null) {
- return CompletableFuture.failedFuture(mappingException);
- }
-
- return completedFuture(factory.allOf(nodeNames));
- }
+ PartitionPruner partitionPruner = (mappedFragments, dynamicParameters)
-> mappedFragments;
- @Override
- public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
- return CompletableFuture.failedFuture(new AssertionError("Not
supported"));
- }
- };
+ Supplier<Long> topologyVerSupplier = () -> Long.MAX_VALUE;
- PartitionPruner partitionPruner = (mappedFragments, dynamicParameters)
-> mappedFragments;
- return new MappingServiceImpl(nodeName, clock, targetProvider,
cacheFactory, 0, partitionPruner, taskExecutor);
+ return new MappingServiceImpl(nodeName, clock, cacheFactory, 0,
partitionPruner, topologyVerSupplier,
+ new TestExecutionDistributionProvider(logicalNodes, () ->
mappingException));
}
private SqlOperationContext createContext() {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
index 5c82615c76..5fc233d0be 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
@@ -95,14 +95,15 @@ public class ExecutionTargetFactorySelfTest {
List<String> partiallyInvalidNodeSet =
CollectionUtils.concat(SINGLE_NODE_SET, invalidNodeSet);
assertThrows(AssertionError.class, () -> f.allOf(invalidNodeSet),
"invalid node");
- assertThrows(AssertionError.class, () -> f.someOf(invalidNodeSet),
"Empty target is not allowed");
- assertThrows(AssertionError.class, () -> f.oneOf(invalidNodeSet),
"Empty target is not allowed");
- assertThrows(AssertionError.class, () ->
f.partitioned(assignmentFromPrimaries(invalidNodeSet)), "No partition node
found");
-
- assertThrows(Throwable.class, () -> f.allOf(partiallyInvalidNodeSet),
"invalid node");
- assertThat(f.resolveNodes(f.someOf(partiallyInvalidNodeSet)),
equalTo(SINGLE_NODE_SET));
- assertThat(f.resolveNodes(f.oneOf(partiallyInvalidNodeSet)),
equalTo(SINGLE_NODE_SET));
-
assertThat(f.resolveNodes(f.partitioned(assignment(partiallyInvalidNodeSet,
partiallyInvalidNodeSet))), equalTo(SINGLE_NODE_SET));
+ assertThrows(AssertionError.class, () -> f.someOf(invalidNodeSet),
"invalid node");
+ assertThrows(AssertionError.class, () -> f.oneOf(invalidNodeSet),
"invalid node");
+ assertThrows(AssertionError.class, () ->
f.partitioned(assignmentFromPrimaries(invalidNodeSet)), "invalid node");
+
+ assertThrows(AssertionError.class, () ->
f.allOf(partiallyInvalidNodeSet), "invalid node");
+ assertThrows(AssertionError.class, () ->
f.resolveNodes(f.someOf(partiallyInvalidNodeSet)), "invalid node");
+ assertThrows(AssertionError.class, () ->
f.resolveNodes(f.oneOf(partiallyInvalidNodeSet)), "invalid node");
+ assertThrows(AssertionError.class, () -> f.resolveNodes(
+ f.partitioned(assignment(partiallyInvalidNodeSet,
partiallyInvalidNodeSet))), "invalid node");
}
@ParameterizedTest
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
index 99481eaf5b..45629d47e3 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
@@ -413,14 +413,15 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
objectId += 1;
}
+ LogicalTopologySnapshot logicalTopologySnapshot = newLogicalTopology();
+
IgniteSchema schema = new IgniteSchema(SqlCommon.DEFAULT_SCHEMA_NAME,
1, dataSources);
- ExecutionTargetProvider executionTargetProvider =
TestBuilders.executionTargetProviderBuilder()
+ ExecutionDistributionProvider executionDistributionProvider =
TestBuilders.executionDistributionProviderBuilder()
.useTablePartitions(true)
.addTables(table2Assignments)
.build();
- LogicalTopologySnapshot logicalTopologySnapshot = newLogicalTopology();
- return new TestSetup(executionTargetProvider, schema,
logicalTopologySnapshot);
+ return new TestSetup(executionDistributionProvider, schema,
logicalTopologySnapshot);
}
private void validateAssignments(String tableName, List<List<String>>
assignments) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index d27450f94e..fef3e7fa88 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -17,39 +17,38 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
-import static java.util.UUID.randomUUID;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
-import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
+import
org.apache.ignite.internal.partitiondistribution.TokenizedAssignmentsImpl;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
@@ -58,14 +57,11 @@ import
org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruner;
import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
-import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
import org.apache.ignite.internal.systemview.api.SystemViews;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.SubscriptionUtils;
-import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -80,6 +76,8 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
private static final ClockService CLOCK_SERVICE = new TestClockService(new
TestHybridClock(System::currentTimeMillis));
private static final MappingParameters PARAMS = MappingParameters.EMPTY;
private static final PartitionPruner PARTITION_PRUNER = (fragments,
dynParams) -> fragments;
+ private long topologyVer;
+ private boolean topologyChange;
static {
// @formatter:off
@@ -124,18 +122,23 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
List<String> nodeNames = List.of(localNodeName, "NODE1");
// Initialize mapping service.
- ExecutionTargetProvider targetProvider =
Mockito.spy(createTargetProvider(nodeNames));
- MappingServiceImpl mappingService = new MappingServiceImpl(
- localNodeName, CLOCK_SERVICE, targetProvider,
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run
- );
- mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
- new LogicalTopologySnapshot(1,
logicalNodes(nodeNames.toArray(new String[0]))));
+ Supplier<Long> logicalTopologyVerSupplier =
createChangingTopologySupplier();
+ TestExecutionDistributionProvider execProvider = Mockito.spy(new
TestExecutionDistributionProvider(nodeNames));
+ MappingServiceImpl mappingService = Mockito.spy(new MappingServiceImpl(
+ localNodeName,
+ CLOCK_SERVICE,
+ CaffeineCacheFactory.INSTANCE,
+ 100,
+ PARTITION_PRUNER,
+ logicalTopologyVerSupplier,
+ execProvider
+ ));
List<MappedFragment> defaultMapping = await(mappingService.map(PLAN,
PARAMS));
List<MappedFragment> mappingOnBackups = await(mappingService.map(PLAN,
MappingParameters.MAP_ON_BACKUPS));
- verify(targetProvider, times(2)).forTable(any(), any(), any(),
anyBoolean());
+ verify(execProvider, times(2)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
assertSame(defaultMapping, await(mappingService.map(PLAN, PARAMS)));
assertSame(mappingOnBackups, await(mappingService.map(PLAN,
MappingParameters.MAP_ON_BACKUPS)));
@@ -147,115 +150,53 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
String localNodeName = "NODE0";
MappingServiceImpl mappingService =
createMappingServiceNoCache(localNodeName, List.of(localNodeName));
- mappingService.onNodeJoined(Mockito.mock(LogicalNode.class), new
LogicalTopologySnapshot(1, logicalNodes(localNodeName)));
CompletableFuture<List<MappedFragment>> mappingFuture =
mappingService.map(PLAN, PARAMS);
assertThat(mappingFuture, willSucceedFast());
}
- @Test
- public void lateServiceInitializationOnTopologyLeap() {
- String localNodeName = "NODE";
- List<String> nodeNames = List.of("NODE1");
-
- MappingServiceImpl mappingService =
createMappingServiceNoCache(localNodeName, nodeNames);
-
- CompletableFuture<List<MappedFragment>> mappingFuture =
mappingService.map(PLAN, PARAMS);
-
- // Mapping should wait for service initialization.
- assertFalse(mappingFuture.isDone());
-
- // Join another node affect nothing.
- mappingService.onTopologyLeap(new LogicalTopologySnapshot(1,
logicalNodes("NODE1", "NODE2")));
- assertThat(mappingFuture, willThrowFast(TimeoutException.class));
-
- // Joining local node completes initialization.
- mappingService.onTopologyLeap(new LogicalTopologySnapshot(2,
logicalNodes("NODE", "NODE1", "NODE2")));
-
- assertThat(mappingFuture, willSucceedFast());
- assertThat(mappingService.map(PLAN, PARAMS), willSucceedFast());
- }
-
- @Test
- public void lateServiceInitializationOnNodeJoin() {
- String localNodeName = "NODE";
- List<String> nodeNames = List.of("NODE1");
-
- MappingServiceImpl mappingService =
createMappingServiceNoCache(localNodeName, nodeNames);
-
- CompletableFuture<List<MappedFragment>> mappingFuture =
mappingService.map(PLAN, PARAMS);
-
- // Mapping should wait for service initialization.
- assertFalse(mappingFuture.isDone());
-
- // Join another node affect nothing.
- mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
- new LogicalTopologySnapshot(1, logicalNodes("NODE1",
"NODE2")));
-
- assertThat(mappingFuture, willThrowFast(TimeoutException.class));
-
- // Joining local node completes initialization.
- mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
- new LogicalTopologySnapshot(2, logicalNodes("NODE", "NODE1",
"NODE2")));
-
- assertThat(mappingFuture, willSucceedFast());
- assertThat(mappingService.map(PLAN, PARAMS), willSucceedFast());
- }
-
@Test
public void testCacheInvalidationOnTopologyChange() {
String localNodeName = "NODE";
List<String> nodeNames = List.of(localNodeName, "NODE1");
- // Initialize mapping service.
- ExecutionTargetProvider targetProvider =
Mockito.spy(createTargetProvider(nodeNames));
- MappingServiceImpl mappingService = new MappingServiceImpl(
- localNodeName, CLOCK_SERVICE, targetProvider,
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run
- );
+ Supplier<Long> logicalTopologyVerSupplier =
createTriggeredTopologySupplier();
+ TestExecutionDistributionProvider execProvider = Mockito.spy(new
TestExecutionDistributionProvider(nodeNames));
- mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
- new LogicalTopologySnapshot(1,
logicalNodes(nodeNames.toArray(new String[0]))));
+ MappingServiceImpl mappingService = Mockito.spy(new MappingServiceImpl(
+ localNodeName,
+ CLOCK_SERVICE,
+ CaffeineCacheFactory.INSTANCE,
+ 100,
+ PARTITION_PRUNER,
+ logicalTopologyVerSupplier,
+ execProvider
+ ));
List<MappedFragment> tableOnlyMapping = await(mappingService.map(PLAN,
PARAMS));
List<MappedFragment> sysViewMapping =
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
- verify(targetProvider, times(2)).forTable(any(), any(), any(),
anyBoolean());
- verify(targetProvider, times(1)).forSystemView(any(), any());
+ verify(execProvider, times(2)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
+ verify(execProvider, times(1)).forSystemView(any());
+
+ verify(mappingService, times(2)).composeDistributions(anySet(),
anySet(), anyBoolean());
assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
assertSame(sysViewMapping,
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
- LogicalNode newNode = Mockito.mock(LogicalNode.class);
- Mockito.when(newNode.name()).thenReturn("NODE2");
+ verifyNoMoreInteractions(execProvider);
- mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
- new LogicalTopologySnapshot(3, logicalNodes("NODE", "NODE1",
"NODE2")));
-
- // Plan with tables only must not be invalidated on node join.
- assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
- verifyNoMoreInteractions(targetProvider);
+ topologyChange = true;
// Plan with system views must be invalidated.
assertNotSame(sysViewMapping,
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
- mappingService.onNodeLeft(newNode,
- new LogicalTopologySnapshot(3, logicalNodes("NODE", "NODE1")));
-
- // Plan with tables that don't include a left node should not be
invalidated.
+ // Plan with tables only must not be invalidated on topology change.
assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
- LogicalNode node1 = Mockito.mock(LogicalNode.class);
- Mockito.when(node1.name()).thenReturn("NODE1");
-
- mappingService.onNodeLeft(node1,
- new LogicalTopologySnapshot(3, logicalNodes("NODE")));
-
- // Plan with tables that include left node must be invalidated.
- assertNotSame(tableOnlyMapping, await(mappingService.map(PLAN,
PARAMS)));
-
- verify(targetProvider, times(4)).forTable(any(), any(), any(),
anyBoolean());
- verify(targetProvider, times(2)).forSystemView(any(), any());
+ verify(execProvider, times(3)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
+ verify(execProvider, times(2)).forSystemView(any());
}
@Test
@@ -279,66 +220,101 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
};
// Initialize mapping service.
- ExecutionTargetProvider targetProvider =
Mockito.spy(createTargetProvider(nodeNames));
- MappingServiceImpl mappingService = new MappingServiceImpl(
- localNodeName, CLOCK_SERVICE, targetProvider,
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run
- );
+ Supplier<Long> logicalTopologyVerSupplier =
createStableTopologySupplier();
+ ExecutionDistributionProvider execProvider = Mockito.spy(new
TestExecutionDistributionProvider(nodeNames));
- mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
- new LogicalTopologySnapshot(1,
logicalNodes(nodeNames.toArray(new String[0]))));
+ MappingServiceImpl mappingService = Mockito.spy(new MappingServiceImpl(
+ localNodeName,
+ CLOCK_SERVICE,
+ CaffeineCacheFactory.INSTANCE,
+ 100,
+ PARTITION_PRUNER,
+ logicalTopologyVerSupplier,
+ execProvider
+ ));
List<MappedFragment> mappedFragments =
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
- verify(targetProvider, times(1)).forTable(any(), any(), any(),
anyBoolean());
- verify(targetProvider, times(1)).forSystemView(any(), any());
+ verify(execProvider, times(1)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
+ verify(execProvider, times(1)).forSystemView(any());
// Simulate expiration of the primary replica for non-mapped table -
the cache entry should not be invalidated.
await(mappingService.onPrimaryReplicaExpired(prepareEvtParams.apply("T2")));
assertSame(mappedFragments,
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
- verifyNoMoreInteractions(targetProvider);
+
+ verify(mappingService, times(1)).composeDistributions(anySet(),
anySet(), anyBoolean());
// Simulate expiration of the primary replica for mapped table - the
cache entry should be invalidated.
await(mappingService.onPrimaryReplicaExpired(prepareEvtParams.apply("T1")));
assertNotSame(mappedFragments,
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
- verify(targetProvider, times(2)).forTable(any(), any(), any(),
anyBoolean());
- verify(targetProvider, times(2)).forSystemView(any(), any());
+ verify(execProvider, times(2)).forTable(any(HybridTimestamp.class),
any(IgniteTable.class), anyBoolean());
+ verify(execProvider, times(2)).forSystemView(any());
}
- private static List<LogicalNode> logicalNodes(String... nodeNames) {
- return Arrays.stream(nodeNames)
- .map(name -> new LogicalNode(randomUUID(), name,
NetworkAddress.from("127.0.0.1:10000")))
- .collect(Collectors.toList());
+ private MappingServiceImpl createMappingServiceNoCache(String
localNodeName, List<String> nodeNames) {
+ return createMappingService(localNodeName, nodeNames, 0);
}
- private static MappingServiceImpl createMappingServiceNoCache(String
localNodeName, List<String> nodeNames) {
+ private MappingServiceImpl createMappingService(String localNodeName,
List<String> nodeNames, int cacheSize) {
+ Supplier<Long> logicalTopologyVerSupplier =
createChangingTopologySupplier();
+ ExecutionDistributionProvider execProvider = new
TestExecutionDistributionProvider(nodeNames);
+
return new MappingServiceImpl(
localNodeName,
CLOCK_SERVICE,
- createTargetProvider(nodeNames),
- EmptyCacheFactory.INSTANCE,
- 0,
+ CaffeineCacheFactory.INSTANCE,
+ cacheSize,
PARTITION_PRUNER,
- Runnable::run
+ logicalTopologyVerSupplier,
+ execProvider
);
}
- private static ExecutionTargetProvider createTargetProvider(List<String>
nodeNames) {
- return new ExecutionTargetProvider() {
- @Override
- public CompletableFuture<ExecutionTarget> forTable(
- HybridTimestamp operationTime,
- ExecutionTargetFactory factory,
- IgniteTable table,
- boolean includeBackups
- ) {
- return
CompletableFuture.completedFuture(factory.allOf(nodeNames));
- }
+ /** Test distribution provider. */
+ public static class TestExecutionDistributionProvider implements
ExecutionDistributionProvider {
+ private final List<String> nodeNames;
+ private Supplier<RuntimeException> exceptionSupplier = () -> null;
+
+ TestExecutionDistributionProvider(List<String> nodeNames) {
+ this.nodeNames = nodeNames;
+ }
+
+ /** Constructor. */
+ public TestExecutionDistributionProvider(List<String> nodeNames,
Supplier<RuntimeException> exceptionSupplier) {
+ this.nodeNames = nodeNames;
+ this.exceptionSupplier = exceptionSupplier;
+ }
+
+ private static TokenizedAssignments mapAssignment(String peer) {
+ Set<Assignment> peers = Set.of(Assignment.forPeer(peer));
+ return new TokenizedAssignmentsImpl(peers, 1L);
+ }
- @Override
- public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
- return CompletableFuture.completedFuture(view.distribution()
== IgniteDistributions.single()
- ? factory.oneOf(nodeNames)
- : factory.allOf(nodeNames));
+ @Override
+ public CompletableFuture<List<TokenizedAssignments>>
forTable(HybridTimestamp operationTime, IgniteTable table,
+ boolean includeBackups) {
+ if (exceptionSupplier.get() != null) {
+ return CompletableFuture.failedFuture(exceptionSupplier.get());
}
- };
+
+ return CompletableFuture.completedFuture(nodeNames.stream()
+
.map(TestExecutionDistributionProvider::mapAssignment).collect(Collectors.toList()));
+ }
+
+ @Override
+ public List<String> forSystemView(IgniteSystemView view) {
+ return nodeNames;
+ }
+ }
+
+ private static Supplier<Long> createStableTopologySupplier() {
+ return () -> 1L;
+ }
+
+ private Supplier<Long> createTriggeredTopologySupplier() {
+ return () -> topologyChange ? ++topologyVer : topologyVer;
+ }
+
+ private Supplier<Long> createChangingTopologySupplier() {
+ return () -> topologyVer++;
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
index 93bb11bfeb..47a09336f1 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
@@ -89,14 +89,17 @@ final class MappingTestRunner {
/** Test setup. */
static final class TestSetup {
- private final ExecutionTargetProvider executionTargetProvider;
+ private final ExecutionDistributionProvider
executionDistributionProvider;
private final IgniteSchema schema;
private final LogicalTopologySnapshot topologySnapshot;
- TestSetup(ExecutionTargetProvider executionTargetProvider,
IgniteSchema schema, LogicalTopologySnapshot topologySnapshot) {
- this.executionTargetProvider = executionTargetProvider;
+ TestSetup(
+ ExecutionDistributionProvider executionDistributionProvider,
+ IgniteSchema schema, LogicalTopologySnapshot topologySnapshot
+ ) {
+ this.executionDistributionProvider = executionDistributionProvider;
this.schema = schema;
this.topologySnapshot = topologySnapshot;
}
@@ -138,7 +141,7 @@ final class MappingTestRunner {
Path testFile = location.resolve(fileName);
List<TestCaseDef> testCases = loadTestCases(testFile);
- runTestCases(testFile, setup.schema, setup.executionTargetProvider,
setup.topologySnapshot, parseValidate, testCases);
+ runTestCases(testFile, setup.schema,
setup.executionDistributionProvider, setup.topologySnapshot, parseValidate,
testCases);
}
@TestOnly
@@ -153,7 +156,7 @@ final class MappingTestRunner {
private void runTestCases(Path testFile,
IgniteSchema schema,
- ExecutionTargetProvider targetProvider,
+ ExecutionDistributionProvider executionDistributionProvider,
LogicalTopologySnapshot snapshot,
BiFunction<IgniteSchema, String, IgniteRel> parse,
List<TestCaseDef> testCases) {
@@ -173,7 +176,7 @@ final class MappingTestRunner {
MultiStepPlan multiStepPlan = new MultiStepPlan(new
PlanId(UUID.randomUUID(), 1), sqlQueryType, rel,
resultSetMetadata, parameterMetadata,
schema.catalogVersion(), null);
- String actualText = produceMapping(testDef.nodeName,
targetProvider, snapshot, multiStepPlan);
+ String actualText = produceMapping(testDef.nodeName,
executionDistributionProvider, snapshot, multiStepPlan);
actualResults.add(actualText);
}
@@ -192,7 +195,7 @@ final class MappingTestRunner {
private String produceMapping(
String nodeName,
- ExecutionTargetProvider targetProvider,
+ ExecutionDistributionProvider executionDistributionProvider,
LogicalTopologySnapshot snapshot,
MultiStepPlan plan
) {
@@ -201,13 +204,12 @@ final class MappingTestRunner {
MappingServiceImpl mappingService = new MappingServiceImpl(
nodeName,
new TestClockService(new
TestHybridClock(System::currentTimeMillis)),
- targetProvider,
EmptyCacheFactory.INSTANCE,
0,
partitionPruner,
- Runnable::run
+ snapshot::version,
+ executionDistributionProvider
);
- mappingService.onTopologyLeap(snapshot);
List<MappedFragment> mappedFragments;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
index 025bb3993d..fa9b65fd23 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
@@ -138,7 +138,7 @@ public class MappingTestRunnerSelfTest extends
BaseIgniteAbstractTest {
IllegalStateException err = assertThrows(IllegalStateException.class,
() -> runner.runTest(() -> {
- ExecutionTargetProvider targetProvider =
Mockito.mock(ExecutionTargetProvider.class);
+ ExecutionDistributionProvider targetProvider =
Mockito.mock(ExecutionDistributionProvider.class);
IgniteSchema schema = new IgniteSchema("T", 1, List.of());
LogicalNode node = new LogicalNode(randomUUID(), "N1", new
NetworkAddress("addr", 1000));
LogicalTopologySnapshot topologySnapshot = new
LogicalTopologySnapshot(1, List.of(node));
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index b4663de522..0eb50b9a33 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -100,9 +100,7 @@ import
org.apache.ignite.internal.sql.engine.exec.TxAttributes;
import org.apache.ignite.internal.sql.engine.exec.UpdatableTable;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
-import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
-import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
-import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
+import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionDistributionProvider;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
@@ -709,7 +707,7 @@ public class TestBuilders {
Map<String, TestNode> nodes = nodeNames.stream()
.map(name -> {
var systemViewManager = new
SystemViewManagerImpl(name, catalogManager);
- var targetProvider = new
TestNodeExecutionTargetProvider(
+ var executionProvider = new
TestExecutionDistributionProvider(
systemViewManager::owningNodes,
owningNodesByTableName,
useTablePartitions
@@ -718,17 +716,16 @@ public class TestBuilders {
var mappingService = new MappingServiceImpl(
name,
new TestClockService(clock, clockWaiter),
- targetProvider,
EmptyCacheFactory.INSTANCE,
0,
partitionPruner,
- Runnable::run
+ () -> 1L,
+ executionProvider
);
systemViewManager.register(() -> systemViews);
LogicalTopologySnapshot newTopology = new
LogicalTopologySnapshot(1L, logicalNodes);
- mappingService.onTopologyLeap(newTopology);
systemViewManager.onTopologyLeap(newTopology);
return new TestNode(
@@ -1535,13 +1532,14 @@ public class TestBuilders {
return newRow;
}
- /** Returns a builder for {@link ExecutionTargetProvider}. */
- public static ExecutionTargetProviderBuilder
executionTargetProviderBuilder() {
- return new ExecutionTargetProviderBuilder();
+
+ /** Returns a builder for {@link ExecutionDistributionProvider}. */
+ public static ExecutionDistributionProviderBuilder
executionDistributionProviderBuilder() {
+ return new ExecutionDistributionProviderBuilder();
}
- /** A builder to create instances of {@link ExecutionTargetProvider}. */
- public static final class ExecutionTargetProviderBuilder {
+ /** A builder to create instances of {@link
ExecutionDistributionProvider}. */
+ public static final class ExecutionDistributionProviderBuilder {
private final Map<String, List<List<String>>> owningNodesByTableName =
new HashMap<>();
@@ -1549,12 +1547,12 @@ public class TestBuilders {
private boolean useTablePartitions;
- private ExecutionTargetProviderBuilder() {
+ private ExecutionDistributionProviderBuilder() {
}
/** Adds tables to list of nodes mapping. */
- public ExecutionTargetProviderBuilder addTables(Map<String,
List<List<String>>> tables) {
+ public ExecutionDistributionProviderBuilder addTables(Map<String,
List<List<String>>> tables) {
this.owningNodesByTableName.putAll(tables);
return this;
}
@@ -1563,20 +1561,20 @@ public class TestBuilders {
* Sets a function that returns system views. Function accepts a view
name and returns a list of nodes
* a system view is available at.
*/
- public ExecutionTargetProviderBuilder setSystemViews(Function<String,
List<String>> systemViews) {
+ public ExecutionDistributionProviderBuilder
setSystemViews(Function<String, List<String>> systemViews) {
this.owningNodesBySystemViewName = systemViews;
return this;
}
/** Use table partitions to build mapping targets. Default is {@code
false}. */
- public ExecutionTargetProviderBuilder useTablePartitions(boolean
value) {
+ public ExecutionDistributionProviderBuilder useTablePartitions(boolean
value) {
useTablePartitions = value;
return this;
}
- /** Creates an instance of {@link ExecutionTargetProvider}. */
- public ExecutionTargetProvider build() {
- return new TestNodeExecutionTargetProvider(
+ /** Creates an instance of {@link ExecutionDistributionProvider}. */
+ public ExecutionDistributionProvider build() {
+ return new TestExecutionDistributionProvider(
owningNodesBySystemViewName,
Map.copyOf(owningNodesByTableName),
useTablePartitions
@@ -1584,15 +1582,14 @@ public class TestBuilders {
}
}
- private static class TestNodeExecutionTargetProvider implements
ExecutionTargetProvider {
-
+ private static class TestExecutionDistributionProvider implements
ExecutionDistributionProvider {
final Function<String, List<String>> owningNodesBySystemViewName;
final Map<String, List<List<String>>> owningNodesByTableName;
final boolean useTablePartitions;
- private TestNodeExecutionTargetProvider(
+ private TestExecutionDistributionProvider(
Function<String, List<String>> owningNodesBySystemViewName,
Map<String, List<List<String>>> owningNodesByTableName,
boolean useTablePartitions
@@ -1602,19 +1599,18 @@ public class TestBuilders {
this.useTablePartitions = useTablePartitions;
}
+ private static TokenizedAssignments
partitionNodesToAssignment(List<String> nodes, long token) {
+ return new TokenizedAssignmentsImpl(
+
nodes.stream().map(Assignment::forPeer).collect(Collectors.toSet()),
+ token
+ );
+ }
+
@Override
- public CompletableFuture<ExecutionTarget> forTable(
- HybridTimestamp operationTime,
- ExecutionTargetFactory factory,
- IgniteTable table,
- boolean includeBackups
- ) {
+ public CompletableFuture<List<TokenizedAssignments>>
forTable(HybridTimestamp operationTime, IgniteTable table,
+ boolean includeBackups) {
List<List<String>> owningNodes =
owningNodesByTableName.get(table.name());
- if (nullOrEmpty(owningNodes)) {
- throw new AssertionError("DataProvider is not configured for
table " + table.name());
- }
-
List<TokenizedAssignments> assignments;
if (useTablePartitions) {
@@ -1630,34 +1626,19 @@ public class TestBuilders {
.collect(Collectors.toList());
}
- ExecutionTarget target = factory.partitioned(assignments);
-
- return CompletableFuture.completedFuture(target);
- }
-
- private static TokenizedAssignments
partitionNodesToAssignment(List<String> nodes, long token) {
- return new TokenizedAssignmentsImpl(
-
nodes.stream().map(Assignment::forPeer).collect(Collectors.toSet()),
- token
- );
+ return CompletableFuture.completedFuture(assignments);
}
@Override
- public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+ public List<String> forSystemView(IgniteSystemView view) {
List<String> nodes =
owningNodesBySystemViewName.apply(view.name());
if (nullOrEmpty(nodes)) {
- return CompletableFuture.failedFuture(
- new SqlException(Sql.MAPPING_ERR, format("The view
with name '{}' could not be found on"
- + " any active nodes in the cluster",
view.name()))
- );
+ throw new SqlException(Sql.MAPPING_ERR, format("The view with
name '{}' could not be found on"
+ + " any active nodes in the cluster", view.name()));
}
- return CompletableFuture.completedFuture(
- view.distribution() == IgniteDistributions.single()
- ? factory.oneOf(nodes)
- : factory.allOf(nodes)
- );
+ return view.distribution() == IgniteDistributions.single() ?
List.of(nodes.get(0)) : nodes;
}
}
}