sanpwc commented on code in PR #2848:
URL: https://github.com/apache/ignite-3/pull/2848#discussion_r1398730855


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -317,6 +334,60 @@ public CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory f
         services.forEach(LifecycleAware::start);
     }
 
+    /** Get primary replicas. */
+    private CompletableFuture<List<PrimaryReplica>> primaryReplicas(int tableId) {

Review Comment:
   Could you please clarify when the given method is actually used?
   Let's say that we have the following flow:
   ```
   tx1 = transactions.begin();
   insert into t1 values(k1,v1);
   insert into t1 values(k2, v2);
   tx1.commit();
   ```
   When will the primaryReplicas() method be called, and how many times? How do you manage previously retrieved primary replicas, especially when one of them has changed? I hope you won't silently overwrite it in the inner primary replicas cache.

   What I mean is that, from the transaction's point of view, a primary replica only needs to be awaited the **first** time.
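
   A minimal sketch of the behaviour asked about above, assuming the primary replica is remembered per transaction and per partition. TxPrimaryReplicaCache and primaryFor are hypothetical names, and the awaitPrimary function merely stands in for a call such as placementDriver.awaitPrimaryReplica; none of this is the actual Ignite implementation:

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Function;

   /** Hypothetical per-transaction cache; not part of the Ignite code base. */
   class TxPrimaryReplicaCache {
       /** Partition id -> future of the primary replica (a node name here, for brevity). */
       private final Map<Integer, CompletableFuture<String>> primaries = new ConcurrentHashMap<>();

       /** Awaits the primary replica only the first time a partition is touched by the transaction. */
       CompletableFuture<String> primaryFor(
               int partitionId,
               Function<Integer, CompletableFuture<String>> awaitPrimary
       ) {
           // computeIfAbsent never replaces an existing entry, so a previously
           // retrieved primary replica is not silently overwritten.
           return primaries.computeIfAbsent(partitionId, awaitPrimary);
       }
   }
   ```

   With such a cache, the two inserts in the flow above would trigger at most one await per touched partition, and a changed primary would have to be handled explicitly rather than by overwriting the cached entry.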



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java:
##########
@@ -643,12 +644,10 @@ public void testMvScan(boolean readOnly) throws Exception {
             Publisher<BinaryRow> publisher;
 
             if (readOnly) {
-                List<String> assignments = internalTable.assignments();
-
                 // Any node from assignments will do it.
-                ClusterNode node0 = CLUSTER.aliveNode().clusterNodes().stream().filter(clusterNode -> {
-                    return assignments.contains(clusterNode.name());
-                }).findFirst().orElseThrow();
+                ClusterNode node0 = internalTable.leaderAssignment(ThreadLocalRandom.current().nextInt(internalTable.partitions()));

Review Comment:
   leaderAssignment breaks encapsulation and should be dropped (not by you and not within this ticket, of course). Why do you need it?



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -317,6 +334,60 @@ public CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory f
         services.forEach(LifecycleAware::start);
     }
 
+    /** Get primary replicas. */
+    private CompletableFuture<List<PrimaryReplica>> primaryReplicas(int tableId) {
+        int catalogVersion = catalogManager.latestCatalogVersion();
+
+        Catalog catalog = catalogManager.catalog(catalogVersion);
+
+        CatalogTableDescriptor tblDesc = Objects.requireNonNull(catalog.table(tableId), "table");
+
+        CatalogZoneDescriptor zoneDesc = Objects.requireNonNull(catalog.zone(tblDesc.zoneId()), "zone");
+
+        int partitions = zoneDesc.partitions();
+
+        List<CompletableFuture<PrimaryReplica>> result = new ArrayList<>(partitions);
+
+        HybridTimestamp clockNow = clock.now();
+
+        // no need to wait all partitions after pruning was implemented.
+        for (int partitionId = 0; partitionId < partitions; ++partitionId) {
+            ReplicationGroupId partGroupId = new TablePartitionId(tableId, partitionId);
+
+            CompletableFuture<ReplicaMeta> f = placementDriver.awaitPrimaryReplica(
+                    partGroupId,
+                    clockNow,
+                    AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                    SECONDS
+            );
+
+            result.add(f.thenApply(primaryReplica -> {
+                String holder = primaryReplica.getLeaseholder();
+
+                if (holder == null) {

Review Comment:
   I don't think that's actually possible. With awaitPrimaryReplica you will either get a valid non-null replica or a PrimaryReplicaAwaitException will be thrown. So I'd rather put an assert here.
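
   A sketch of the assert-based variant suggested above. LeaseMeta is a hypothetical stand-in for ReplicaMeta that models only getLeaseholder(), and the sketch assumes (as the comment states) that a normally completed future always carries a non-null leaseholder:

   ```java
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical stand-in for ReplicaMeta; only the accessor used in the diff is modelled. */
   interface LeaseMeta {
       String getLeaseholder();
   }

   class LeaseholderMapping {
       /** Maps an awaited primary replica to the name of its leaseholder. */
       static CompletableFuture<String> leaseholder(CompletableFuture<LeaseMeta> awaited) {
           return awaited.thenApply(meta -> {
               String holder = meta.getLeaseholder();

               // Treated as an invariant, not a recoverable condition: a failed await
               // completes the future exceptionally and never reaches this lambda.
               assert holder != null : "Leaseholder is null for an awaited primary replica";

               return holder;
           });
       }
   }
   ```

   Java assertions are only evaluated when the JVM runs with -ea, so this documents the invariant rather than enforcing it in production.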



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -317,6 +334,60 @@ public CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory f
         services.forEach(LifecycleAware::start);
     }
 
+    /** Get primary replicas. */
+    private CompletableFuture<List<PrimaryReplica>> primaryReplicas(int tableId) {
+        int catalogVersion = catalogManager.latestCatalogVersion();
+
+        Catalog catalog = catalogManager.catalog(catalogVersion);
+
+        CatalogTableDescriptor tblDesc = Objects.requireNonNull(catalog.table(tableId), "table");
+
+        CatalogZoneDescriptor zoneDesc = Objects.requireNonNull(catalog.zone(tblDesc.zoneId()), "zone");
+
+        int partitions = zoneDesc.partitions();
+
+        List<CompletableFuture<PrimaryReplica>> result = new ArrayList<>(partitions);
+
+        HybridTimestamp clockNow = clock.now();
+
+        // no need to wait all partitions after pruning was implemented.
+        for (int partitionId = 0; partitionId < partitions; ++partitionId) {
+            ReplicationGroupId partGroupId = new TablePartitionId(tableId, partitionId);
+
+            CompletableFuture<ReplicaMeta> f = placementDriver.awaitPrimaryReplica(
+                    partGroupId,
+                    clockNow,
+                    AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                    SECONDS
+            );
+
+            result.add(f.thenApply(primaryReplica -> {
+                String holder = primaryReplica.getLeaseholder();
+
+                if (holder == null) {
+                    // additional recovery logic is need to be present around here.
+                    throw new IgniteInternalException(Sql.MAPPING_ERR, "Unable to map query, nothing holds the lease");
+                }
+
+                ClusterNode node = clusterSrvc.topologyService().getByConsistentId(holder);
+
+                if (node == null) {
+                    // additional recovery logic is need to be present around here.
+                    throw new IgniteInternalException(Sql.MAPPING_ERR, "Unable to map query, node is lost or offline");
+                }
+
+                return new PrimaryReplica(node, primaryReplica.getStartTime().longValue());
+            }));
+        }
+
+        CompletableFuture<Void> all = CompletableFuture.allOf(result.toArray(new CompletableFuture[0]));
+
+        return all.thenApply(v -> result.stream()

Review Comment:
   How do you handle PrimaryReplicaAwaitException or any other exceptions that 
might be thrown from awaitPrimaryReplica?
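
   For reference, a sketch of how a failure in any awaited future surfaces through CompletableFuture.allOf, and one way it could be translated. AwaitAll, collect and MappingException are hypothetical names; the PR may well handle this differently:

   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.stream.Collectors;

   class AwaitAll {
       /** Hypothetical mapping error; stands in for whatever the SQL engine would report. */
       static class MappingException extends RuntimeException {
           MappingException(String msg, Throwable cause) {
               super(msg, cause);
           }
       }

       /** Collects all results, or fails with a MappingException wrapping the underlying failure. */
       static <T> CompletableFuture<List<T>> collect(List<CompletableFuture<T>> futures) {
           return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                   .handle((ignored, err) -> {
                       if (err != null) {
                           // allOf reports a failed input as a CompletionException; unwrap it first.
                           Throwable cause = err instanceof CompletionException && err.getCause() != null
                                   ? err.getCause()
                                   : err;

                           throw new MappingException("Unable to map query", cause);
                       }

                       // Every input has completed normally here, so join() does not block.
                       return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
                   });
       }
   }
   ```

   Without such a step, a plain thenApply after allOf would simply propagate the failure to the caller as a CompletionException.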



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/TablePartitionId.java:
##########
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.distributionzones.rebalance;
-
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
-
-// TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Should be refactored to ZonePartitionId.

Review Comment:
   Please add the given TODO to the existing TablePartitionId.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
