sanpwc commented on code in PR #2848:
URL: https://github.com/apache/ignite-3/pull/2848#discussion_r1398730855
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -317,6 +334,60 @@ public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory f
services.forEach(LifecycleAware::start);
}
+ /** Get primary replicas. */
+ private CompletableFuture<List<PrimaryReplica>> primaryReplicas(int
tableId) {
Review Comment:
Could you please clarify when given method is actually used?
Let's say that we have following flow:
```
tx1 = transactions.begin();
insert into t1 values(k1,v1);
insert into t1 values(k2, v2);
tx1.commit();
```
When will the primaryReplcias() method be called? How many times? How do you
manage previously retrieved primary replicas? Especially when it was changes
(hope that you won't silently overwrite it in the inner primary replicas
cache)?
What I mean here, is that from transactions point of view it's only required
to await primary replicas on **first** only.
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java:
##########
@@ -643,12 +644,10 @@ public void testMvScan(boolean readOnly) throws Exception
{
Publisher<BinaryRow> publisher;
if (readOnly) {
- List<String> assignments = internalTable.assignments();
-
// Any node from assignments will do it.
- ClusterNode node0 =
CLUSTER.aliveNode().clusterNodes().stream().filter(clusterNode -> {
- return assignments.contains(clusterNode.name());
- }).findFirst().orElseThrow();
+ ClusterNode node0 =
internalTable.leaderAssignment(ThreadLocalRandom.current().nextInt(internalTable.partitions()));
Review Comment:
leaderAssignment breaks the encapsulation and should be dropped (not by you
and not within given ticket of course), why do you need it?
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -317,6 +334,60 @@ public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory f
services.forEach(LifecycleAware::start);
}
+ /** Get primary replicas. */
+ private CompletableFuture<List<PrimaryReplica>> primaryReplicas(int
tableId) {
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
+ Catalog catalog = catalogManager.catalog(catalogVersion);
+
+ CatalogTableDescriptor tblDesc =
Objects.requireNonNull(catalog.table(tableId), "table");
+
+ CatalogZoneDescriptor zoneDesc =
Objects.requireNonNull(catalog.zone(tblDesc.zoneId()), "zone");
+
+ int partitions = zoneDesc.partitions();
+
+ List<CompletableFuture<PrimaryReplica>> result = new
ArrayList<>(partitions);
+
+ HybridTimestamp clockNow = clock.now();
+
+ // no need to wait all partitions after pruning was implemented.
+ for (int partitionId = 0; partitionId < partitions; ++partitionId) {
+ ReplicationGroupId partGroupId = new TablePartitionId(tableId,
partitionId);
+
+ CompletableFuture<ReplicaMeta> f =
placementDriver.awaitPrimaryReplica(
+ partGroupId,
+ clockNow,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
+ );
+
+ result.add(f.thenApply(primaryReplica -> {
+ String holder = primaryReplica.getLeaseholder();
+
+ if (holder == null) {
Review Comment:
I don't think it's actually possible. In case of awaitPrimaryReplica you
will either get valid !null one or PrimaryReplicaAwaitException will be thrown.
So, I'd rather put an assert here.
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -317,6 +334,60 @@ public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory f
services.forEach(LifecycleAware::start);
}
+ /** Get primary replicas. */
+ private CompletableFuture<List<PrimaryReplica>> primaryReplicas(int
tableId) {
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
+ Catalog catalog = catalogManager.catalog(catalogVersion);
+
+ CatalogTableDescriptor tblDesc =
Objects.requireNonNull(catalog.table(tableId), "table");
+
+ CatalogZoneDescriptor zoneDesc =
Objects.requireNonNull(catalog.zone(tblDesc.zoneId()), "zone");
+
+ int partitions = zoneDesc.partitions();
+
+ List<CompletableFuture<PrimaryReplica>> result = new
ArrayList<>(partitions);
+
+ HybridTimestamp clockNow = clock.now();
+
+ // no need to wait all partitions after pruning was implemented.
+ for (int partitionId = 0; partitionId < partitions; ++partitionId) {
+ ReplicationGroupId partGroupId = new TablePartitionId(tableId,
partitionId);
+
+ CompletableFuture<ReplicaMeta> f =
placementDriver.awaitPrimaryReplica(
+ partGroupId,
+ clockNow,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
+ );
+
+ result.add(f.thenApply(primaryReplica -> {
+ String holder = primaryReplica.getLeaseholder();
+
+ if (holder == null) {
+ // additional recovery logic is need to be present around
here.
+ throw new IgniteInternalException(Sql.MAPPING_ERR, "Unable
to map query, nothing holds the lease");
+ }
+
+ ClusterNode node =
clusterSrvc.topologyService().getByConsistentId(holder);
+
+ if (node == null) {
+ // additional recovery logic is need to be present around
here.
+ throw new IgniteInternalException(Sql.MAPPING_ERR, "Unable
to map query, node is lost or offline");
+ }
+
+ return new PrimaryReplica(node,
primaryReplica.getStartTime().longValue());
+ }));
+ }
+
+ CompletableFuture<Void> all =
CompletableFuture.allOf(result.toArray(new CompletableFuture[0]));
+
+ return all.thenApply(v -> result.stream()
Review Comment:
How do you handle PrimaryReplicaAwaitException or any other exceptions that
might be thrown from awaitPrimaryReplica?
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/TablePartitionId.java:
##########
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.distributionzones.rebalance;
-
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-
-// TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Should be
refactored to ZonePartitionId.
Review Comment:
Please add given todo to an existing TablePartitionId.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]