This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new d757958ba1 IGNITE-19619 Sql. Term-bases to lease-based switch. (#2848) d757958ba1 is described below commit d757958ba19e3d4bac1632b1f34988d62e9c3473 Author: Evgeniy Stanilovskiy <stanilov...@gmail.com> AuthorDate: Tue Nov 28 18:03:58 2023 +0300 IGNITE-19619 Sql. Term-bases to lease-based switch. (#2848) --- .../ignite/client/fakes/FakeInternalTable.java | 11 --- .../internal/replicator/TablePartitionId.java | 1 + .../distributionzones/rebalance/RebalanceUtil.java | 1 + .../rebalance/TablePartitionId.java | 100 --------------------- .../DistributionZoneRebalanceEngineTest.java | 1 + .../RebalanceUtilUpdateAssignmentsTest.java | 1 + .../runner/app/ItIgniteNodeRestartTest.java | 3 +- .../ignite/internal/table/ItTableScanTest.java | 27 +++--- .../org/apache/ignite/internal/app/IgniteImpl.java | 3 +- modules/sql-engine/build.gradle | 1 + .../internal/sql/engine/SqlQueryProcessor.java | 88 +++++++++++++++--- .../sql/engine/exec/ExecutionServiceImpl.java | 8 +- .../rebalance/ItRebalanceDistributedTest.java | 2 +- .../ignite/internal/table/InternalTable.java | 10 --- .../distributed/storage/InternalTableImpl.java | 45 ++-------- 15 files changed, 115 insertions(+), 187 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java index 12bb932fd0..6bab51f48c 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java @@ -29,7 +29,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Flow.Publisher; import java.util.function.BiConsumer; -import java.util.function.Function; import javax.naming.OperationNotSupportedException; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; @@ -424,11 +423,6 @@ public class FakeInternalTable implements InternalTable { throw new IgniteInternalException(new OperationNotSupportedException()); } - @Override - public CompletableFuture<List<PrimaryReplica>> primaryReplicas() { - return CompletableFuture.failedFuture(new IgniteInternalException(new OperationNotSupportedException())); - } - @Override public ClusterNode leaderAssignment(int partition) { throw new IgniteInternalException(new OperationNotSupportedException()); @@ -477,9 +471,4 @@ public class FakeInternalTable implements InternalTable { public @Nullable PendingComparableValuesTracker<Long, Void> getPartitionStorageIndexTracker(int partitionId) { return null; } - - @Override - public Function<String, ClusterNode> getClusterNodeResolver() { - return null; - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java b/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java index f752d238c2..fba5ff7768 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/replicator/TablePartitionId.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.replicator; +// TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Should be refactored to ZonePartitionId. /** * The class is used to identify a table replication group. */ diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java index 2ac97db451..9c3e178236 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.dsl.Condition; +import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.vault.VaultEntry; import org.apache.ignite.internal.vault.VaultManager; diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/TablePartitionId.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/TablePartitionId.java deleted file mode 100644 index f0f7098117..0000000000 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/TablePartitionId.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.distributionzones.rebalance; - -import org.apache.ignite.internal.replicator.ReplicationGroupId; - -// TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Should be refactored to ZonePartitionId. -/** - * The class is used to identify a table replication group. - */ -public class TablePartitionId implements ReplicationGroupId { - private static final long serialVersionUID = -2428659904367844831L; - - /** Table id. */ - private final int tableId; - - /** Partition id. */ - private final int partId; - - /** - * The constructor. - * - * @param tableId Table id. - * @param partId Partition id. - */ - public TablePartitionId(int tableId, int partId) { - this.tableId = tableId; - this.partId = partId; - } - - /** - * Converts a string representation of table partition id to the object. - * - * @param str String representation. - * @return An table partition id. - */ - public static TablePartitionId fromString(String str) { - String[] parts = str.split("_part_"); - - return new TablePartitionId(Integer.parseInt(parts[0]), Integer.parseInt(parts[1])); - } - - /** - * Get the partition id. - * - * @return Partition id. - */ - public int partitionId() { - return partId; - } - - /** - * Get the table id. - * - * @return Table id. - */ - public int tableId() { - return tableId; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - TablePartitionId that = (TablePartitionId) o; - - return partId == that.partId && tableId == that.tableId; - } - - @Override - public int hashCode() { - return tableId ^ partId; - } - - @Override - public String toString() { - return tableId + "_part_" + partId; - } -} diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java index 74a0037207..cfbed58369 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java @@ -87,6 +87,7 @@ import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.table.TableTestUtils; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.util.IgniteSpinBusyLock; diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java index daf0d4d36b..11c5d6c864 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.WriteCommand; import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.network.ClusterService; diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 7c81bf04e2..f4e5110f22 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -427,7 +427,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { schemaSyncService, catalogManager, metricManager, - new SystemViewManagerImpl(name, catalogManager) + new SystemViewManagerImpl(name, catalogManager), + placementDriverManager.placementDriver() ); // Preparing the result map. diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java index 1b846c0084..9ed1010956 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table; +import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition; import static org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL; import static org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL; import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace; @@ -34,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; @@ -44,7 +46,7 @@ import java.util.function.Function; import java.util.function.IntFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.apache.ignite.Ignite; +import org.apache.ignite.internal.affinity.Assignment; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; import org.apache.ignite.internal.catalog.CatalogManager; @@ -645,12 +647,16 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { Publisher<BinaryRow> publisher; if (readOnly) { - List<String> assignments = internalTable.assignments(); - // Any node from assignments will do it. - ClusterNode node0 = CLUSTER.aliveNode().clusterNodes().stream().filter(clusterNode -> { - return assignments.contains(clusterNode.name()); - }).findFirst().orElseThrow(); + Set<Assignment> assignments = calculateAssignmentForPartition(CLUSTER.aliveNode().clusterNodes().stream().map( + ClusterNode::name).collect(Collectors.toList()), 0, 1); + + assertFalse(assignments.isEmpty()); + + String consId = assignments.iterator().next().consistentId(); + + ClusterNode node0 = CLUSTER.aliveNode().clusterNodes().stream().filter(n -> n.name().equals(consId)).findAny() + .orElseThrow(); //noinspection DataFlowIssue publisher = internalTable.scan(PART_ID, tx.readTimestamp(), node0, sortedIndexId, null, null, 0, null); @@ -874,21 +880,22 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { * @return Transaction. */ private InternalTransaction startTxWithEnlistedPartition(int partId, boolean readOnly) { - Ignite ignite = CLUSTER.aliveNode(); + IgniteImpl ignite = CLUSTER.aliveNode(); InternalTransaction tx = (InternalTransaction) ignite.transactions().begin(new TransactionOptions().readOnly(readOnly)); InternalTable table = ((TableViewInternal) ignite.tables().table(TABLE_NAME)).internalTable(); TablePartitionId tblPartId = new TablePartitionId(table.tableId(), partId); - PlacementDriver placementDriver = ((IgniteImpl) ignite).placementDriver(); + PlacementDriver placementDriver = ignite.placementDriver(); ReplicaMeta primaryReplica = IgniteTestUtils.await( - placementDriver.awaitPrimaryReplica(tblPartId, ((IgniteImpl) ignite).clock().now(), 30, TimeUnit.SECONDS)); + placementDriver.awaitPrimaryReplica(tblPartId, ignite.clock().now(), 30, TimeUnit.SECONDS)); tx.enlist( tblPartId, new IgniteBiTuple<>( - table.getClusterNodeResolver().apply(primaryReplica.getLeaseholder()), + ignite.clusterNodes().stream().filter(n -> n.name().equals(primaryReplica.getLeaseholder())) + .findFirst().orElseThrow(), primaryReplica.getStartTime().longValue() ) ); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 57046da721..ca1f634f42 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -625,7 +625,8 @@ public class IgniteImpl implements Ignite { schemaSyncService, catalogManager, metricManager, - systemViewManager + systemViewManager, + placementDriverMgr.placementDriver() ); sql = new IgniteSqlImpl(name, qryEngine, new IgniteTransactionsImpl(txManager, observableTimestampTracker)); diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle index af34a45ce7..6e75c017dc 100644 --- a/modules/sql-engine/build.gradle +++ b/modules/sql-engine/build.gradle @@ -40,6 +40,7 @@ dependencies { implementation project(':ignite-cluster-management') implementation project(':ignite-system-view-api') implementation project(':ignite-system-view') + implementation project(':ignite-placement-driver-api') implementation libs.jetbrains.annotations implementation libs.fastutil.core implementation libs.caffeine diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 1f5cd31304..46c3a5b084 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -17,11 +17,15 @@ package org.apache.ignite.internal.sql.engine; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException; import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; +import static org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; +import static org.apache.ignite.internal.util.ExceptionUtils.withCause; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR; @@ -30,6 +34,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.UUID; @@ -44,14 +49,23 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.tools.Frameworks; +import org.apache.ignite.internal.catalog.Catalog; import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metrics.MetricManager; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl; import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistryImpl; @@ -109,6 +123,9 @@ import org.jetbrains.annotations.TestOnly; * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ public class SqlQueryProcessor implements QueryProcessor { + /** The logger. */ + private static final IgniteLogger LOG = Loggers.forClass(SqlQueryProcessor.class); + /** Size of the cache for query plans. */ private static final int PLAN_CACHE_SIZE = 1024; @@ -174,6 +191,9 @@ public class SqlQueryProcessor implements QueryProcessor { /** Metric manager. */ private final MetricManager metricManager; + /** Placement driver. */ + private final PlacementDriver placementDriver; + private final ConcurrentMap<UUID, AsyncSqlCursor<?>> openedCursors = new ConcurrentHashMap<>(); /** Constructor. */ @@ -190,7 +210,8 @@ public class SqlQueryProcessor implements QueryProcessor { SchemaSyncService schemaSyncService, CatalogManager catalogManager, MetricManager metricManager, - SystemViewManager systemViewManager + SystemViewManager systemViewManager, + PlacementDriver placementDriver ) { this.clusterSrvc = clusterSrvc; this.logicalTopologyService = logicalTopologyService; @@ -204,6 +225,7 @@ public class SqlQueryProcessor implements QueryProcessor { this.catalogManager = catalogManager; this.metricManager = metricManager; this.systemViewManager = systemViewManager; + this.placementDriver = placementDriver; sqlSchemaManager = new SqlSchemaManagerImpl( catalogManager, @@ -260,15 +282,8 @@ public class SqlQueryProcessor implements QueryProcessor { var executionTargetProvider = new ExecutionTargetProvider() { @Override public CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, IgniteTable table) { - return tableManager.tableAsync(table.id()) - .thenCompose(tbl -> tbl.internalTable().primaryReplicas()) - .thenApply(replicas -> { - List<NodeWithTerm> assignments = replicas.stream() - .map(primaryReplica -> new NodeWithTerm(primaryReplica.node().name(), primaryReplica.term())) - .collect(Collectors.toList()); - - return factory.partitioned(assignments); - }); + return primaryReplicas(table.id()) + .thenApply(replicas -> factory.partitioned(replicas)); } @Override @@ -317,6 +332,59 @@ public class SqlQueryProcessor implements QueryProcessor { services.forEach(LifecycleAware::start); } + // need to be refactored after TODO: https://issues.apache.org/jira/browse/IGNITE-20925 + /** Get primary replicas. */ + private CompletableFuture<List<NodeWithTerm>> primaryReplicas(int tableId) { + int catalogVersion = catalogManager.latestCatalogVersion(); + + Catalog catalog = catalogManager.catalog(catalogVersion); + + CatalogTableDescriptor tblDesc = Objects.requireNonNull(catalog.table(tableId), "table"); + + CatalogZoneDescriptor zoneDesc = Objects.requireNonNull(catalog.zone(tblDesc.zoneId()), "zone"); + + int partitions = zoneDesc.partitions(); + + List<CompletableFuture<NodeWithTerm>> result = new ArrayList<>(partitions); + + HybridTimestamp clockNow = clock.now(); + + // no need to wait all partitions after pruning was implemented. + for (int partId = 0; partId < partitions; ++partId) { + int partitionId = partId; + ReplicationGroupId partGroupId = new TablePartitionId(tableId, partitionId); + + CompletableFuture<ReplicaMeta> f = placementDriver.awaitPrimaryReplica( + partGroupId, + clockNow, + AWAIT_PRIMARY_REPLICA_TIMEOUT, + SECONDS + ); + + result.add(f.handle((primaryReplica, e) -> { + if (e != null) { + LOG.debug("Failed to retrieve primary replica for partition {}", e, partitionId); + + throw withCause(IgniteInternalException::new, REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica" + + " [tablePartitionId=" + partGroupId + ']', e); + } else { + String holder = primaryReplica.getLeaseholder(); + + assert holder != null : "Unable to map query, nothing holds the lease"; + + return new NodeWithTerm(holder, primaryReplica.getStartTime().longValue()); + } + })); + } + + CompletableFuture<Void> all = CompletableFuture.allOf(result.toArray(new CompletableFuture[0])); + + return all.thenApply(v -> result.stream() + .map(CompletableFuture::join) + .collect(Collectors.toList()) + ); + } + /** {@inheritDoc} */ @Override public synchronized void stop() throws Exception { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index a7ea216d5d..b9c952ad7d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -858,10 +858,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve tx.assignCommitPartition(new TablePartitionId(tableId, ThreadLocalRandom.current().nextInt(partsCnt))); for (int p = 0; p < partsCnt; p++) { - NodeWithTerm leaderWithTerm = assignments.get(p); + TablePartitionId tablePartId = new TablePartitionId(tableId, p); - tx.enlist(new TablePartitionId(tableId, p), - new IgniteBiTuple<>(topSrvc.getByConsistentId(leaderWithTerm.name()), leaderWithTerm.term())); + NodeWithTerm enlistmentToken = assignments.get(p); + + tx.enlist(tablePartId, + new IgniteBiTuple<>(topSrvc.getByConsistentId(enlistmentToken.name()), enlistmentToken.term())); } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 052cf89a9b..4b631aae3e 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -110,7 +110,6 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator; import org.apache.ignite.internal.distributionzones.DistributionZoneManager; import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil; -import org.apache.ignite.internal.distributionzones.rebalance.TablePartitionId; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.index.IndexManager; @@ -141,6 +140,7 @@ import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory; import org.apache.ignite.internal.replicator.Replica; import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.rest.configuration.RestConfiguration; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.configuration.GcConfiguration; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java index 72b0942492..c631642a69 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow.Publisher; -import java.util.function.Function; import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; @@ -434,13 +433,6 @@ public interface InternalTable extends ManuallyCloseable { */ List<String> assignments(); - /** - * Gets a list of current primary replicas for each partition. - * - * @return List of current primary replicas for each partition. - */ - CompletableFuture<List<PrimaryReplica>> primaryReplicas(); - /** * Returns cluster node that is the leader of the corresponding partition group or throws an exception if * it cannot be found. @@ -487,6 +479,4 @@ public interface InternalTable extends ManuallyCloseable { * @param partitionId Partition ID. */ @Nullable PendingComparableValuesTracker<Long, Void> getPartitionStorageIndexTracker(int partitionId); - - Function<String, ClusterNode> getClusterNodeResolver(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 396604f00b..e19879c791 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -131,7 +131,8 @@ public class InternalTableImpl implements InternalTable { /** Number of attempts. */ private static final int ATTEMPTS_TO_ENLIST_PARTITION = 5; - private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 30; + /** Primary replica await timeout. */ + public static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 30; /** Map update guarded by {@link #updatePartitionMapsMux}. */ protected volatile Int2ObjectMap<RaftGroupService> raftGroupServiceByPartitionId; @@ -605,10 +606,10 @@ public class InternalTableImpl implements InternalTable { tablePartitionId, tx.startTimestamp(), AWAIT_PRIMARY_REPLICA_TIMEOUT, - TimeUnit.SECONDS + SECONDS ); - CompletableFuture<R> fut = primaryReplicaFuture.thenCompose(primaryReplica -> { + CompletableFuture<R> fut = primaryReplicaFuture.thenCompose(primaryReplica -> { try { ClusterNode node = clusterNodeResolver.apply(primaryReplica.getLeaseholder()); @@ -1473,7 +1474,7 @@ public class InternalTableImpl implements InternalTable { } /** {@inheritDoc} */ - // TODO: https://issues.apache.org/jira/browse/IGNITE-19619 The method should be removed, SQL engine should use placementDriver directly + // TODO: https://issues.apache.org/jira/browse/IGNITE-20933 The method should be removed @Override public List<String> assignments() { awaitLeaderInitialization(); @@ -1485,37 +1486,6 @@ public class InternalTableImpl implements InternalTable { .collect(Collectors.toList()); } - /** {@inheritDoc} */ - @Override - // TODO: https://issues.apache.org/jira/browse/IGNITE-19619 The method should be removed, SQL engine should use placementDriver directly - public CompletableFuture<List<PrimaryReplica>> primaryReplicas() { - List<Entry<RaftGroupService>> entries = new ArrayList<>(raftGroupServiceByPartitionId.int2ObjectEntrySet()); - List<CompletableFuture<PrimaryReplica>> result = new ArrayList<>(); - - entries.sort(Comparator.comparingInt(Entry::getIntKey)); - - for (Entry<RaftGroupService> e : entries) { - CompletableFuture<ReplicaMeta> f = placementDriver.awaitPrimaryReplica( - e.getValue().groupId(), - clock.now(), - AWAIT_PRIMARY_REPLICA_TIMEOUT, - SECONDS - ); - - result.add(f.thenApply(primaryReplica -> { - ClusterNode node = clusterNodeResolver.apply(primaryReplica.getLeaseholder()); - return new PrimaryReplica(node, primaryReplica.getStartTime().longValue()); - })); - } - - CompletableFuture<Void> all = CompletableFuture.allOf(result.toArray(new CompletableFuture[0])); - - return all.thenApply(v -> result.stream() - .map(CompletableFuture::join) - .collect(Collectors.toList()) - ); - } - @Override public ClusterNode leaderAssignment(int partition) { awaitLeaderInitialization(); @@ -2027,9 +1997,4 @@ public class InternalTableImpl implements InternalTable { return readWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL, keyRows0, txo, groupId, term, full); } - - @Override - public Function<String, ClusterNode> getClusterNodeResolver() { - return clusterNodeResolver; - } }