tkalkirill commented on code in PR #5276:
URL: https://github.com/apache/ignite-3/pull/5276#discussion_r1981252494


##########
modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java:
##########
@@ -1126,6 +1135,40 @@ public void testFunctionalDefaultTypeMismatch() {
         assertThrows(CatalogValidationException.class, commandBuilder::build, 
error);
     }
 
+    @Test
+    public void testCreateTablesWithinDifferentZones() {

Review Comment:
   The test name is plural, so let's create 2 zones with 2 tables in each and 
verify that the correct tables are returned for each zone.



##########
modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java:
##########
@@ -1190,4 +1233,18 @@ private TestColumnTypeParams(ColumnType type, @Nullable 
Integer precision, @Null
     private @Nullable CatalogTableDescriptor table(int catalogVersion, String 
tableName) {
         return manager.catalog(catalogVersion).table(SCHEMA_NAME, tableName);
     }
+
+    private static Matcher<CatalogTableDescriptor> descriptorWithName(String 
name) {

Review Comment:
   I think this can be made simpler, for example by turning it into a collection 
of IDs and comparing them. At your discretion.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -184,60 +232,104 @@ private CompletableFuture<?> 
onIndexRemoved(RemoveIndexEventParameters parameter
 
     private CompletableFuture<?> 
onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
-            TablePartitionId primaryReplicaId = (TablePartitionId) 
parameters.groupId();
-
             if (isLocalNode(clusterService, parameters.leaseholderId())) {
-                primaryReplicaIds.add(primaryReplicaId);
+                // TODO https://issues.apache.org/jira/browse/IGNITE-22522
+                // Need to remove TablePartitionId check here and below.
+                assert parameters.groupId() instanceof ZonePartitionId || 
parameters.groupId() instanceof TablePartitionId :

Review Comment:
   Ok



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java:
##########
@@ -219,6 +223,14 @@ public Collection<CatalogTableDescriptor> tables() {
         return tablesById.values();
     }
 
+    /**
+     * Returns all tables that belong to the specified zone.
+     *
+     * @return A collection of table descriptors.
+     */

Review Comment:
   I would suggest removing it; it doesn't add any additional information. It's 
up to you.



##########
modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java:
##########
@@ -362,6 +362,10 @@ private int tableId(String tableName) {
         return TableTestUtils.getTableIdStrict(catalogManager, tableName, 
clock.nowLong());
     }
 
+    private int zoneId(String tableName) {

Review Comment:
   Let's rename it to `zoneIdByTableName`.



##########
modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java:
##########
@@ -153,46 +160,84 @@ private void onIndexRemoved(RemoveIndexEventParameters 
parameters) {
 
     private void onPrimaryReplicaElected(PrimaryReplicaEventParameters 
parameters) {
         inBusyLock(busyLock, () -> {
-            TablePartitionId primaryReplicaId = (TablePartitionId) 
parameters.groupId();
+
+            PartitionGroupId primaryReplicaId = (PartitionGroupId) 
parameters.groupId();
 
             if (primaryReplicaId.partitionId() != 0) {
                 // We are only interested in the 0 partition.
                 return;
             }
 
-            int tableId = primaryReplicaId.tableId();
-
             if (isLocalNode(clusterService, parameters.leaseholderId())) {
-                if (localNodeIsPrimaryReplicaForTableIds.add(tableId)) {
-                    scheduleTasksOnPrimaryReplicaElectedBusy(tableId);
-                }
+                scheduleTasksOnPrimaryReplicaElectedBusy(primaryReplicaId);
             } else {
-                if (localNodeIsPrimaryReplicaForTableIds.remove(tableId)) {
-                    changeIndexStatusTaskScheduler.stopTasksForTable(tableId);
-                }
+                scheduleStopTasksOnPrimaryReplicaElected(primaryReplicaId);
             }
         });
     }
 
-    private void scheduleTasksOnPrimaryReplicaElectedBusy(int tableId) {
+    private void scheduleTasksOnPrimaryReplicaElectedBusy(PartitionGroupId 
partitionGroupId) {
+        // It is safe to get the latest version of the catalog because the 
PRIMARY_REPLICA_ELECTED event is handled on the metastore thread.
+        Catalog catalog = 
catalogService.catalog(catalogService.latestCatalogVersion());
+
+        IntListIterator tableIds =
+                getTableIdsForPrimaryReplicaElected(catalog, partitionGroupId, 
localNodeIsPrimaryReplicaForTableIds::add);
+
+        while (tableIds.hasNext()) {
+            for (CatalogIndexDescriptor indexDescriptor : 
catalog.indexes(tableIds.nextInt())) {
+                switch (indexDescriptor.status()) {
+                    case REGISTERED:
+                        
changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor);
+
+                        break;
+
+                    case STOPPING:
+                        
changeIndexStatusTaskScheduler.scheduleRemoveIndexTask(indexDescriptor);
+
+                        break;
+
+                    default:
+                        break;
+                }
+            }
+        }
+    }
+
+    private void scheduleStopTasksOnPrimaryReplicaElected(PartitionGroupId 
partitionGroupId) {
         // It is safe to get the latest version of the catalog because the 
PRIMARY_REPLICA_ELECTED event is handled on the metastore thread.
         Catalog catalog = 
catalogService.catalog(catalogService.latestCatalogVersion());
 
-        for (CatalogIndexDescriptor indexDescriptor : 
catalog.indexes(tableId)) {
-            switch (indexDescriptor.status()) {
-                case REGISTERED:
-                    
changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor);
+        IntListIterator tableIds =
+                getTableIdsForPrimaryReplicaElected(catalog, partitionGroupId, 
localNodeIsPrimaryReplicaForTableIds::remove);
+
+        while (tableIds.hasNext()) {
+            
changeIndexStatusTaskScheduler.stopTasksForTable(tableIds.nextInt());
+        }
+    }
 
-                    break;
+    private IntListIterator getTableIdsForPrimaryReplicaElected(
+            Catalog catalog,
+            PartitionGroupId partitionGroupId,
+            Int2BooleanFunction filter

Review Comment:
   I don't really like the approach with a filter that changes the internal 
state. Let's keep the filter simple and then immediately add/delete the table 
IDs in a batch; that looks clearer and more obvious.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to