tkalkirill commented on code in PR #2500: URL: https://github.com/apache/ignite-3/pull/2500#discussion_r1319348640
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -1233,20 +1130,77 @@ public boolean removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> li return assignmentsChangeListeners.remove(listener); } + /** + * Creates local structures for a table. + * + * @param causalityToken Causality token. + * @param catalogVersion Catalog version on which the table was created. + * @param tableDescriptor Catalog table descriptor. + * @return Future that will be completed when local changes related to the table creation are applied. + */ + private CompletableFuture<?> createTableLocally(long causalityToken, int catalogVersion, CatalogTableDescriptor tableDescriptor) { + int tableId = tableDescriptor.id(); + + if (!busyLock.enterBusy()) { + fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId), new NodeStoppingException()); + + return failedFuture(new NodeStoppingException()); + } + + try { + int zoneId = tableDescriptor.zoneId(); + + CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion); + + CompletableFuture<List<Set<Assignment>>> assignmentsFuture; + + // Check if the table already has assignments in the vault. + // So, it means, that it is a recovery process and we should use the vault assignments instead of calculation for the new ones. + if (partitionAssignments(vaultManager, tableId, 0) != null) { + assignmentsFuture = completedFuture(tableAssignments(vaultManager, tableId, zoneDescriptor.partitions())); + } else { + assignmentsFuture = distributionZoneManager.dataNodes(causalityToken, zoneId) + .thenApply(dataNodes -> AffinityUtils.calculateAssignments( + dataNodes, + zoneDescriptor.partitions(), + zoneDescriptor.replicas() + )); + } + + return createTableLocally( + causalityToken, + tableDescriptor, + zoneDescriptor, + assignmentsFuture, + catalogVersion + ).whenComplete((v, e) -> { + if (e == null) { + for (var listener : assignmentsChangeListeners) { + listener.accept(this); + } + } + }).thenCompose(ignored -> writeTableAssignmentsToMetastore(tableId, assignmentsFuture)); Review Comment: I see that listeners are used in `org.apache.ignite.client.handler.ClientInboundMessageHandler`. I will create a separate ticket for this problem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org