AMashenkov commented on code in PR #2500:
URL: https://github.com/apache/ignite-3/pull/2500#discussion_r1312860379


##########
modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java:
##########
@@ -105,12 +89,12 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
     public void setUp() {
         TableManager tableManagerMock = mock(TableManager.class);
 
-        when(tableManagerMock.tableAsync(anyLong(), 
anyString())).thenAnswer(inv -> {
+        when(tableManagerMock.tableAsync(anyLong(), anyInt())).thenAnswer(inv 
-> {

Review Comment:
   ```suggestion
           when(tableManagerMock.tableAsync(anyLong(), 
anyInt())).thenAnswer(inv -> completedFuture(mockTable(inv.getArgument(1)));
   ```
   
   ```
   private static TableImpl mockTable(int tableId) {
        InternalTable tbl = mock(InternalTable.class);
   
                int tableId = inv.getArgument(1);
   
                when(tbl.tableId()).thenReturn(tableId);
   }
   ```



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java:
##########
@@ -608,6 +600,12 @@ private class Node {
         /** The future have to be complete after the node start and all Meta 
storage watches are deployd. */
         private CompletableFuture<Void> deployWatchesFut;
 
+        /** Hybrid clock. */
+        private final HybridClock clock = new HybridClockImpl();

Review Comment:
   Why do you share clock among nodes?
   
   This is considered a bug
   https://issues.apache.org/jira/browse/IGNITE-20319



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java:
##########
@@ -245,37 +222,36 @@ public class TableManagerTest extends IgniteAbstractTest {
     /** Hybrid clock. */
     private final HybridClock clock = new HybridClockImpl();
 
+    /** Catalog vault. */
+    private VaultManager catalogVault;

Review Comment:
   Catalog doesn't have own Value, and no Vault implementation is backed by 
Catalog.
   ```suggestion
       private VaultManager vaultManager;
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -502,54 +465,44 @@ public TableManager(
 
         indexBuilder = new IndexBuilder(nodeName, cpus);
 
-        configuredTablesCache = new ConfiguredTablesCache(tablesCfg, 
getMetadataLocallyOnly);
-
         raftCommandsMarshaller = new 
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());
 
         startVv = new IncrementalVersionedValue<>(registry);
     }
 
     @Override
     public void start() {
-        mvGc.start();
+        inBusyLock(busyLock, () -> {
+            mvGc.start();
 
-        lowWatermark.start();
+            lowWatermark.start();
 
-        fireCreateTablesOnManagerStart();
+            startTables();
 
-        
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 pendingAssignmentsRebalanceListener);
-        
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 stableAssignmentsRebalanceListener);
-        
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
 assignmentsSwitchRebalanceListener);
+            
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 pendingAssignmentsRebalanceListener);
+            
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 stableAssignmentsRebalanceListener);
+            
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
 assignmentsSwitchRebalanceListener);
 
-        tablesCfg.tables().listenElements(new 
ConfigurationNamedListListener<>() {
-            @Override
-            public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<TableView> ctx) {
-                return onTableCreate(ctx);
-            }
+            catalogService.listen(CatalogEvent.TABLE_CREATE, (parameters, 
exception) -> {
+                assert exception == null : parameters;
 
-            @Override
-            public CompletableFuture<?> 
onRename(ConfigurationNotificationEvent<TableView> ctx) {
-                // TODO: IGNITE-15485 Support table rename operation.
+                return onTableCreate((CreateTableEventParameters) 
parameters).thenApply(unused -> false);
+            });
 
-                return completedFuture(null);
-            }
+            catalogService.listen(CatalogEvent.TABLE_DROP, (parameters, 
exception) -> {
+                assert exception == null : parameters;
 
-            @Override
-            public CompletableFuture<?> 
onDelete(ConfigurationNotificationEvent<TableView> ctx) {
-                return onTableDelete(ctx);
-            }
-        });
+                return onTableDelete(((DropTableEventParameters) 
parameters)).thenApply(unused -> false);
+            });
 
-        schemaManager.listen(SchemaEvent.CREATE, new EventListener<>() {
-            @Override
-            public CompletableFuture<Boolean> notify(SchemaEventParameters 
parameters, @Nullable Throwable exception) {
+            schemaManager.listen(SchemaEvent.CREATE, (parameters, exception) 
-> inBusyLockAsync(busyLock, () -> {

Review Comment:
   Seems, no one listens for SchemaEvent and TableEvent.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1233,20 +1130,77 @@ public boolean 
removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> li
         return assignmentsChangeListeners.remove(listener);
     }
 
+    /**
+     * Creates local structures for a table.
+     *
+     * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version on which the table was created.
+     * @param tableDescriptor Catalog table descriptor.
+     * @return Future that will be completed when local changes related to the 
table creation are applied.
+     */
+    private CompletableFuture<?> createTableLocally(long causalityToken, int 
catalogVersion, CatalogTableDescriptor tableDescriptor) {
+        int tableId = tableDescriptor.id();
+
+        if (!busyLock.enterBusy()) {
+            fireEvent(TableEvent.CREATE, new 
TableEventParameters(causalityToken, tableId), new NodeStoppingException());
+
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            int zoneId = tableDescriptor.zoneId();
+
+            CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor, catalogVersion);
+
+            CompletableFuture<List<Set<Assignment>>> assignmentsFuture;
+
+            // Check if the table already has assignments in the vault.
+            // So, it means, that it is a recovery process and we should use 
the vault assignments instead of calculation for the new ones.
+            if (partitionAssignments(vaultManager, tableId, 0) != null) {
+                assignmentsFuture = 
completedFuture(tableAssignments(vaultManager, tableId, 
zoneDescriptor.partitions()));
+            } else {
+                assignmentsFuture = 
distributionZoneManager.dataNodes(causalityToken, zoneId)
+                        .thenApply(dataNodes -> 
AffinityUtils.calculateAssignments(
+                                dataNodes,
+                                zoneDescriptor.partitions(),
+                                zoneDescriptor.replicas()
+                        ));
+            }
+
+            return createTableLocally(
+                    causalityToken,
+                    tableDescriptor,
+                    zoneDescriptor,
+                    assignmentsFuture,
+                    catalogVersion
+            ).whenComplete((v, e) -> {
+                if (e == null) {
+                    for (var listener : assignmentsChangeListeners) {
+                        listener.accept(this);
+                    }
+                }
+            }).thenCompose(ignored -> 
writeTableAssignmentsToMetastore(tableId, assignmentsFuture));

Review Comment:
   Will it be the same?
   ```suggestion
               ).thenCompose((v, e) -> {
                       for (var listener : assignmentsChangeListeners) {
                           listener.accept(this);
                       }
                   return writeTableAssignmentsToMetastore(tableId, 
assignmentsFuture);
           });
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java:
##########
@@ -245,37 +222,36 @@ public class TableManagerTest extends IgniteAbstractTest {
     /** Hybrid clock. */
     private final HybridClock clock = new HybridClockImpl();
 
+    /** Catalog vault. */
+    private VaultManager catalogVault;
+
+    /** Catalog metastore. */
+    private MetaStorageManager catalogMetastore;

Review Comment:
   ```suggestion
       private MetaStorageManager metastorageManager;
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1463,408 +1416,51 @@ private void dropTableLocally(long causalityToken, 
CatalogTableDescriptor tableD
     }
 
     private Set<Assignment> calculateAssignments(TablePartitionId 
tablePartitionId) {
-        CatalogTableDescriptor tableDescriptor = 
getTableDescriptor(tablePartitionId.tableId());
+        int catalogVersion = catalogService.latestCatalogVersion();
 
-        assert tableDescriptor != null : tablePartitionId;
+        CatalogTableDescriptor tableDescriptor = 
getTableDescriptor(tablePartitionId.tableId(), catalogVersion);
 
         return AffinityUtils.calculateAssignmentForPartition(
                 // TODO: https://issues.apache.org/jira/browse/IGNITE-19425 we 
must use distribution zone keys here
                 
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
                 tablePartitionId.partitionId(),
-                getZoneDescriptor(tableDescriptor.zoneId(), 
catalogService.latestCatalogVersion()).replicas()
+                getZoneDescriptor(tableDescriptor, catalogVersion).replicas()
         );
     }
 
-    /**
-     * Creates a new table with the given {@code name} asynchronously. If a 
table with the same name already exists, a future will be
-     * completed with {@link TableAlreadyExistsException}.
-     *
-     * @param name Table name.
-     * @param zoneName Distribution zone name.
-     * @param tableInitChange Table changer.
-     * @return Future representing pending completion of the operation.
-     * @throws IgniteException If an unspecified platform exception has 
happened internally. Is thrown when:
-     *         <ul>
-     *             <li>the node is stopping.</li>
-     *         </ul>
-     * @see TableAlreadyExistsException
-     */
-    public CompletableFuture<Table> createTableAsync(String name, String 
zoneName, Consumer<TableChange> tableInitChange) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
-        }
-        try {
-            return createTableAsyncInternal(name, zoneName, tableInitChange);
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /** See {@link #createTableAsync(String, String, Consumer)} for details. */
-    private CompletableFuture<Table> createTableAsyncInternal(
-            String name,
-            String zoneName,
-            Consumer<TableChange> tableInitChange
-    ) {
-        CompletableFuture<Table> tblFut = new CompletableFuture<>();
-
-        tableAsyncInternal(name)
-                .handle((tbl, tblEx) -> {
-                    if (tbl != null) {
-                        tblFut.completeExceptionally(new 
TableAlreadyExistsException(DEFAULT_SCHEMA_NAME, name));
-                    } else if (tblEx != null) {
-                        tblFut.completeExceptionally(tblEx);
-                    } else {
-                        if (!busyLock.enterBusy()) {
-                            NodeStoppingException nodeStoppingException = new 
NodeStoppingException();
-
-                            
tblFut.completeExceptionally(nodeStoppingException);
-
-                            throw new IgniteException(nodeStoppingException);
-                        }
-
-                        try {
-                            // TODO: IGNITE-19499 Should listen to event 
CreateTableEventParameters and get the zone ID from it
-                            CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(zoneName, clock.nowLong());
-
-                            if (zoneDescriptor == null) {
-                                tblFut.completeExceptionally(new 
DistributionZoneNotFoundException(zoneName));
-
-                                return null;
-                            }
-
-                            cmgMgr.logicalTopology()
-                                    .handle((cmgTopology, e) -> {
-                                        if (e == null) {
-                                            if (!busyLock.enterBusy()) {
-                                                NodeStoppingException 
nodeStoppingException = new NodeStoppingException();
-
-                                                
tblFut.completeExceptionally(nodeStoppingException);
-
-                                                throw new 
IgniteException(nodeStoppingException);
-                                            }
-
-                                            try {
-                                                
changeTablesConfigurationOnTableCreate(
-                                                        name,
-                                                        zoneDescriptor.id(),
-                                                        tableInitChange,
-                                                        tblFut
-                                                );
-                                            } finally {
-                                                busyLock.leaveBusy();
-                                            }
-                                        } else {
-                                            tblFut.completeExceptionally(e);
-                                        }
-
-                                        return null;
-                                    });
-                        } catch (Throwable t) {
-                            tblFut.completeExceptionally(t);
-                        } finally {
-                            busyLock.leaveBusy();
-                        }
-                    }
-
-                    return null;
-                });
-
-        return tblFut;
-    }
-
-    /**
-     * Creates a new table in {@link TablesConfiguration}.
-     *
-     * @param name Table name.
-     * @param zoneId Distribution zone id.
-     * @param tableInitChange Table changer.
-     * @param tblFut Future representing pending completion of the table 
creation.
-     */
-    private void changeTablesConfigurationOnTableCreate(
-            String name,
-            int zoneId,
-            Consumer<TableChange> tableInitChange,
-            CompletableFuture<Table> tblFut
-    ) {
-        tablesCfg.change(tablesChange -> {
-            incrementTablesGeneration(tablesChange);
-
-            tablesChange.changeTables(tablesListChange -> {
-                if (tablesListChange.get(name) != null) {
-                    throw new TableAlreadyExistsException(DEFAULT_SCHEMA_NAME, 
name);
-                }
-
-                tablesListChange.create(name, (tableChange) -> {
-                    tableInitChange.accept(tableChange);
-
-                    tableChange.changeZoneId(zoneId);
-
-                    var extConfCh = ((ExtendedTableChange) tableChange);
-
-                    int tableId = tablesChange.globalIdCounter() + 1;
-
-                    extConfCh.changeId(tableId);
-
-                    tablesChange.changeGlobalIdCounter(tableId);
-
-                    extConfCh.changeSchemaId(INITIAL_SCHEMA_VERSION);
-
-                    tableCreateFuts.put(extConfCh.id(), tblFut);
-                });
-            });
-        }).exceptionally(t -> {
-            Throwable ex = getRootCause(t);
-
-            if (ex instanceof TableAlreadyExistsException) {
-                tblFut.completeExceptionally(ex);
-            } else {
-                LOG.debug("Unable to create table [name={}]", ex, name);
-
-                tblFut.completeExceptionally(ex);
-
-                tableCreateFuts.values().removeIf(fut -> fut == tblFut);
-            }
-
-            return null;
-        });
-    }
-
-    private static void incrementTablesGeneration(TablesChange tablesChange) {
-        tablesChange.changeTablesGeneration(tablesChange.tablesGeneration() + 
1);
-    }
-
-    /**
-     * Alters a cluster table. If an appropriate table does not exist, a 
future will be completed with {@link TableNotFoundException}.
-     *
-     * @param name Table name.
-     * @param tableChange Table changer.
-     * @return Future representing pending completion of the operation.
-     * @throws IgniteException If an unspecified platform exception has 
happened internally. Is thrown when:
-     *         <ul>
-     *             <li>the node is stopping.</li>
-     *         </ul>
-     * @see TableNotFoundException
-     */
-    public CompletableFuture<Void> alterTableAsync(String name, 
Function<TableChange, Boolean> tableChange) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
-        }
-        try {
-            return alterTableAsyncInternal(name, tableChange);
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /** See {@link #alterTableAsync(String, Function)} for details. */
-    private CompletableFuture<Void> alterTableAsyncInternal(String name, 
Function<TableChange, Boolean> tableChange) {
-        CompletableFuture<Void> tblFut = new CompletableFuture<>();
-
-        tableAsync(name).thenAccept(tbl -> {
-            if (tbl == null) {
-                tblFut.completeExceptionally(new 
TableNotFoundException(DEFAULT_SCHEMA_NAME, name));
-            } else {
-                tablesCfg.tables().change(ch -> {
-                    if (ch.get(name) == null) {
-                        throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, 
name);
-                    }
-
-                    ch.update(name, tblCh -> {
-                        if (!tableChange.apply(tblCh)) {
-                            return;
-                        }
-
-                        ExtendedTableChange exTblChange = 
(ExtendedTableChange) tblCh;
-
-                        exTblChange.changeSchemaId(exTblChange.schemaId() + 1);
-                    });
-                }).whenComplete((res, t) -> {
-                    if (t != null) {
-                        Throwable ex = getRootCause(t);
-
-                        if (ex instanceof TableNotFoundException) {
-                            tblFut.completeExceptionally(ex);
-                        } else {
-                            LOG.debug("Unable to modify table [name={}]", ex, 
name);
-
-                            tblFut.completeExceptionally(ex);
-                        }
-                    } else {
-                        tblFut.complete(res);
-                    }
-                });
-            }
-        }).exceptionally(th -> {
-            tblFut.completeExceptionally(th);
-
-            return null;
-        });
-
-        return tblFut;
-    }
-
-    /**
-     * Gets a cause exception for a client.
-     *
-     * @param t Exception wrapper.
-     * @return A root exception which will be acceptable to throw for public 
API.
-     */
-    //TODO: IGNITE-16051 Implement exception converter for public API.
-    private IgniteException getRootCause(Throwable t) {
-        Throwable ex;
-
-        if (t instanceof CompletionException) {
-            if (t.getCause() instanceof ConfigurationChangeException) {
-                ex = t.getCause().getCause();
-            } else {
-                ex = t.getCause();
-            }
-
-        } else {
-            ex = t;
-        }
-
-        // TODO https://issues.apache.org/jira/browse/IGNITE-19539
-        return (ex instanceof IgniteException) ? (IgniteException) ex : 
ExceptionUtils.wrap(ex);
-    }
-
-    /**
-     * Drops a table with the name specified. If appropriate table does not be 
found, a future will be completed with
-     * {@link TableNotFoundException}.
-     *
-     * @param name Table name.
-     * @return Future representing pending completion of the operation.
-     * @throws IgniteException If an unspecified platform exception has 
happened internally. Is thrown when:
-     *         <ul>
-     *             <li>the node is stopping.</li>
-     *         </ul>
-     * @see TableNotFoundException
-     */
-    public CompletableFuture<Void> dropTableAsync(String name) {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
-        }
-        try {
-            return dropTableAsyncInternal(name);
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /** See {@link #dropTableAsync(String)} for details. */
-    private CompletableFuture<Void> dropTableAsyncInternal(String name) {
-        return tableAsyncInternal(name).thenCompose(tbl -> {
-            // In case of drop it's an optimization that allows not to fire 
drop-change-closure if there's no such
-            // distributed table and the local config has lagged behind.
-            if (tbl == null) {
-                return failedFuture(new 
TableNotFoundException(DEFAULT_SCHEMA_NAME, name));
-            }
-
-            return tablesCfg
-                    .change(chg -> {
-                        incrementTablesGeneration(chg);
-
-                        chg
-                                .changeTables(tblChg -> {
-                                    if (tblChg.get(name) == null) {
-                                        throw new 
TableNotFoundException(DEFAULT_SCHEMA_NAME, name);
-                                    }
-
-                                    tblChg.delete(name);
-                                });
-                    })
-                    .exceptionally(t -> {
-                        Throwable ex = getRootCause(t);
-
-                        if (!(ex instanceof TableNotFoundException)) {
-                            LOG.debug("Unable to drop table [name={}]", ex, 
name);
-                        }
-
-                        throw new CompletionException(ex);
-                    });
-        });
-    }
-
-    /** {@inheritDoc} */
     @Override
     public List<Table> tables() {
         return join(tablesAsync());
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<List<Table>> tablesAsync() {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
-        }
-        try {
-            return tablesAsyncInternal();
-        } finally {
-            busyLock.leaveBusy();
-        }
+        return inBusyLockAsync(busyLock, this::tablesAsyncInternalBusy);
     }
 
-    /**
-     * Internal method for getting table.
-     *
-     * @return Future representing pending completion of the operation.
-     */
-    private CompletableFuture<List<Table>> tablesAsyncInternal() {
-        return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds), 
ioExecutor)
-                .thenCompose(tableIds -> inBusyLock(busyLock, () -> {
-                    var tableFuts = new CompletableFuture[tableIds.size()];
-
-                    var i = 0;
+    private CompletableFuture<List<Table>> tablesAsyncInternalBusy() {
+        HybridTimestamp now = clock.now();
 
-                    for (int tblId : tableIds) {
-                        tableFuts[i++] = tableAsyncInternal(tblId, false);
-                    }
-
-                    return allOf(tableFuts).thenApply(unused -> 
inBusyLock(busyLock, () -> {
-                        var tables = new ArrayList<Table>(tableIds.size());
-
-                        for (var fut : tableFuts) {
-                            var table = fut.join();
-
-                            if (table != null) {
-                                tables.add((Table) table);
-                            }
-                        }
+        return anyOf(schemaSyncService.waitForMetadataCompleteness(now), 
stopManagerFuture)

Review Comment:
   This looks like a hack.
   
   I believe, SchemaSyncService should be responsible cancelling futures, which 
it creates.
   And it does, because uses `ClusterTime`, which implementation cleanup 
resources on `close()`.
   
   So, I guess the real issue is that we do `future.join()` under busylock.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to