This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d757958ba1 IGNITE-19619 Sql. Term-based to lease-based switch. (#2848)
d757958ba1 is described below

commit d757958ba19e3d4bac1632b1f34988d62e9c3473
Author: Evgeniy Stanilovskiy <stanilov...@gmail.com>
AuthorDate: Tue Nov 28 18:03:58 2023 +0300

    IGNITE-19619 Sql. Term-based to lease-based switch. (#2848)
---
 .../ignite/client/fakes/FakeInternalTable.java     |  11 ---
 .../internal/replicator/TablePartitionId.java      |   1 +
 .../distributionzones/rebalance/RebalanceUtil.java |   1 +
 .../rebalance/TablePartitionId.java                | 100 ---------------------
 .../DistributionZoneRebalanceEngineTest.java       |   1 +
 .../RebalanceUtilUpdateAssignmentsTest.java        |   1 +
 .../runner/app/ItIgniteNodeRestartTest.java        |   3 +-
 .../ignite/internal/table/ItTableScanTest.java     |  27 +++---
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 modules/sql-engine/build.gradle                    |   1 +
 .../internal/sql/engine/SqlQueryProcessor.java     |  88 +++++++++++++++---
 .../sql/engine/exec/ExecutionServiceImpl.java      |   8 +-
 .../rebalance/ItRebalanceDistributedTest.java      |   2 +-
 .../ignite/internal/table/InternalTable.java       |  10 ---
 .../distributed/storage/InternalTableImpl.java     |  45 ++--------
 15 files changed, 115 insertions(+), 187 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 12bb932fd0..6bab51f48c 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -29,7 +29,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.BiConsumer;
-import java.util.function.Function;
 import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -424,11 +423,6 @@ public class FakeInternalTable implements InternalTable {
         throw new IgniteInternalException(new 
OperationNotSupportedException());
     }
 
-    @Override
-    public CompletableFuture<List<PrimaryReplica>> primaryReplicas() {
-        return CompletableFuture.failedFuture(new IgniteInternalException(new 
OperationNotSupportedException()));
-    }
-
     @Override
     public ClusterNode leaderAssignment(int partition) {
         throw new IgniteInternalException(new 
OperationNotSupportedException());
@@ -477,9 +471,4 @@ public class FakeInternalTable implements InternalTable {
     public @Nullable PendingComparableValuesTracker<Long, Void> 
getPartitionStorageIndexTracker(int partitionId) {
         return null;
     }
-
-    @Override
-    public Function<String, ClusterNode> getClusterNodeResolver() {
-        return null;
-    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
index f752d238c2..fba5ff7768 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.replicator;
 
+// TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Should be 
refactored to ZonePartitionId.
 /**
  * The class is used to identify a table replication group.
  */
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 2ac97db451..9c3e178236 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/TablePartitionId.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/TablePartitionId.java
deleted file mode 100644
index f0f7098117..0000000000
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/TablePartitionId.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.distributionzones.rebalance;
-
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-
-// TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Should be 
refactored to ZonePartitionId.
-/**
- * The class is used to identify a table replication group.
- */
-public class TablePartitionId implements ReplicationGroupId {
-    private static final long serialVersionUID = -2428659904367844831L;
-
-    /** Table id. */
-    private final int tableId;
-
-    /** Partition id. */
-    private final int partId;
-
-    /**
-     * The constructor.
-     *
-     * @param tableId Table id.
-     * @param partId Partition id.
-     */
-    public TablePartitionId(int tableId, int partId) {
-        this.tableId = tableId;
-        this.partId = partId;
-    }
-
-    /**
-     * Converts a string representation of table partition id to the object.
-     *
-     * @param str String representation.
-     * @return An table partition id.
-     */
-    public static TablePartitionId fromString(String str) {
-        String[] parts = str.split("_part_");
-
-        return new TablePartitionId(Integer.parseInt(parts[0]), 
Integer.parseInt(parts[1]));
-    }
-
-    /**
-     * Get the partition id.
-     *
-     * @return Partition id.
-     */
-    public int partitionId() {
-        return partId;
-    }
-
-    /**
-     * Get the table id.
-     *
-     * @return Table id.
-     */
-    public int tableId() {
-        return tableId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        TablePartitionId that = (TablePartitionId) o;
-
-        return partId == that.partId && tableId == that.tableId;
-    }
-
-    @Override
-    public int hashCode() {
-        return tableId ^ partId;
-    }
-
-    @Override
-    public String toString() {
-        return tableId + "_part_" + partId;
-    }
-}
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index 74a0037207..cfbed58369 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index daf0d4d36b..11c5d6c864 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.network.ClusterService;
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 7c81bf04e2..f4e5110f22 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -427,7 +427,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 schemaSyncService,
                 catalogManager,
                 metricManager,
-                new SystemViewManagerImpl(name, catalogManager)
+                new SystemViewManagerImpl(name, catalogManager),
+                placementDriverManager.placementDriver()
         );
 
         // Preparing the result map.
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 1b846c0084..9ed1010956 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table;
 
+import static 
org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
 import static 
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL;
 import static 
org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
@@ -34,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
@@ -44,7 +46,7 @@ import java.util.function.Function;
 import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.catalog.CatalogManager;
@@ -645,12 +647,16 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
             Publisher<BinaryRow> publisher;
 
             if (readOnly) {
-                List<String> assignments = internalTable.assignments();
-
                 // Any node from assignments will do it.
-                ClusterNode node0 = 
CLUSTER.aliveNode().clusterNodes().stream().filter(clusterNode -> {
-                    return assignments.contains(clusterNode.name());
-                }).findFirst().orElseThrow();
+                Set<Assignment> assignments = 
calculateAssignmentForPartition(CLUSTER.aliveNode().clusterNodes().stream().map(
+                        ClusterNode::name).collect(Collectors.toList()), 0, 1);
+
+                assertFalse(assignments.isEmpty());
+
+                String consId = assignments.iterator().next().consistentId();
+
+                ClusterNode node0 = 
CLUSTER.aliveNode().clusterNodes().stream().filter(n -> 
n.name().equals(consId)).findAny()
+                        .orElseThrow();
 
                 //noinspection DataFlowIssue
                 publisher = internalTable.scan(PART_ID, tx.readTimestamp(), 
node0, sortedIndexId, null, null, 0, null);
@@ -874,21 +880,22 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
      * @return Transaction.
      */
     private InternalTransaction startTxWithEnlistedPartition(int partId, 
boolean readOnly) {
-        Ignite ignite = CLUSTER.aliveNode();
+        IgniteImpl ignite = CLUSTER.aliveNode();
 
         InternalTransaction tx = (InternalTransaction) 
ignite.transactions().begin(new TransactionOptions().readOnly(readOnly));
 
         InternalTable table = ((TableViewInternal) 
ignite.tables().table(TABLE_NAME)).internalTable();
         TablePartitionId tblPartId = new TablePartitionId(table.tableId(), 
partId);
 
-        PlacementDriver placementDriver = ((IgniteImpl) 
ignite).placementDriver();
+        PlacementDriver placementDriver = ignite.placementDriver();
         ReplicaMeta primaryReplica = IgniteTestUtils.await(
-                placementDriver.awaitPrimaryReplica(tblPartId, ((IgniteImpl) 
ignite).clock().now(), 30, TimeUnit.SECONDS));
+                placementDriver.awaitPrimaryReplica(tblPartId, 
ignite.clock().now(), 30, TimeUnit.SECONDS));
 
         tx.enlist(
                 tblPartId,
                 new IgniteBiTuple<>(
-                        
table.getClusterNodeResolver().apply(primaryReplica.getLeaseholder()),
+                        ignite.clusterNodes().stream().filter(n -> 
n.name().equals(primaryReplica.getLeaseholder()))
+                                .findFirst().orElseThrow(),
                         primaryReplica.getStartTime().longValue()
                 )
         );
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 57046da721..ca1f634f42 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -625,7 +625,8 @@ public class IgniteImpl implements Ignite {
                 schemaSyncService,
                 catalogManager,
                 metricManager,
-                systemViewManager
+                systemViewManager,
+                placementDriverMgr.placementDriver()
         );
 
         sql = new IgniteSqlImpl(name, qryEngine, new 
IgniteTransactionsImpl(txManager, observableTimestampTracker));
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index af34a45ce7..6e75c017dc 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -40,6 +40,7 @@ dependencies {
     implementation project(':ignite-cluster-management')
     implementation project(':ignite-system-view-api')
     implementation project(':ignite-system-view')
+    implementation project(':ignite-placement-driver-api')
     implementation libs.jetbrains.annotations
     implementation libs.fastutil.core
     implementation libs.caffeine
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 1f5cd31304..46c3a5b084 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -17,11 +17,15 @@
 
 package org.apache.ignite.internal.sql.engine;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
 import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
+import static 
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
 
@@ -30,6 +34,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
@@ -44,14 +49,23 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
 import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistryImpl;
@@ -109,6 +123,9 @@ import org.jetbrains.annotations.TestOnly;
  *  TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public class SqlQueryProcessor implements QueryProcessor {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(SqlQueryProcessor.class);
+
     /** Size of the cache for query plans. */
     private static final int PLAN_CACHE_SIZE = 1024;
 
@@ -174,6 +191,9 @@ public class SqlQueryProcessor implements QueryProcessor {
     /** Metric manager. */
     private final MetricManager metricManager;
 
+    /** Placement driver. */
+    private final PlacementDriver placementDriver;
+
     private final ConcurrentMap<UUID, AsyncSqlCursor<?>> openedCursors = new 
ConcurrentHashMap<>();
 
     /** Constructor. */
@@ -190,7 +210,8 @@ public class SqlQueryProcessor implements QueryProcessor {
             SchemaSyncService schemaSyncService,
             CatalogManager catalogManager,
             MetricManager metricManager,
-            SystemViewManager systemViewManager
+            SystemViewManager systemViewManager,
+            PlacementDriver placementDriver
     ) {
         this.clusterSrvc = clusterSrvc;
         this.logicalTopologyService = logicalTopologyService;
@@ -204,6 +225,7 @@ public class SqlQueryProcessor implements QueryProcessor {
         this.catalogManager = catalogManager;
         this.metricManager = metricManager;
         this.systemViewManager = systemViewManager;
+        this.placementDriver = placementDriver;
 
         sqlSchemaManager = new SqlSchemaManagerImpl(
                 catalogManager,
@@ -260,15 +282,8 @@ public class SqlQueryProcessor implements QueryProcessor {
         var executionTargetProvider = new ExecutionTargetProvider() {
             @Override
             public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, IgniteTable table) {
-                return tableManager.tableAsync(table.id())
-                        .thenCompose(tbl -> 
tbl.internalTable().primaryReplicas())
-                        .thenApply(replicas -> {
-                            List<NodeWithTerm> assignments = replicas.stream()
-                                    .map(primaryReplica -> new 
NodeWithTerm(primaryReplica.node().name(), primaryReplica.term()))
-                                    .collect(Collectors.toList());
-
-                            return factory.partitioned(assignments);
-                        });
+                return primaryReplicas(table.id())
+                        .thenApply(replicas -> factory.partitioned(replicas));
             }
 
             @Override
@@ -317,6 +332,59 @@ public class SqlQueryProcessor implements QueryProcessor {
         services.forEach(LifecycleAware::start);
     }
 
+    // need to be refactored after TODO: 
https://issues.apache.org/jira/browse/IGNITE-20925
+    /** Get primary replicas. */
+    private CompletableFuture<List<NodeWithTerm>> primaryReplicas(int tableId) 
{
+        int catalogVersion = catalogManager.latestCatalogVersion();
+
+        Catalog catalog = catalogManager.catalog(catalogVersion);
+
+        CatalogTableDescriptor tblDesc = 
Objects.requireNonNull(catalog.table(tableId), "table");
+
+        CatalogZoneDescriptor zoneDesc = 
Objects.requireNonNull(catalog.zone(tblDesc.zoneId()), "zone");
+
+        int partitions = zoneDesc.partitions();
+
+        List<CompletableFuture<NodeWithTerm>> result = new 
ArrayList<>(partitions);
+
+        HybridTimestamp clockNow = clock.now();
+
+        // Waiting for all partitions becomes unnecessary once partition pruning is implemented.
+        for (int partId = 0; partId < partitions; ++partId) {
+            int partitionId = partId;
+            ReplicationGroupId partGroupId = new TablePartitionId(tableId, 
partitionId);
+
+            CompletableFuture<ReplicaMeta> f = 
placementDriver.awaitPrimaryReplica(
+                    partGroupId,
+                    clockNow,
+                    AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                    SECONDS
+            );
+
+            result.add(f.handle((primaryReplica, e) -> {
+                if (e != null) {
+                    LOG.debug("Failed to retrieve primary replica for 
partition {}", e, partitionId);
+
+                    throw withCause(IgniteInternalException::new, 
REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica"
+                            + " [tablePartitionId=" + partGroupId + ']', e);
+                } else {
+                    String holder = primaryReplica.getLeaseholder();
+
+                    assert holder != null : "Unable to map query, nothing 
holds the lease";
+
+                    return new NodeWithTerm(holder, 
primaryReplica.getStartTime().longValue());
+                }
+            }));
+        }
+
+        CompletableFuture<Void> all = 
CompletableFuture.allOf(result.toArray(new CompletableFuture[0]));
+
+        return all.thenApply(v -> result.stream()
+                .map(CompletableFuture::join)
+                .collect(Collectors.toList())
+        );
+    }
+
     /** {@inheritDoc} */
     @Override
     public synchronized void stop() throws Exception {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index a7ea216d5d..b9c952ad7d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -858,10 +858,12 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                     tx.assignCommitPartition(new TablePartitionId(tableId, 
ThreadLocalRandom.current().nextInt(partsCnt)));
 
                     for (int p = 0; p < partsCnt; p++) {
-                        NodeWithTerm leaderWithTerm = assignments.get(p);
+                        TablePartitionId tablePartId = new 
TablePartitionId(tableId, p);
 
-                        tx.enlist(new TablePartitionId(tableId, p),
-                                new 
IgniteBiTuple<>(topSrvc.getByConsistentId(leaderWithTerm.name()), 
leaderWithTerm.term()));
+                        NodeWithTerm enlistmentToken = assignments.get(p);
+
+                        tx.enlist(tablePartId,
+                                new 
IgniteBiTuple<>(topSrvc.getByConsistentId(enlistmentToken.name()), 
enlistmentToken.term()));
                     }
                 }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 052cf89a9b..4b631aae3e 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -110,7 +110,6 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
-import org.apache.ignite.internal.distributionzones.rebalance.TablePartitionId;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.index.IndexManager;
@@ -141,6 +140,7 @@ import 
org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.rest.configuration.RestConfiguration;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 72b0942492..c631642a69 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
-import java.util.function.Function;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -434,13 +433,6 @@ public interface InternalTable extends ManuallyCloseable {
      */
     List<String> assignments();
 
-    /**
-     * Gets a list of current primary replicas for each partition.
-     *
-     * @return List of current primary replicas for each partition.
-     */
-    CompletableFuture<List<PrimaryReplica>> primaryReplicas();
-
     /**
      * Returns cluster node that is the leader of the corresponding partition 
group or throws an exception if
      * it cannot be found.
@@ -487,6 +479,4 @@ public interface InternalTable extends ManuallyCloseable {
      * @param partitionId Partition ID.
      */
     @Nullable PendingComparableValuesTracker<Long, Void> 
getPartitionStorageIndexTracker(int partitionId);
-
-    Function<String, ClusterNode> getClusterNodeResolver();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 396604f00b..e19879c791 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -131,7 +131,8 @@ public class InternalTableImpl implements InternalTable {
     /** Number of attempts. */
     private static final int ATTEMPTS_TO_ENLIST_PARTITION = 5;
 
-    private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 30;
+    /** Primary replica await timeout. */
+    public static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 30;
 
     /** Map update guarded by {@link #updatePartitionMapsMux}. */
     protected volatile Int2ObjectMap<RaftGroupService> 
raftGroupServiceByPartitionId;
@@ -605,10 +606,10 @@ public class InternalTableImpl implements InternalTable {
                 tablePartitionId,
                 tx.startTimestamp(),
                 AWAIT_PRIMARY_REPLICA_TIMEOUT,
-                TimeUnit.SECONDS
+                SECONDS
         );
 
-        CompletableFuture<R>  fut = 
primaryReplicaFuture.thenCompose(primaryReplica -> {
+        CompletableFuture<R> fut = 
primaryReplicaFuture.thenCompose(primaryReplica -> {
             try {
                 ClusterNode node = 
clusterNodeResolver.apply(primaryReplica.getLeaseholder());
 
@@ -1473,7 +1474,7 @@ public class InternalTableImpl implements InternalTable {
     }
 
     /** {@inheritDoc} */
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-19619 The method 
should be removed, SQL engine should use placementDriver directly
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-20933 The method 
should be removed
     @Override
     public List<String> assignments() {
         awaitLeaderInitialization();
@@ -1485,37 +1486,6 @@ public class InternalTableImpl implements InternalTable {
                 .collect(Collectors.toList());
     }
 
-    /** {@inheritDoc} */
-    @Override
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-19619 The method 
should be removed, SQL engine should use placementDriver directly
-    public CompletableFuture<List<PrimaryReplica>> primaryReplicas() {
-        List<Entry<RaftGroupService>> entries = new 
ArrayList<>(raftGroupServiceByPartitionId.int2ObjectEntrySet());
-        List<CompletableFuture<PrimaryReplica>> result = new ArrayList<>();
-
-        entries.sort(Comparator.comparingInt(Entry::getIntKey));
-
-        for (Entry<RaftGroupService> e : entries) {
-            CompletableFuture<ReplicaMeta> f = 
placementDriver.awaitPrimaryReplica(
-                    e.getValue().groupId(),
-                    clock.now(),
-                    AWAIT_PRIMARY_REPLICA_TIMEOUT,
-                    SECONDS
-            );
-
-            result.add(f.thenApply(primaryReplica -> {
-                ClusterNode node = 
clusterNodeResolver.apply(primaryReplica.getLeaseholder());
-                return new PrimaryReplica(node, 
primaryReplica.getStartTime().longValue());
-            }));
-        }
-
-        CompletableFuture<Void> all = 
CompletableFuture.allOf(result.toArray(new CompletableFuture[0]));
-
-        return all.thenApply(v -> result.stream()
-                .map(CompletableFuture::join)
-                .collect(Collectors.toList())
-        );
-    }
-
     @Override
     public ClusterNode leaderAssignment(int partition) {
         awaitLeaderInitialization();
@@ -2027,9 +1997,4 @@ public class InternalTableImpl implements InternalTable {
 
         return readWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL, 
keyRows0, txo, groupId, term, full);
     }
-
-    @Override
-    public Function<String, ClusterNode> getClusterNodeResolver() {
-        return clusterNodeResolver;
-    }
 }

Reply via email to