This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a404b155c8 IGNITE-22969 Sql. Gather execution nodes information from 
placement driver (#4629)
a404b155c8 is described below

commit a404b155c8d21d487dc9a235236db515e27d6d50
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Thu Nov 7 10:08:22 2024 +0300

    IGNITE-22969 Sql. Gather execution nodes information from placement driver 
(#4629)
---
 .../internal/sql/engine/SqlQueryProcessor.java     |  10 +-
 ...der.java => ExecutionDistributionProvider.java} |  20 +-
 .../ExecutionDistributionProviderImpl.java}        |  59 +---
 .../sql/engine/exec/mapping/MappingContext.java    |  11 +-
 .../engine/exec/mapping/MappingServiceImpl.java    | 388 +++++++++++----------
 .../mapping/largecluster/LargeClusterFactory.java  |  14 +-
 .../mapping/smallcluster/SmallClusterFactory.java  |  14 +-
 .../sql/engine/exec/ExecutionServiceImplTest.java  |  46 +--
 .../mapping/ExecutionTargetFactorySelfTest.java    |  17 +-
 .../engine/exec/mapping/FragmentMappingTest.java   |   7 +-
 .../exec/mapping/MappingServiceImplTest.java       | 248 ++++++-------
 .../sql/engine/exec/mapping/MappingTestRunner.java |  22 +-
 .../exec/mapping/MappingTestRunnerSelfTest.java    |   2 +-
 .../sql/engine/framework/TestBuilders.java         |  85 ++---
 14 files changed, 435 insertions(+), 508 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index cfca789ce0..0c7f9047e8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -69,6 +69,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import 
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
 import org.apache.ignite.internal.sql.engine.exec.fsm.ExecutionPhase;
 import org.apache.ignite.internal.sql.engine.exec.fsm.QueryExecutor;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionDistributionProviderImpl;
 import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
@@ -297,22 +298,21 @@ public class SqlQueryProcessor implements QueryProcessor, 
SystemViewProvider {
                 view -> () -> systemViewManager.scanView(view.name())
         );
 
-        var executionTargetProvider = new 
ExecutionTargetProviderImpl(placementDriver, systemViewManager);
-
         var partitionPruner = new PartitionPrunerImpl();
 
         var mappingService = new MappingServiceImpl(
                 nodeName,
                 clockService,
-                executionTargetProvider,
                 CACHE_FACTORY,
                 clusterCfg.planner().estimatedNumberOfQueries().value(),
                 partitionPruner,
-                taskExecutor
+                () -> logicalTopologyService.localLogicalTopology().version(),
+                new ExecutionDistributionProviderImpl(placementDriver, 
systemViewManager)
         );
 
-        logicalTopologyService.addEventListener(mappingService);
         placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
mappingService::onPrimaryReplicaExpired);
+        // Need to be implemented after 
https://issues.apache.org/jira/browse/IGNITE-23519 Add an event for lease 
Assignments
+        // placementDriver.listen(PrimaryReplicaEvent.ASSIGNMENTS_CHANGED, 
mappingService::onPrimaryReplicaAssignment);
 
         var executionSrvc = registerService(ExecutionServiceImpl.create(
                 clusterSrvc.topologyService(),
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProvider.java
similarity index 73%
rename from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
rename to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProvider.java
index d131664cf1..2bccadca63 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProvider.java
@@ -17,38 +17,34 @@
 
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 
-/**
- * An integration point that helps the mapper to acquire an execution target 
of particular
- * relation from fragment.
- */
-public interface ExecutionTargetProvider {
+/** Execution nodes information provider. */
+public interface ExecutionDistributionProvider {
     /**
      * Returns an execution target for a given table.
      *
      * @param operationTime Time of the operation to get consistent results 
among different calls.
-     * @param factory A factory to create target for given table.
      * @param table A table to create execution target for.
      * @param includeBackups Flags denotes whether to include non-primary 
replicas into target.
      * @return A future representing the result.
      */
-    CompletableFuture<ExecutionTarget> forTable(
+    CompletableFuture<List<TokenizedAssignments>> forTable(
             HybridTimestamp operationTime,
-            ExecutionTargetFactory factory,
             IgniteTable table,
             boolean includeBackups
     );
 
     /**
-     * Returns an execution target for a given view.
+     * Returns a distribution for a given view.
      *
-     * @param factory A factory to create target for given table.
      * @param view A view to create execution target for.
-     * @return A future representing the result.
+     * @return Distribution for a given view.
      */
-    CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory 
factory, IgniteSystemView view);
+    List<String> forSystemView(IgniteSystemView view);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
similarity index 79%
rename from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
rename to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
index 86d0650641..82c5b1cc89 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/ExecutionTargetProviderImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
@@ -15,14 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine;
+package org.apache.ignite.internal.sql.engine.exec.mapping;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT;
-import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 
@@ -44,59 +41,39 @@ import 
org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.systemview.api.SystemViewManager;
-import org.apache.ignite.lang.ErrorGroups.Sql;
-import org.apache.ignite.sql.SqlException;
-
-/**
- * Implementation of {@link ExecutionTargetProvider} which takes assignments 
from {@link PlacementDriver} and {@link SystemViewManager}.
- */
-public class ExecutionTargetProviderImpl implements ExecutionTargetProvider {
-    private static final IgniteLogger LOG = 
Loggers.forClass(ExecutionTargetProviderImpl.class);
 
+/** Execution nodes information provider. */
+public class ExecutionDistributionProviderImpl implements 
ExecutionDistributionProvider {
+    private static final IgniteLogger LOG = 
Loggers.forClass(ExecutionDistributionProviderImpl.class);
     private final PlacementDriver placementDriver;
     private final SystemViewManager systemViewManager;
 
-    ExecutionTargetProviderImpl(
-            PlacementDriver placementDriver, SystemViewManager 
systemViewManager
-    ) {
+    /**
+     * Constructor.
+     *
+     * @param placementDriver Placement driver.
+     * @param systemViewManager Manager for system views.
+     */
+    public ExecutionDistributionProviderImpl(PlacementDriver placementDriver, 
SystemViewManager systemViewManager) {
         this.placementDriver = placementDriver;
         this.systemViewManager = systemViewManager;
     }
 
     @Override
-    public CompletableFuture<ExecutionTarget> forTable(
+    public List<String> forSystemView(IgniteSystemView view) {
+        return systemViewManager.owningNodes(view.name());
+    }
+
+    @Override
+    public CompletableFuture<List<TokenizedAssignments>> forTable(
             HybridTimestamp operationTime,
-            ExecutionTargetFactory factory,
             IgniteTable table,
             boolean includeBackups
     ) {
-        return collectAssignments(table, operationTime, includeBackups)
-                .thenApply(factory::partitioned);
-    }
-
-    @Override
-    public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
-        List<String> nodes = systemViewManager.owningNodes(view.name());
-
-        if (nullOrEmpty(nodes)) {
-            return failedFuture(
-                    new SqlException(Sql.MAPPING_ERR, format("The view with 
name '{}' could not be found on"
-                            + " any active nodes in the cluster", view.name()))
-            );
-        }
-
-        return completedFuture(
-                view.distribution() == IgniteDistributions.single()
-                        ? factory.oneOf(nodes)
-                        : factory.allOf(nodes)
-        );
+        return collectAssignments(table, operationTime, includeBackups);
     }
 
     // need to be refactored after TODO: 
https://issues.apache.org/jira/browse/IGNITE-20925
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
index b230df95ab..cd7f745465 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingContext.java
@@ -21,7 +21,6 @@ import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.largecluster.LargeClusterFactory;
 import 
org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster.SmallClusterFactory;
-import org.apache.ignite.internal.sql.engine.util.Commons;
 
 /**
  * A context that encloses information necessary during mapping.
@@ -29,23 +28,19 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
 class MappingContext {
     private final String localNode;
     private final List<String> nodes;
+    private final RelOptCluster cluster;
 
     private final ExecutionTargetFactory targetFactory;
 
-    private RelOptCluster cluster;
-
-    MappingContext(String localNode, List<String> nodes) {
+    MappingContext(String localNode, List<String> nodes, RelOptCluster 
cluster) {
         this.localNode = localNode;
         this.nodes = nodes;
+        this.cluster = cluster;
 
         this.targetFactory = nodes.size() > 64 ? new 
LargeClusterFactory(nodes) : new SmallClusterFactory(nodes);
     }
 
     public RelOptCluster cluster() {
-        if (cluster == null) {
-            cluster = Commons.cluster();
-        }
-
         return cluster;
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index bcdf364850..34c5cd4f2c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -17,9 +17,13 @@
 
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
-import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static 
org.apache.ignite.internal.util.CollectionUtils.toIntMapCollector;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntObjectPair;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@@ -32,18 +36,16 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
-import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.hlc.ClockService;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.TablePartitionId;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl.LogicalTopologyHolder.TopologySnapshot;
 import org.apache.ignite.internal.sql.engine.prepare.Fragment;
 import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
 import org.apache.ignite.internal.sql.engine.prepare.PlanId;
@@ -51,64 +53,58 @@ import 
org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruner;
 import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSender;
 import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.cache.Cache;
 import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
 import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.apache.ignite.sql.SqlException;
 
 /**
  * An implementation of {@link MappingService}.
  *
- * <p>This particular implementation keeps track of changes in logical cluster 
topology.
- * Always uses latest topology snapshot to map query.
+ * <p>This particular implementation keeps track of changes according to 
{@link PlacementDriver} assignments.
+ * Use distribution information to map a query.
  */
-public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventListener {
-    private final LogicalTopologyHolder topologyHolder = new 
LogicalTopologyHolder();
-    private final CompletableFuture<Void> initialTopologyFuture = new 
CompletableFuture<>();
-
+public class MappingServiceImpl implements MappingService {
     private final String localNodeName;
     private final ClockService clock;
-    private final ExecutionTargetProvider targetProvider;
     private final Cache<PlanId, FragmentsTemplate> templatesCache;
     private final Cache<MappingsCacheKey, MappingsCacheValue> mappingsCache;
-    private final Executor taskExecutor;
     private final PartitionPruner partitionPruner;
+    private final Supplier<Long> logicalTopologyVerSupplier;
+    private final ExecutionDistributionProvider distributionProvider;
 
     /**
      * Constructor.
      *
      * @param localNodeName Name of the current Ignite node.
      * @param clock Clock service to get actual time.
-     * @param targetProvider Execution target provider.
      * @param cacheFactory A factory to create cache of fragments.
      * @param cacheSize Size of the cache of query plans. Should be non 
negative.
      * @param partitionPruner Partition pruner.
-     * @param taskExecutor Mapper service task executor.
+     * @param logicalTopologyVerSupplier Logical topology version supplier.
+     * @param distributionProvider Execution distribution provider.
      */
     public MappingServiceImpl(
             String localNodeName,
             ClockService clock,
-            ExecutionTargetProvider targetProvider,
             CacheFactory cacheFactory,
             int cacheSize,
             PartitionPruner partitionPruner,
-            Executor taskExecutor
+            Supplier<Long> logicalTopologyVerSupplier,
+            ExecutionDistributionProvider distributionProvider
     ) {
         this.localNodeName = localNodeName;
         this.clock = clock;
-        this.targetProvider = targetProvider;
         this.templatesCache = cacheFactory.create(cacheSize);
         this.mappingsCache = cacheFactory.create(cacheSize);
-        this.taskExecutor = taskExecutor;
         this.partitionPruner = partitionPruner;
-    }
-
-    @Override
-    public CompletableFuture<List<MappedFragment>> map(MultiStepPlan 
multiStepPlan, MappingParameters parameters) {
-        if (initialTopologyFuture.isDone()) {
-            return map0(multiStepPlan, parameters);
-        }
-
-        return initialTopologyFuture.thenComposeAsync(ignore -> 
map0(multiStepPlan, parameters), taskExecutor);
+        this.logicalTopologyVerSupplier = logicalTopologyVerSupplier;
+        this.distributionProvider = distributionProvider;
     }
 
     /** Called when the primary replica has expired. */
@@ -124,13 +120,12 @@ public class MappingServiceImpl implements 
MappingService, LogicalTopologyEventL
         return CompletableFutures.falseCompletedFuture();
     }
 
-    private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan 
multiStepPlan, MappingParameters parameters) {
-        TopologySnapshot topology = topologyHolder.topology();
-        MappingContext context = new MappingContext(localNodeName, 
topology.nodes());
-
-        FragmentsTemplate template = getOrCreateTemplate(multiStepPlan, 
context);
+    @Override
+    public CompletableFuture<List<MappedFragment>> map(MultiStepPlan 
multiStepPlan, MappingParameters parameters) {
+        FragmentsTemplate template = getOrCreateTemplate(multiStepPlan);
 
         boolean mapOnBackups = parameters.mapOnBackups();
+
         MappingsCacheValue cacheValue = mappingsCache.compute(
                 new MappingsCacheKey(multiStepPlan.id(), mapOnBackups),
                 (key, val) -> {
@@ -145,203 +140,213 @@ public class MappingServiceImpl implements 
MappingService, LogicalTopologyEventL
                             }
                         }
 
-                        long topVer = topologyAware ? topology.version() : 
Long.MAX_VALUE;
+                        long topVer = topologyAware ? 
logicalTopologyVerSupplier.get() : Long.MAX_VALUE;
 
-                        return new MappingsCacheValue(topVer, tableIds, 
mapFragments(context, template, key.mapOnBackups));
+                        return new MappingsCacheValue(topVer, tableIds, 
mapFragments(template, mapOnBackups));
                     }
 
-                    if (val.topVer < topology.version()) {
-                        return new MappingsCacheValue(topology.version(), 
val.tableIds, mapFragments(context, template, key.mapOnBackups));
+                    long topologyVer = logicalTopologyVerSupplier.get();
+
+                    if (val.topologyVersion < topologyVer) {
+                        return new MappingsCacheValue(topologyVer, 
val.tableIds, mapFragments(template, mapOnBackups));
                     }
 
                     return val;
-                }
-        );
+                });
 
-        return cacheValue.mappedFragments.thenApply(mappedFragments -> 
applyPartitionPruning(mappedFragments.fragments, parameters));
+        return cacheValue.mappedFragments.thenApply(frags -> 
applyPartitionPruning(frags.fragments, parameters));
     }
 
-    private CompletableFuture<MappedFragments> mapFragments(
-            MappingContext context,
-            FragmentsTemplate template,
+    CompletableFuture<DistributionHolder> composeDistributions(
+            Set<IgniteSystemView> views,
+            Set<IgniteTable> tables,
             boolean mapOnBackups
     ) {
-        IdGenerator idGenerator = new IdGenerator(template.nextId);
-        List<Fragment> fragments = new ArrayList<>(template.fragments);
-        HybridTimestamp mappingTime = clock.now();
-
-        List<CompletableFuture<IntObjectPair<ExecutionTarget>>> targets =
-                fragments.stream().flatMap(fragment -> Stream.concat(
-                        fragment.tables().values().stream()
-                                .map(table -> 
targetProvider.forTable(mappingTime, context.targetFactory(), table, 
mapOnBackups)
-                                        .thenApply(target -> 
IntObjectPair.of(table.id(), target))
-                                ),
-                        fragment.systemViews().stream()
-                                .map(view -> 
targetProvider.forSystemView(context.targetFactory(), view)
-                                        .thenApply(target -> 
IntObjectPair.of(view.id(), target))
-                                )
-                ))
-                .collect(Collectors.toList());
-
-        return allOf(targets.toArray(new CompletableFuture[0]))
-                .thenApply(ignored -> {
-                    Int2ObjectMap<ExecutionTarget> targetsById = new 
Int2ObjectOpenHashMap<>();
-
-                    for (CompletableFuture<IntObjectPair<ExecutionTarget>> fut 
: targets) {
-                        // this is a safe join, because we have waited for all 
futures to be complete
-                        IntObjectPair<ExecutionTarget> pair = fut.join();
-
-                        targetsById.put(pair.firstInt(), pair.second());
-                    }
+        if (tables.isEmpty() && views.isEmpty()) {
+            DistributionHolder holder = new 
DistributionHolder(Set.of(localNodeName), Int2ObjectMaps.emptyMap(), 
Int2ObjectMaps.emptyMap());
 
-                    FragmentMapper mapper = new 
FragmentMapper(template.cluster.getMetadataQuery(), context, targetsById);
+            return completedFuture(holder);
+        } else {
+            Int2ObjectMap<CompletableFuture<List<TokenizedAssignments>>> 
tablesAssignments = new Int2ObjectOpenHashMap<>(tables.size());
+            Set<String> allNodes = new HashSet<>();
 
-                    List<FragmentMapping> mappings = mapper.map(fragments, 
idGenerator);
+            allNodes.add(localNodeName);
 
-                    Long2ObjectMap<ColocationGroup> groupsBySourceId = new 
Long2ObjectOpenHashMap<>();
-                    Long2ObjectMap<List<String>> allSourcesByExchangeId = new 
Long2ObjectOpenHashMap<>();
+            for (IgniteTable tbl : tables) {
+                CompletableFuture<List<TokenizedAssignments>> assignments = 
distributionProvider
+                        .forTable(clock.now(), tbl, mapOnBackups);
 
-                    for (FragmentMapping mapping : mappings) {
-                        Fragment fragment = mapping.fragment();
+                tablesAssignments.put(tbl.id(), assignments);
+            }
 
-                        for (ColocationGroup group : mapping.groups()) {
-                            for (long sourceId : group.sourceIds()) {
-                                groupsBySourceId.put(sourceId, group);
-                            }
-                        }
+            return 
CompletableFuture.allOf(tablesAssignments.values().toArray(new 
CompletableFuture[0]))
+                    .thenApply(ignore -> {
+                        Int2ObjectMap<List<TokenizedAssignments>> 
assignmentsPerTable = new Int2ObjectOpenHashMap<>(tables.size());
 
-                        if (!fragment.rootFragment()) {
-                            IgniteSender sender = (IgniteSender) 
fragment.root();
+                        tablesAssignments.keySet().forEach(k -> {
+                            // this is a safe join, because we have waited for 
all futures to be completed
+                            List<TokenizedAssignments> assignments = 
tablesAssignments.get(k).join();
 
-                            List<String> nodeNames = mapping.groups().stream()
-                                    .flatMap(g -> g.nodeNames().stream())
-                                    .distinct().collect(Collectors.toList());
+                            assignments.stream().flatMap(i -> 
i.nodes().stream()).map(Assignment::consistentId).forEach(allNodes::add);
 
-                            allSourcesByExchangeId.put(sender.exchangeId(), 
nodeNames);
-                        }
-                    }
+                            assignmentsPerTable.put(k, assignments);
+                        });
 
-                    List<MappedFragment> mappedFragmentsList = new 
ArrayList<>(mappings.size());
-                    Set<String> targetNodes = new HashSet<>();
-                    for (FragmentMapping mapping : mappings) {
-                        Fragment fragment = mapping.fragment();
+                        return assignmentsPerTable;
+                    })
+                    .thenApply(assignmentsPerTable -> {
+                        Int2ObjectMap<List<String>> nodesPerView = 
views.stream()
+                                
.collect(toIntMapCollector(IgniteDataSource::id, 
distributionProvider::forSystemView));
 
-                        ColocationGroup targetGroup = null;
-                        if (!fragment.rootFragment()) {
-                            IgniteSender sender = (IgniteSender) 
fragment.root();
+                        
nodesPerView.values().stream().flatMap(List::stream).forEach(allNodes::add);
 
-                            targetGroup = 
groupsBySourceId.get(sender.exchangeId());
-                        }
+                        return new DistributionHolder(allNodes, 
assignmentsPerTable, nodesPerView);
+                    });
+        }
+    }
 
-                        Long2ObjectMap<List<String>> sourcesByExchangeId = 
null;
-                        for (IgniteReceiver receiver : fragment.remotes()) {
-                            if (sourcesByExchangeId == null) {
-                                sourcesByExchangeId = new 
Long2ObjectOpenHashMap<>();
-                            }
+    private CompletableFuture<MappedFragments> mapFragments(
+            FragmentsTemplate template,
+            boolean mapOnBackups
+    ) {
+        Set<IgniteSystemView> views = 
template.fragments.stream().flatMap(fragment -> fragment.systemViews().stream())
+                .collect(Collectors.toSet());
 
-                            long exchangeId = receiver.exchangeId();
+        Set<IgniteTable> tables = template.fragments.stream().flatMap(fragment 
-> fragment.tables().values().stream())
+                .collect(Collectors.toSet());
 
-                            sourcesByExchangeId.put(exchangeId, 
allSourcesByExchangeId.get(exchangeId));
-                        }
+        CompletableFuture<DistributionHolder> res = 
composeDistributions(views, tables, mapOnBackups);
 
-                        MappedFragment mappedFragment = new MappedFragment(
-                                fragment,
-                                mapping.groups(),
-                                sourcesByExchangeId,
-                                targetGroup,
-                                null
-                        );
+        return res.thenApply(assignments -> {
+            Int2ObjectMap<ExecutionTarget> targetsById = new 
Int2ObjectOpenHashMap<>();
 
-                        mappedFragmentsList.add(mappedFragment);
+            MappingContext context = new MappingContext(localNodeName, new 
ArrayList<>(assignments.nodes()), template.cluster);
 
-                        targetNodes.addAll(mappedFragment.nodes());
-                    }
+            ExecutionTargetFactory targetFactory = context.targetFactory();
 
-                    return new MappedFragments(mappedFragmentsList, 
targetNodes);
-                });
-    }
+            List<IntObjectPair<ExecutionTarget>> allTargets = 
prepareTargets(template, assignments, targetFactory);
 
-    @Override
-    public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot 
newTopology) {
-        topologyHolder.update(newTopology);
-    }
+            for (IntObjectPair<ExecutionTarget> pair : allTargets) {
+                targetsById.put(pair.firstInt(), pair.second());
+            }
 
-    @Override
-    public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot 
newTopology) {
-        topologyHolder.update(newTopology);
+            FragmentMapper mapper = new 
FragmentMapper(context.cluster().getMetadataQuery(), context, targetsById);
 
-        mappingsCache.removeIfValue(value ->
-                !value.mappedFragments.isDone() // Invalidate non-completed 
mappings to reduce the chance of getting stale value 
-                        || 
value.mappedFragments.join().nodes.contains(leftNode.name()));
-    }
+            IdGenerator idGenerator = new IdGenerator(template.nextId);
 
-    @Override
-    public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
-        topologyHolder.update(newTopology);
-    }
+            List<Fragment> fragments = new ArrayList<>(template.fragments);
 
-    private List<MappedFragment> applyPartitionPruning(List<MappedFragment> 
mappedFragments, MappingParameters parameters) {
-        return partitionPruner.apply(mappedFragments, 
parameters.dynamicParameters());
-    }
+            List<FragmentMapping> mappings = mapper.map(fragments, 
idGenerator);
 
-    /**
-     * Holder for topology snapshots that guarantees monotonically increasing 
versions.
-     */
-    class LogicalTopologyHolder {
-        private volatile TopologySnapshot topology = new 
TopologySnapshot(Long.MIN_VALUE, List.of());
+            Long2ObjectMap<ColocationGroup> groupsBySourceId = new 
Long2ObjectOpenHashMap<>();
+            Long2ObjectMap<List<String>> allSourcesByExchangeId = new 
Long2ObjectOpenHashMap<>();
+
+            for (FragmentMapping mapping : mappings) {
+                Fragment fragment = mapping.fragment();
 
-        void update(LogicalTopologySnapshot topologySnapshot) {
-            synchronized (this) {
-                if (topology.version() < topologySnapshot.version()) {
-                    topology = new 
TopologySnapshot(topologySnapshot.version(), deriveNodeNames(topologySnapshot));
+                for (ColocationGroup group : mapping.groups()) {
+                    for (long sourceId : group.sourceIds()) {
+                        groupsBySourceId.put(sourceId, group);
+                    }
                 }
 
-                if (initialTopologyFuture.isDone() || 
!topology.nodes().contains(localNodeName)) {
-                    return;
+                if (!fragment.rootFragment()) {
+                    IgniteSender sender = (IgniteSender) fragment.root();
+
+                    List<String> nodeNames = mapping.groups().stream()
+                            .flatMap(g -> g.nodeNames().stream())
+                            .distinct().collect(Collectors.toList());
+
+                    allSourcesByExchangeId.put(sender.exchangeId(), nodeNames);
                 }
             }
 
-            initialTopologyFuture.complete(null);
-        }
+            List<MappedFragment> mappedFragmentsList = new 
ArrayList<>(mappings.size());
+            Set<String> targetNodes = new HashSet<>();
+            for (FragmentMapping mapping : mappings) {
+                Fragment fragment = mapping.fragment();
 
-        TopologySnapshot topology() {
-            return topology;
-        }
+                ColocationGroup targetGroup = null;
+                if (!fragment.rootFragment()) {
+                    IgniteSender sender = (IgniteSender) fragment.root();
 
-        private List<String> deriveNodeNames(LogicalTopologySnapshot topology) 
{
-            return topology.nodes().stream()
-                    .map(LogicalNode::name)
-                    .collect(Collectors.toUnmodifiableList());
-        }
+                    targetGroup = groupsBySourceId.get(sender.exchangeId());
+                }
 
-        class TopologySnapshot {
-            private final List<String> nodes;
-            private final long version;
+                Long2ObjectMap<List<String>> sourcesByExchangeId = null;
+                for (IgniteReceiver receiver : fragment.remotes()) {
+                    if (sourcesByExchangeId == null) {
+                        sourcesByExchangeId = new Long2ObjectOpenHashMap<>();
+                    }
 
-            TopologySnapshot(long version, List<String> nodes) {
-                this.version = version;
-                this.nodes = nodes;
-            }
+                    long exchangeId = receiver.exchangeId();
 
-            public List<String> nodes() {
-                return nodes;
-            }
+                    sourcesByExchangeId.put(exchangeId, 
allSourcesByExchangeId.get(exchangeId));
+                }
+
+                MappedFragment mappedFragment = new MappedFragment(
+                        fragment,
+                        mapping.groups(),
+                        sourcesByExchangeId,
+                        targetGroup,
+                        null
+                );
 
-            public long version() {
-                return version;
+                mappedFragmentsList.add(mappedFragment);
+
+                targetNodes.addAll(mappedFragment.nodes());
             }
+
+            return new MappedFragments(mappedFragmentsList, targetNodes);
+        });
+    }
+
+    private static List<IntObjectPair<ExecutionTarget>> prepareTargets(
+            FragmentsTemplate template,
+            DistributionHolder distr,
+            ExecutionTargetFactory targetFactory
+    ) {
+        Stream<IntObjectPair<ExecutionTarget>> tableTargets = 
template.fragments.stream().flatMap(fragment ->
+                fragment.tables().values().stream()
+                        .map(table -> IntObjectPair.of(table.id(),
+                                buildTargetforTable(targetFactory, 
distr.tableAssignments(table.id())))));
+
+        Stream<IntObjectPair<ExecutionTarget>> viewTargets = 
template.fragments.stream().flatMap(fragment -> fragment.systemViews().stream()
+                .map(view -> IntObjectPair.of(view.id(), 
buildTargetForSystemView(targetFactory, view, distr.viewNodes(view.id())))));
+
+        return Stream.concat(tableTargets, 
viewTargets).collect(Collectors.toList());
+    }
+
+    private static ExecutionTarget 
buildTargetForSystemView(ExecutionTargetFactory factory, IgniteSystemView view, 
List<String> nodes) {
+        if (nullOrEmpty(nodes)) {
+            throw new SqlException(Sql.MAPPING_ERR, format("The view with name 
'{}' could not be found on"
+                    + " any active nodes in the cluster", view.name()));
         }
+
+        return view.distribution() == IgniteDistributions.single()
+                ? factory.oneOf(nodes)
+                : factory.allOf(nodes);
+    }
+
+    private static ExecutionTarget buildTargetforTable(ExecutionTargetFactory 
factory, List<TokenizedAssignments> assignments) {
+        return factory.partitioned(assignments);
     }
 
-    private FragmentsTemplate getOrCreateTemplate(MultiStepPlan plan, 
MappingContext context) {
+    private List<MappedFragment> applyPartitionPruning(List<MappedFragment> 
mappedFragments, MappingParameters parameters) {
+        return partitionPruner.apply(mappedFragments, 
parameters.dynamicParameters());
+    }
+
+    private FragmentsTemplate getOrCreateTemplate(MultiStepPlan plan) {
         // QuerySplitter is deterministic, thus we can cache result in order 
to reuse it next time
         return templatesCache.get(plan.id(), key -> {
             IdGenerator idGenerator = new IdGenerator(0);
 
-            List<Fragment> fragments = new QuerySplitter(idGenerator, 
context.cluster()).split(plan.root());
+            RelOptCluster cluster = Commons.cluster();
+
+            List<Fragment> fragments = new QuerySplitter(idGenerator, 
cluster).split(plan.root());
 
             return new FragmentsTemplate(
-                    idGenerator.nextId(), context.cluster(), fragments
+                    idGenerator.nextId(), cluster, fragments
             );
         });
     }
@@ -370,12 +375,12 @@ public class MappingServiceImpl implements 
MappingService, LogicalTopologyEventL
     }
 
     private static class MappingsCacheValue {
-        private final long topVer;
+        private final long topologyVersion;
         private final IntSet tableIds;
         private final CompletableFuture<MappedFragments> mappedFragments;
 
-        MappingsCacheValue(long topVer, IntSet tableIds, 
CompletableFuture<MappedFragments> mappedFragments) {
-            this.topVer = topVer;
+        MappingsCacheValue(long topologyVersion, IntSet tableIds, 
CompletableFuture<MappedFragments> mappedFragments) {
+            this.topologyVersion = topologyVersion;
             this.tableIds = tableIds;
             this.mappedFragments = mappedFragments;
         }
@@ -409,4 +414,31 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
             return Objects.hash(planId, mapOnBackups);
         }
     }
+
+    private static class DistributionHolder {
+        private final Set<String> nodes;
+        private final Int2ObjectMap<List<TokenizedAssignments>> 
assignmentsPerTable;
+        private final Int2ObjectMap<List<String>> nodesPerView;
+
+        DistributionHolder(
+                Set<String> nodes,
+                Int2ObjectMap<List<TokenizedAssignments>> assignmentsPerTable,
+                Int2ObjectMap<List<String>> nodesPerView) {
+            this.nodes = nodes;
+            this.assignmentsPerTable = assignmentsPerTable;
+            this.nodesPerView = nodesPerView;
+        }
+
+        Set<String> nodes() {
+            return nodes;
+        }
+
+        List<TokenizedAssignments> tableAssignments(int tableId) {
+            return assignmentsPerTable.get(tableId);
+        }
+
+        List<String> viewNodes(int viewId) {
+            return nodesPerView.get(viewId);
+        }
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
index 5453f2ab7d..d5d5be9877 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/largecluster/LargeClusterFactory.java
@@ -85,10 +85,9 @@ public class LargeClusterFactory implements 
ExecutionTargetFactory {
             for (Assignment a : assignment.nodes()) {
                 int node = nodeNameToId.getOrDefault(a.consistentId(), -1);
 
-                // TODO Ignore unknown node until IGNITE-22969
-                if (node != -1) {
-                    nodes.set(node);
-                }
+                assert node >= 0 : "invalid node";
+
+                nodes.set(node);
             }
 
             assert !nodes.isEmpty() : "No partition node found";
@@ -127,10 +126,9 @@ public class LargeClusterFactory implements 
ExecutionTargetFactory {
         for (String name : nodes) {
             int id = nodeNameToId.getOrDefault(name, -1);
 
-            // TODO Ignore unknown node until IGNITE-22969
-            if (id != -1) {
-                nodesSet.set(id);
-            }
+            assert id >= 0 : "invalid node";
+
+            nodesSet.set(id);
         }
 
         return nodesSet;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
index 66a173828f..fde0fe0fac 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
@@ -89,10 +89,9 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
             for (Assignment a : assignment.nodes()) {
                 long node = nodeNameToId.getOrDefault(a.consistentId(), -1);
 
-                // TODO Ignore unknown node until IGNITE-22969
-                if (node != -1) {
-                    currentPartitionNodes |= node;
-                }
+                assert node >= 0 : "invalid node";
+
+                currentPartitionNodes |= node;
             }
 
             assert currentPartitionNodes != 0L : "No partition node found";
@@ -132,10 +131,9 @@ public class SmallClusterFactory implements 
ExecutionTargetFactory {
         for (String name : nodes) {
             long node = nodeNameToId.getOrDefault(name, -1);
 
-            // TODO Ignore unknown node until IGNITE-22969
-            if (node != -1) {
-                nodesMap |= node;
-            }
+            assert node >= 0 : "invalid node";
+
+            nodesMap |= node;
         }
 
         return nodesMap;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 75f8516704..2e05f72ddd 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.sql.engine.exec;
 
 import static java.util.UUID.randomUUID;
 import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
@@ -78,14 +77,11 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.catalog.CatalogCommand;
-import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.RunnableX;
@@ -106,10 +102,8 @@ import 
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestC
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import 
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry;
 import 
org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
-import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
 import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImplTest.TestExecutionDistributionProvider;
 import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
 import org.apache.ignite.internal.sql.engine.exec.rel.Node;
@@ -138,7 +132,6 @@ import 
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverte
 import org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruner;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
-import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.sql.ParserService;
@@ -1139,15 +1132,9 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         ExecutionDependencyResolver dependencyResolver = new 
ExecutionDependencyResolverImpl(executableTableRegistry, null);
 
-        var mappingService = createMappingService(nodeName, clockService, 
taskExecutor, mappingCacheFactory);
+        var mappingService = createMappingService(nodeName, clockService, 
mappingCacheFactory, nodeNames);
         var tableFunctionRegistry = new TableFunctionRegistryImpl();
 
-        List<LogicalNode> logicalNodes = nodeNames.stream()
-                .map(name -> new LogicalNode(randomUUID(), name, 
NetworkAddress.from("127.0.0.1:10000")))
-                .collect(Collectors.toList());
-
-        mappingService.onTopologyLeap(new LogicalTopologySnapshot(1, 
logicalNodes));
-
         var executionService = new ExecutionServiceImpl<>(
                 messageService,
                 topologyService,
@@ -1173,32 +1160,15 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
     private MappingServiceImpl createMappingService(
             String nodeName,
             ClockService clock,
-            QueryTaskExecutor taskExecutor,
-            CacheFactory cacheFactory
+            CacheFactory cacheFactory,
+            List<String> logicalNodes
     ) {
-        var targetProvider = new ExecutionTargetProvider() {
-            @Override
-            public CompletableFuture<ExecutionTarget> forTable(
-                    HybridTimestamp operationTime,
-                    ExecutionTargetFactory factory,
-                    IgniteTable table,
-                    boolean includeBackups
-            ) {
-                if (mappingException != null) {
-                    return CompletableFuture.failedFuture(mappingException);
-                }
-
-                return completedFuture(factory.allOf(nodeNames));
-            }
+        PartitionPruner partitionPruner = (mappedFragments, dynamicParameters) 
-> mappedFragments;
 
-            @Override
-            public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
-                return CompletableFuture.failedFuture(new AssertionError("Not 
supported"));
-            }
-        };
+        Supplier<Long> topologyVerSupplier = () -> Long.MAX_VALUE;
 
-        PartitionPruner partitionPruner = (mappedFragments, dynamicParameters) 
-> mappedFragments;
-        return new MappingServiceImpl(nodeName, clock, targetProvider, 
cacheFactory, 0, partitionPruner, taskExecutor);
+        return new MappingServiceImpl(nodeName, clock, cacheFactory, 0, 
partitionPruner, topologyVerSupplier,
+                new TestExecutionDistributionProvider(logicalNodes, () -> 
mappingException));
     }
 
     private SqlOperationContext createContext() {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
index 5c82615c76..5fc233d0be 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactorySelfTest.java
@@ -95,14 +95,15 @@ public class ExecutionTargetFactorySelfTest {
         List<String> partiallyInvalidNodeSet = 
CollectionUtils.concat(SINGLE_NODE_SET, invalidNodeSet);
 
         assertThrows(AssertionError.class, () -> f.allOf(invalidNodeSet), 
"invalid node");
-        assertThrows(AssertionError.class, () -> f.someOf(invalidNodeSet), 
"Empty target is not allowed");
-        assertThrows(AssertionError.class, () -> f.oneOf(invalidNodeSet), 
"Empty target is not allowed");
-        assertThrows(AssertionError.class, () -> 
f.partitioned(assignmentFromPrimaries(invalidNodeSet)), "No partition node 
found");
-
-        assertThrows(Throwable.class, () -> f.allOf(partiallyInvalidNodeSet), 
"invalid node");
-        assertThat(f.resolveNodes(f.someOf(partiallyInvalidNodeSet)), 
equalTo(SINGLE_NODE_SET));
-        assertThat(f.resolveNodes(f.oneOf(partiallyInvalidNodeSet)), 
equalTo(SINGLE_NODE_SET));
-        
assertThat(f.resolveNodes(f.partitioned(assignment(partiallyInvalidNodeSet, 
partiallyInvalidNodeSet))), equalTo(SINGLE_NODE_SET));
+        assertThrows(AssertionError.class, () -> f.someOf(invalidNodeSet), 
"invalid node");
+        assertThrows(AssertionError.class, () -> f.oneOf(invalidNodeSet), 
"invalid node");
+        assertThrows(AssertionError.class, () -> 
f.partitioned(assignmentFromPrimaries(invalidNodeSet)), "invalid node");
+
+        assertThrows(AssertionError.class, () -> 
f.allOf(partiallyInvalidNodeSet), "invalid node");
+        assertThrows(AssertionError.class, () -> 
f.resolveNodes(f.someOf(partiallyInvalidNodeSet)), "invalid node");
+        assertThrows(AssertionError.class, () -> 
f.resolveNodes(f.oneOf(partiallyInvalidNodeSet)), "invalid node");
+        assertThrows(AssertionError.class, () -> f.resolveNodes(
+                f.partitioned(assignment(partiallyInvalidNodeSet, 
partiallyInvalidNodeSet))), "invalid node");
     }
 
     @ParameterizedTest
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
index 99481eaf5b..45629d47e3 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
@@ -413,14 +413,15 @@ public class FragmentMappingTest extends 
AbstractPlannerTest {
             objectId += 1;
         }
 
+        LogicalTopologySnapshot logicalTopologySnapshot = newLogicalTopology();
+
         IgniteSchema schema = new IgniteSchema(SqlCommon.DEFAULT_SCHEMA_NAME, 
1, dataSources);
-        ExecutionTargetProvider executionTargetProvider = 
TestBuilders.executionTargetProviderBuilder()
+        ExecutionDistributionProvider executionDistributionProvider = 
TestBuilders.executionDistributionProviderBuilder()
                 .useTablePartitions(true)
                 .addTables(table2Assignments)
                 .build();
-        LogicalTopologySnapshot logicalTopologySnapshot = newLogicalTopology();
 
-        return new TestSetup(executionTargetProvider, schema, 
logicalTopologySnapshot);
+        return new TestSetup(executionDistributionProvider, schema, 
logicalTopologySnapshot);
     }
 
     private void validateAssignments(String tableName, List<List<String>> 
assignments) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index d27450f94e..fef3e7fa88 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -17,39 +17,38 @@
 
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
-import static java.util.UUID.randomUUID;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.TestHybridClock;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
-import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
+import 
org.apache.ignite.internal.partitiondistribution.TokenizedAssignmentsImpl;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
@@ -58,14 +57,11 @@ import 
org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
 import org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruner;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
-import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
-import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
 import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
 import org.apache.ignite.internal.systemview.api.SystemViews;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.SubscriptionUtils;
-import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
@@ -80,6 +76,8 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
     private static final ClockService CLOCK_SERVICE = new TestClockService(new 
TestHybridClock(System::currentTimeMillis));
     private static final MappingParameters PARAMS = MappingParameters.EMPTY;
     private static final PartitionPruner PARTITION_PRUNER = (fragments, 
dynParams) -> fragments;
+    private long topologyVer;
+    private boolean topologyChange;
 
     static {
         // @formatter:off
@@ -124,18 +122,23 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
         List<String> nodeNames = List.of(localNodeName, "NODE1");
 
         // Initialize mapping service.
-        ExecutionTargetProvider targetProvider = 
Mockito.spy(createTargetProvider(nodeNames));
-        MappingServiceImpl mappingService = new MappingServiceImpl(
-                localNodeName, CLOCK_SERVICE, targetProvider, 
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run
-        );
-        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
-                new LogicalTopologySnapshot(1, 
logicalNodes(nodeNames.toArray(new String[0]))));
+        Supplier<Long> logicalTopologyVerSupplier = 
createChangingTopologySupplier();
+        TestExecutionDistributionProvider execProvider = Mockito.spy(new 
TestExecutionDistributionProvider(nodeNames));
 
+        MappingServiceImpl mappingService = Mockito.spy(new MappingServiceImpl(
+                localNodeName,
+                CLOCK_SERVICE,
+                CaffeineCacheFactory.INSTANCE,
+                100,
+                PARTITION_PRUNER,
+                logicalTopologyVerSupplier,
+                execProvider
+        ));
 
         List<MappedFragment> defaultMapping = await(mappingService.map(PLAN, 
PARAMS));
         List<MappedFragment> mappingOnBackups = await(mappingService.map(PLAN, 
MappingParameters.MAP_ON_BACKUPS));
 
-        verify(targetProvider, times(2)).forTable(any(), any(), any(), 
anyBoolean());
+        verify(execProvider, times(2)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
 
         assertSame(defaultMapping, await(mappingService.map(PLAN, PARAMS)));
         assertSame(mappingOnBackups, await(mappingService.map(PLAN, 
MappingParameters.MAP_ON_BACKUPS)));
@@ -147,115 +150,53 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
         String localNodeName = "NODE0";
 
         MappingServiceImpl mappingService = 
createMappingServiceNoCache(localNodeName, List.of(localNodeName));
-        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class), new 
LogicalTopologySnapshot(1, logicalNodes(localNodeName)));
 
         CompletableFuture<List<MappedFragment>> mappingFuture = 
mappingService.map(PLAN, PARAMS);
 
         assertThat(mappingFuture, willSucceedFast());
     }
 
-    @Test
-    public void lateServiceInitializationOnTopologyLeap() {
-        String localNodeName = "NODE";
-        List<String> nodeNames = List.of("NODE1");
-
-        MappingServiceImpl mappingService = 
createMappingServiceNoCache(localNodeName, nodeNames);
-
-        CompletableFuture<List<MappedFragment>> mappingFuture = 
mappingService.map(PLAN, PARAMS);
-
-        // Mapping should wait for service initialization.
-        assertFalse(mappingFuture.isDone());
-
-        // Join another node affect nothing.
-        mappingService.onTopologyLeap(new LogicalTopologySnapshot(1, 
logicalNodes("NODE1", "NODE2")));
-        assertThat(mappingFuture, willThrowFast(TimeoutException.class));
-
-        // Joining local node completes initialization.
-        mappingService.onTopologyLeap(new LogicalTopologySnapshot(2, 
logicalNodes("NODE", "NODE1", "NODE2")));
-
-        assertThat(mappingFuture, willSucceedFast());
-        assertThat(mappingService.map(PLAN, PARAMS), willSucceedFast());
-    }
-
-    @Test
-    public void lateServiceInitializationOnNodeJoin() {
-        String localNodeName = "NODE";
-        List<String> nodeNames = List.of("NODE1");
-
-        MappingServiceImpl mappingService = 
createMappingServiceNoCache(localNodeName, nodeNames);
-
-        CompletableFuture<List<MappedFragment>> mappingFuture = 
mappingService.map(PLAN, PARAMS);
-
-        // Mapping should wait for service initialization.
-        assertFalse(mappingFuture.isDone());
-
-        // Join another node affect nothing.
-        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
-                new LogicalTopologySnapshot(1, logicalNodes("NODE1", 
"NODE2")));
-
-        assertThat(mappingFuture, willThrowFast(TimeoutException.class));
-
-        // Joining local node completes initialization.
-        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
-                new LogicalTopologySnapshot(2, logicalNodes("NODE", "NODE1", 
"NODE2")));
-
-        assertThat(mappingFuture, willSucceedFast());
-        assertThat(mappingService.map(PLAN, PARAMS), willSucceedFast());
-    }
-
     @Test
     public void testCacheInvalidationOnTopologyChange() {
         String localNodeName = "NODE";
         List<String> nodeNames = List.of(localNodeName, "NODE1");
 
-        // Initialize mapping service.
-        ExecutionTargetProvider targetProvider = 
Mockito.spy(createTargetProvider(nodeNames));
-        MappingServiceImpl mappingService = new MappingServiceImpl(
-                localNodeName, CLOCK_SERVICE, targetProvider, 
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run
-        );
+        Supplier<Long> logicalTopologyVerSupplier = 
createTriggeredTopologySupplier();
+        TestExecutionDistributionProvider execProvider = Mockito.spy(new 
TestExecutionDistributionProvider(nodeNames));
 
-        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
-                new LogicalTopologySnapshot(1, 
logicalNodes(nodeNames.toArray(new String[0]))));
+        MappingServiceImpl mappingService = Mockito.spy(new MappingServiceImpl(
+                localNodeName,
+                CLOCK_SERVICE,
+                CaffeineCacheFactory.INSTANCE,
+                100,
+                PARTITION_PRUNER,
+                logicalTopologyVerSupplier,
+                execProvider
+        ));
 
         List<MappedFragment> tableOnlyMapping = await(mappingService.map(PLAN, 
PARAMS));
         List<MappedFragment> sysViewMapping = 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
 
-        verify(targetProvider, times(2)).forTable(any(), any(), any(), 
anyBoolean());
-        verify(targetProvider, times(1)).forSystemView(any(), any());
+        verify(execProvider, times(2)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
+        verify(execProvider, times(1)).forSystemView(any());
+
+        verify(mappingService, times(2)).composeDistributions(anySet(), 
anySet(), anyBoolean());
 
         assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
         assertSame(sysViewMapping, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
 
-        LogicalNode newNode = Mockito.mock(LogicalNode.class);
-        Mockito.when(newNode.name()).thenReturn("NODE2");
+        verifyNoMoreInteractions(execProvider);
 
-        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
-                new LogicalTopologySnapshot(3, logicalNodes("NODE", "NODE1", 
"NODE2")));
-
-        // Plan with tables only must not be invalidated on node join.
-        assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
-        verifyNoMoreInteractions(targetProvider);
+        topologyChange = true;
 
         // Plan with system views must be invalidated.
         assertNotSame(sysViewMapping, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
 
-        mappingService.onNodeLeft(newNode,
-                new LogicalTopologySnapshot(3, logicalNodes("NODE", "NODE1")));
-
-        // Plan with tables that don't include a left node should not be 
invalidated.
+        // Plan with tables only must not be invalidated on topology change.
         assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
 
-        LogicalNode node1 = Mockito.mock(LogicalNode.class);
-        Mockito.when(node1.name()).thenReturn("NODE1");
-
-        mappingService.onNodeLeft(node1,
-                new LogicalTopologySnapshot(3, logicalNodes("NODE")));
-
-        // Plan with tables that include left node must be invalidated.
-        assertNotSame(tableOnlyMapping, await(mappingService.map(PLAN, 
PARAMS)));
-
-        verify(targetProvider, times(4)).forTable(any(), any(), any(), 
anyBoolean());
-        verify(targetProvider, times(2)).forSystemView(any(), any());
+        verify(execProvider, times(3)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
+        verify(execProvider, times(2)).forSystemView(any());
     }
 
     @Test
@@ -279,66 +220,101 @@ public class MappingServiceImplTest extends 
BaseIgniteAbstractTest {
         };
 
         // Initialize mapping service.
-        ExecutionTargetProvider targetProvider = 
Mockito.spy(createTargetProvider(nodeNames));
-        MappingServiceImpl mappingService = new MappingServiceImpl(
-                localNodeName, CLOCK_SERVICE, targetProvider, 
CaffeineCacheFactory.INSTANCE, 100, PARTITION_PRUNER, Runnable::run
-        );
+        Supplier<Long> logicalTopologyVerSupplier = 
createStableTopologySupplier();
+        ExecutionDistributionProvider execProvider = Mockito.spy(new 
TestExecutionDistributionProvider(nodeNames));
 
-        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
-                new LogicalTopologySnapshot(1, 
logicalNodes(nodeNames.toArray(new String[0]))));
+        MappingServiceImpl mappingService = Mockito.spy(new MappingServiceImpl(
+                localNodeName,
+                CLOCK_SERVICE,
+                CaffeineCacheFactory.INSTANCE,
+                100,
+                PARTITION_PRUNER,
+                logicalTopologyVerSupplier,
+                execProvider
+        ));
 
         List<MappedFragment> mappedFragments = 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS));
-        verify(targetProvider, times(1)).forTable(any(), any(), any(), 
anyBoolean());
-        verify(targetProvider, times(1)).forSystemView(any(), any());
+        verify(execProvider, times(1)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
+        verify(execProvider, times(1)).forSystemView(any());
 
         // Simulate expiration of the primary replica for non-mapped table - 
the cache entry should not be invalidated.
         
await(mappingService.onPrimaryReplicaExpired(prepareEvtParams.apply("T2")));
         assertSame(mappedFragments, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
-        verifyNoMoreInteractions(targetProvider);
+
+        verify(mappingService, times(1)).composeDistributions(anySet(), 
anySet(), anyBoolean());
 
         // Simulate expiration of the primary replica for mapped table - the 
cache entry should be invalidated.
         
await(mappingService.onPrimaryReplicaExpired(prepareEvtParams.apply("T1")));
         assertNotSame(mappedFragments, 
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
-        verify(targetProvider, times(2)).forTable(any(), any(), any(), 
anyBoolean());
-        verify(targetProvider, times(2)).forSystemView(any(), any());
+        verify(execProvider, times(2)).forTable(any(HybridTimestamp.class), 
any(IgniteTable.class), anyBoolean());
+        verify(execProvider, times(2)).forSystemView(any());
     }
 
-    private static List<LogicalNode> logicalNodes(String... nodeNames) {
-        return Arrays.stream(nodeNames)
-                .map(name -> new LogicalNode(randomUUID(), name, 
NetworkAddress.from("127.0.0.1:10000")))
-                .collect(Collectors.toList());
+    private MappingServiceImpl createMappingServiceNoCache(String 
localNodeName, List<String> nodeNames) {
+        return createMappingService(localNodeName, nodeNames, 0);
     }
 
-    private static MappingServiceImpl createMappingServiceNoCache(String 
localNodeName, List<String> nodeNames) {
+    private MappingServiceImpl createMappingService(String localNodeName, 
List<String> nodeNames, int cacheSize) {
+        Supplier<Long> logicalTopologyVerSupplier = 
createChangingTopologySupplier();
+        ExecutionDistributionProvider execProvider = new 
TestExecutionDistributionProvider(nodeNames);
+
         return new MappingServiceImpl(
                 localNodeName,
                 CLOCK_SERVICE,
-                createTargetProvider(nodeNames),
-                EmptyCacheFactory.INSTANCE,
-                0,
+                CaffeineCacheFactory.INSTANCE,
+                cacheSize,
                 PARTITION_PRUNER,
-                Runnable::run
+                logicalTopologyVerSupplier,
+                execProvider
         );
     }
 
-    private static ExecutionTargetProvider createTargetProvider(List<String> 
nodeNames) {
-        return new ExecutionTargetProvider() {
-            @Override
-            public CompletableFuture<ExecutionTarget> forTable(
-                    HybridTimestamp operationTime,
-                    ExecutionTargetFactory factory,
-                    IgniteTable table,
-                    boolean includeBackups
-            ) {
-                return 
CompletableFuture.completedFuture(factory.allOf(nodeNames));
-            }
+    /** Test distribution provider. */
+    public static class TestExecutionDistributionProvider implements 
ExecutionDistributionProvider {
+        private final List<String> nodeNames;
+        private Supplier<RuntimeException> exceptionSupplier = () -> null;
+
+        TestExecutionDistributionProvider(List<String> nodeNames) {
+            this.nodeNames = nodeNames;
+        }
+
+        /** Constructor. */
+        public TestExecutionDistributionProvider(List<String> nodeNames, 
Supplier<RuntimeException> exceptionSupplier) {
+            this.nodeNames = nodeNames;
+            this.exceptionSupplier = exceptionSupplier;
+        }
+
+        private static TokenizedAssignments mapAssignment(String peer) {
+            Set<Assignment> peers = Set.of(Assignment.forPeer(peer));
+            return new TokenizedAssignmentsImpl(peers, 1L);
+        }
 
-            @Override
-            public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
-                return CompletableFuture.completedFuture(view.distribution() 
== IgniteDistributions.single()
-                        ? factory.oneOf(nodeNames)
-                        : factory.allOf(nodeNames));
+        @Override
+        public CompletableFuture<List<TokenizedAssignments>> 
forTable(HybridTimestamp operationTime, IgniteTable table,
+                boolean includeBackups) {
+            if (exceptionSupplier.get() != null) {
+                return CompletableFuture.failedFuture(exceptionSupplier.get());
             }
-        };
+
+            return CompletableFuture.completedFuture(nodeNames.stream()
+                    
.map(TestExecutionDistributionProvider::mapAssignment).collect(Collectors.toList()));
+        }
+
+        @Override
+        public List<String> forSystemView(IgniteSystemView view) {
+            return nodeNames;
+        }
+    }
+
+    private static Supplier<Long> createStableTopologySupplier() {
+        return () -> 1L;
+    }
+
+    private Supplier<Long> createTriggeredTopologySupplier() {
+        return () -> topologyChange ? ++topologyVer : topologyVer;
+    }
+
+    private Supplier<Long> createChangingTopologySupplier() {
+        return () -> topologyVer++;
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
index 93bb11bfeb..47a09336f1 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
@@ -89,14 +89,17 @@ final class MappingTestRunner {
     /** Test setup. */
     static final class TestSetup {
 
-        private final ExecutionTargetProvider executionTargetProvider;
+        private final ExecutionDistributionProvider 
executionDistributionProvider;
 
         private final IgniteSchema schema;
 
         private final LogicalTopologySnapshot topologySnapshot;
 
-        TestSetup(ExecutionTargetProvider executionTargetProvider, 
IgniteSchema schema, LogicalTopologySnapshot topologySnapshot) {
-            this.executionTargetProvider = executionTargetProvider;
+        TestSetup(
+                ExecutionDistributionProvider executionDistributionProvider,
+                IgniteSchema schema, LogicalTopologySnapshot topologySnapshot
+        ) {
+            this.executionDistributionProvider = executionDistributionProvider;
             this.schema = schema;
             this.topologySnapshot = topologySnapshot;
         }
@@ -138,7 +141,7 @@ final class MappingTestRunner {
         Path testFile = location.resolve(fileName);
         List<TestCaseDef> testCases = loadTestCases(testFile);
 
-        runTestCases(testFile, setup.schema, setup.executionTargetProvider, 
setup.topologySnapshot, parseValidate, testCases);
+        runTestCases(testFile, setup.schema, 
setup.executionDistributionProvider, setup.topologySnapshot, parseValidate, 
testCases);
     }
 
     @TestOnly
@@ -153,7 +156,7 @@ final class MappingTestRunner {
 
     private void runTestCases(Path testFile,
             IgniteSchema schema,
-            ExecutionTargetProvider targetProvider,
+            ExecutionDistributionProvider executionDistributionProvider,
             LogicalTopologySnapshot snapshot,
             BiFunction<IgniteSchema, String, IgniteRel> parse,
             List<TestCaseDef> testCases) {
@@ -173,7 +176,7 @@ final class MappingTestRunner {
             MultiStepPlan multiStepPlan = new MultiStepPlan(new 
PlanId(UUID.randomUUID(), 1), sqlQueryType, rel,
                     resultSetMetadata, parameterMetadata, 
schema.catalogVersion(), null);
 
-            String actualText = produceMapping(testDef.nodeName, 
targetProvider, snapshot, multiStepPlan);
+            String actualText = produceMapping(testDef.nodeName, 
executionDistributionProvider, snapshot, multiStepPlan);
 
             actualResults.add(actualText);
         }
@@ -192,7 +195,7 @@ final class MappingTestRunner {
 
     private String produceMapping(
             String nodeName,
-            ExecutionTargetProvider targetProvider,
+            ExecutionDistributionProvider executionDistributionProvider,
             LogicalTopologySnapshot snapshot,
             MultiStepPlan plan
     ) {
@@ -201,13 +204,12 @@ final class MappingTestRunner {
         MappingServiceImpl mappingService = new MappingServiceImpl(
                 nodeName,
                 new TestClockService(new 
TestHybridClock(System::currentTimeMillis)),
-                targetProvider,
                 EmptyCacheFactory.INSTANCE,
                 0,
                 partitionPruner,
-                Runnable::run
+                snapshot::version,
+                executionDistributionProvider
         );
-        mappingService.onTopologyLeap(snapshot);
 
         List<MappedFragment> mappedFragments;
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
index 025bb3993d..fa9b65fd23 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
@@ -138,7 +138,7 @@ public class MappingTestRunnerSelfTest extends 
BaseIgniteAbstractTest {
 
         IllegalStateException err = assertThrows(IllegalStateException.class,
                 () -> runner.runTest(() -> {
-                    ExecutionTargetProvider targetProvider = 
Mockito.mock(ExecutionTargetProvider.class);
+                    ExecutionDistributionProvider targetProvider = 
Mockito.mock(ExecutionDistributionProvider.class);
                     IgniteSchema schema = new IgniteSchema("T", 1, List.of());
                     LogicalNode node = new LogicalNode(randomUUID(), "N1", new 
NetworkAddress("addr", 1000));
                     LogicalTopologySnapshot topologySnapshot = new 
LogicalTopologySnapshot(1, List.of(node));
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index b4663de522..0eb50b9a33 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -100,9 +100,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.TxAttributes;
 import org.apache.ignite.internal.sql.engine.exec.UpdatableTable;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
-import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
-import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionDistributionProvider;
 import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
 import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
@@ -709,7 +707,7 @@ public class TestBuilders {
             Map<String, TestNode> nodes = nodeNames.stream()
                     .map(name -> {
                         var systemViewManager = new 
SystemViewManagerImpl(name, catalogManager);
-                        var targetProvider = new 
TestNodeExecutionTargetProvider(
+                        var executionProvider = new 
TestExecutionDistributionProvider(
                                 systemViewManager::owningNodes,
                                 owningNodesByTableName,
                                 useTablePartitions
@@ -718,17 +716,16 @@ public class TestBuilders {
                         var mappingService = new MappingServiceImpl(
                                 name,
                                 new TestClockService(clock, clockWaiter),
-                                targetProvider,
                                 EmptyCacheFactory.INSTANCE,
                                 0,
                                 partitionPruner,
-                                Runnable::run
+                                () -> 1L,
+                                executionProvider
                         );
 
                         systemViewManager.register(() -> systemViews);
 
                         LogicalTopologySnapshot newTopology = new 
LogicalTopologySnapshot(1L, logicalNodes);
-                        mappingService.onTopologyLeap(newTopology);
                         systemViewManager.onTopologyLeap(newTopology);
 
                         return new TestNode(
@@ -1535,13 +1532,14 @@ public class TestBuilders {
         return newRow;
     }
 
-    /** Returns a builder for {@link ExecutionTargetProvider}. */
-    public static ExecutionTargetProviderBuilder 
executionTargetProviderBuilder() {
-        return new ExecutionTargetProviderBuilder();
+
+    /** Returns a builder for {@link ExecutionDistributionProvider}. */
+    public static ExecutionDistributionProviderBuilder 
executionDistributionProviderBuilder() {
+        return new ExecutionDistributionProviderBuilder();
     }
 
-    /** A builder to create instances of {@link ExecutionTargetProvider}. */
-    public static final class ExecutionTargetProviderBuilder {
+    /** A builder to create instances of {@link 
ExecutionDistributionProvider}. */
+    public static final class ExecutionDistributionProviderBuilder {
 
         private final Map<String, List<List<String>>> owningNodesByTableName = 
new HashMap<>();
 
@@ -1549,12 +1547,12 @@ public class TestBuilders {
 
         private boolean useTablePartitions;
 
-        private ExecutionTargetProviderBuilder() {
+        private ExecutionDistributionProviderBuilder() {
 
         }
 
         /** Adds tables to list of nodes mapping. */
-        public ExecutionTargetProviderBuilder addTables(Map<String, 
List<List<String>>> tables) {
+        public ExecutionDistributionProviderBuilder addTables(Map<String, 
List<List<String>>> tables) {
             this.owningNodesByTableName.putAll(tables);
             return this;
         }
@@ -1563,20 +1561,20 @@ public class TestBuilders {
          * Sets a function that returns system views. Function accepts a view 
name and returns a list of nodes
          * a system view is available at.
          */
-        public ExecutionTargetProviderBuilder setSystemViews(Function<String, 
List<String>> systemViews) {
+        public ExecutionDistributionProviderBuilder 
setSystemViews(Function<String, List<String>> systemViews) {
             this.owningNodesBySystemViewName = systemViews;
             return this;
         }
 
         /** Use table partitions to build mapping targets. Default is {@code 
false}. */
-        public ExecutionTargetProviderBuilder useTablePartitions(boolean 
value) {
+        public ExecutionDistributionProviderBuilder useTablePartitions(boolean 
value) {
             useTablePartitions = value;
             return this;
         }
 
-        /** Creates an instance of {@link ExecutionTargetProvider}. */
-        public ExecutionTargetProvider build() {
-            return new TestNodeExecutionTargetProvider(
+        /** Creates an instance of {@link ExecutionDistributionProvider}. */
+        public ExecutionDistributionProvider build() {
+            return new TestExecutionDistributionProvider(
                     owningNodesBySystemViewName,
                     Map.copyOf(owningNodesByTableName),
                     useTablePartitions
@@ -1584,15 +1582,14 @@ public class TestBuilders {
         }
     }
 
-    private static class TestNodeExecutionTargetProvider implements 
ExecutionTargetProvider {
-
+    private static class TestExecutionDistributionProvider implements 
ExecutionDistributionProvider {
         final Function<String, List<String>> owningNodesBySystemViewName;
 
         final Map<String, List<List<String>>> owningNodesByTableName;
 
         final boolean useTablePartitions;
 
-        private TestNodeExecutionTargetProvider(
+        private TestExecutionDistributionProvider(
                 Function<String, List<String>> owningNodesBySystemViewName,
                 Map<String, List<List<String>>> owningNodesByTableName,
                 boolean useTablePartitions
@@ -1602,19 +1599,18 @@ public class TestBuilders {
             this.useTablePartitions = useTablePartitions;
         }
 
+        private static TokenizedAssignments 
partitionNodesToAssignment(List<String> nodes, long token) {
+            return new TokenizedAssignmentsImpl(
+                    
nodes.stream().map(Assignment::forPeer).collect(Collectors.toSet()),
+                    token
+            );
+        }
+
         @Override
-        public CompletableFuture<ExecutionTarget> forTable(
-                HybridTimestamp operationTime,
-                ExecutionTargetFactory factory,
-                IgniteTable table,
-                boolean includeBackups
-        ) {
+        public CompletableFuture<List<TokenizedAssignments>> 
forTable(HybridTimestamp operationTime, IgniteTable table,
+                boolean includeBackups) {
             List<List<String>> owningNodes = 
owningNodesByTableName.get(table.name());
 
-            if (nullOrEmpty(owningNodes)) {
-                throw new AssertionError("DataProvider is not configured for 
table " + table.name());
-            }
-
             List<TokenizedAssignments> assignments;
 
             if (useTablePartitions) {
@@ -1630,34 +1626,19 @@ public class TestBuilders {
                         .collect(Collectors.toList());
             }
 
-            ExecutionTarget target = factory.partitioned(assignments);
-
-            return CompletableFuture.completedFuture(target);
-        }
-
-        private static TokenizedAssignments 
partitionNodesToAssignment(List<String> nodes, long token) {
-            return new TokenizedAssignmentsImpl(
-                    
nodes.stream().map(Assignment::forPeer).collect(Collectors.toSet()),
-                    token
-            );
+            return CompletableFuture.completedFuture(assignments);
         }
 
         @Override
-        public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+        public List<String> forSystemView(IgniteSystemView view) {
             List<String> nodes = 
owningNodesBySystemViewName.apply(view.name());
 
             if (nullOrEmpty(nodes)) {
-                return CompletableFuture.failedFuture(
-                        new SqlException(Sql.MAPPING_ERR, format("The view 
with name '{}' could not be found on"
-                                + " any active nodes in the cluster", 
view.name()))
-                );
+                throw new SqlException(Sql.MAPPING_ERR, format("The view with 
name '{}' could not be found on"
+                                + " any active nodes in the cluster", view));
             }
 
-            return CompletableFuture.completedFuture(
-                    view.distribution() == IgniteDistributions.single()
-                            ? factory.oneOf(nodes)
-                            : factory.allOf(nodes)
-            );
+            return view.distribution() == IgniteDistributions.single() ? 
List.of(nodes.get(0)) : nodes;
         }
     }
 }


Reply via email to