tkalkirill commented on code in PR #5276:
URL: https://github.com/apache/ignite-3/pull/5276#discussion_r1981252494
##########
modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java:
##########
@@ -1126,6 +1135,40 @@ public void testFunctionalDefaultTypeMismatch() {
assertThrows(CatalogValidationException.class, commandBuilder::build,
error);
}
+ @Test
+ public void testCreateTablesWithinDifferentZones() {
Review Comment:
The test name is plural, so let's create 2 zones with 2 tables in each and
verify that the correct tables are returned for each zone.
##########
modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java:
##########
@@ -1190,4 +1233,18 @@ private TestColumnTypeParams(ColumnType type, @Nullable
Integer precision, @Null
private @Nullable CatalogTableDescriptor table(int catalogVersion, String
tableName) {
return manager.catalog(catalogVersion).table(SCHEMA_NAME, tableName);
}
+
+ private static Matcher<CatalogTableDescriptor> descriptorWithName(String
name) {
Review Comment:
I think this can be made simpler, for example by turning it into a collection
of IDs and comparing those. At your discretion.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -184,60 +232,104 @@ private CompletableFuture<?>
onIndexRemoved(RemoveIndexEventParameters parameter
private CompletableFuture<?>
onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
- TablePartitionId primaryReplicaId = (TablePartitionId)
parameters.groupId();
-
if (isLocalNode(clusterService, parameters.leaseholderId())) {
- primaryReplicaIds.add(primaryReplicaId);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-22522
+ // Need to remove TablePartitionId check here and below.
+ assert parameters.groupId() instanceof ZonePartitionId ||
parameters.groupId() instanceof TablePartitionId :
Review Comment:
Ok
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java:
##########
@@ -219,6 +223,14 @@ public Collection<CatalogTableDescriptor> tables() {
return tablesById.values();
}
+ /**
+ * Returns all tables that belong to the specified zone.
+ *
+ * @return A collection of table descriptors.
+ */
Review Comment:
I would suggest removing it; it doesn't add any additional information. It's
up to you.
##########
modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java:
##########
@@ -362,6 +362,10 @@ private int tableId(String tableName) {
return TableTestUtils.getTableIdStrict(catalogManager, tableName,
clock.nowLong());
}
+ private int zoneId(String tableName) {
Review Comment:
Let's rename it to `zoneIdByTableName`.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java:
##########
@@ -153,46 +160,84 @@ private void onIndexRemoved(RemoveIndexEventParameters
parameters) {
private void onPrimaryReplicaElected(PrimaryReplicaEventParameters
parameters) {
inBusyLock(busyLock, () -> {
- TablePartitionId primaryReplicaId = (TablePartitionId)
parameters.groupId();
+
+ PartitionGroupId primaryReplicaId = (PartitionGroupId)
parameters.groupId();
if (primaryReplicaId.partitionId() != 0) {
// We are only interested in the 0 partition.
return;
}
- int tableId = primaryReplicaId.tableId();
-
if (isLocalNode(clusterService, parameters.leaseholderId())) {
- if (localNodeIsPrimaryReplicaForTableIds.add(tableId)) {
- scheduleTasksOnPrimaryReplicaElectedBusy(tableId);
- }
+ scheduleTasksOnPrimaryReplicaElectedBusy(primaryReplicaId);
} else {
- if (localNodeIsPrimaryReplicaForTableIds.remove(tableId)) {
- changeIndexStatusTaskScheduler.stopTasksForTable(tableId);
- }
+ scheduleStopTasksOnPrimaryReplicaElected(primaryReplicaId);
}
});
}
- private void scheduleTasksOnPrimaryReplicaElectedBusy(int tableId) {
+ private void scheduleTasksOnPrimaryReplicaElectedBusy(PartitionGroupId
partitionGroupId) {
+ // It is safe to get the latest version of the catalog because the
PRIMARY_REPLICA_ELECTED event is handled on the metastore thread.
+ Catalog catalog =
catalogService.catalog(catalogService.latestCatalogVersion());
+
+ IntListIterator tableIds =
+ getTableIdsForPrimaryReplicaElected(catalog, partitionGroupId,
localNodeIsPrimaryReplicaForTableIds::add);
+
+ while (tableIds.hasNext()) {
+ for (CatalogIndexDescriptor indexDescriptor :
catalog.indexes(tableIds.nextInt())) {
+ switch (indexDescriptor.status()) {
+ case REGISTERED:
+
changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor);
+
+ break;
+
+ case STOPPING:
+
changeIndexStatusTaskScheduler.scheduleRemoveIndexTask(indexDescriptor);
+
+ break;
+
+ default:
+ break;
+ }
+ }
+ }
+ }
+
+ private void scheduleStopTasksOnPrimaryReplicaElected(PartitionGroupId
partitionGroupId) {
// It is safe to get the latest version of the catalog because the
PRIMARY_REPLICA_ELECTED event is handled on the metastore thread.
Catalog catalog =
catalogService.catalog(catalogService.latestCatalogVersion());
- for (CatalogIndexDescriptor indexDescriptor :
catalog.indexes(tableId)) {
- switch (indexDescriptor.status()) {
- case REGISTERED:
-
changeIndexStatusTaskScheduler.scheduleStartBuildingTask(indexDescriptor);
+ IntListIterator tableIds =
+ getTableIdsForPrimaryReplicaElected(catalog, partitionGroupId,
localNodeIsPrimaryReplicaForTableIds::remove);
+
+ while (tableIds.hasNext()) {
+
changeIndexStatusTaskScheduler.stopTasksForTable(tableIds.nextInt());
+ }
+ }
- break;
+ private IntListIterator getTableIdsForPrimaryReplicaElected(
+ Catalog catalog,
+ PartitionGroupId partitionGroupId,
+ Int2BooleanFunction filter
Review Comment:
I don't really like the approach where the filter changes internal state.
Let's keep the filter simple and then immediately add/remove the table IDs
in a batch; that looks clearer and more obvious.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]