This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 90894e3651 IGNITE-21609 Avoid using versioned values for non-versioned 
objects (#3310)
90894e3651 is described below

commit 90894e365195cbfe3029ee584ef41a5480e6e649
Author: Andrew V. Mashenkov <amashen...@users.noreply.github.com>
AuthorDate: Mon Mar 4 16:55:38 2024 +0300

    IGNITE-21609 Avoid using versioned values for non-versioned objects (#3310)
---
 .../apache/ignite/internal/index/IndexManager.java |  66 ++-----
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   2 +-
 .../ignite/internal/schema/SchemaManager.java      | 101 +++++------
 .../ignite/internal/schema/SchemaManagerTest.java  |   4 +-
 .../rebalance/ItRebalanceDistributedTest.java      |   8 +-
 .../internal/table/distributed/TableManager.java   | 192 ++++++++-------------
 .../table/distributed/TableManagerTest.java        |   2 +-
 8 files changed, 136 insertions(+), 243 deletions(-)

diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index bd485017d0..069161857f 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -25,14 +25,13 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
@@ -106,8 +105,11 @@ public class IndexManager implements IgniteComponent {
     /** Versioned value used only at the start of the manager. */
     private final IncrementalVersionedValue<Void> startVv;
 
-    /** Version value of multi-version table storages by ID for which indexes 
were created. */
-    private final IncrementalVersionedValue<Int2ObjectMap<MvTableStorage>> 
mvTableStoragesByIdVv;
+    /** Versioned value for linearizing index partition changing events. */
+    private final IncrementalVersionedValue<Void> tableStoragesVv;
+
+    /** Table storages by ID for which indexes were created. */
+    private final Map<Integer, MvTableStorage> tableStoragesById = new 
ConcurrentHashMap<>();
 
     /**
      * Constructor.
@@ -132,7 +134,7 @@ public class IndexManager implements IgniteComponent {
         this.ioExecutor = ioExecutor;
 
         startVv = new IncrementalVersionedValue<>(registry);
-        mvTableStoragesByIdVv = new IncrementalVersionedValue<>(registry, 
Int2ObjectMaps::emptyMap);
+        tableStoragesVv = new IncrementalVersionedValue<>(registry);
     }
 
     @Override
@@ -179,7 +181,7 @@ public class IndexManager implements IgniteComponent {
      *      parameters.
      */
     CompletableFuture<MvTableStorage> getMvTableStorage(long causalityToken, 
int tableId) {
-        return 
mvTableStoragesByIdVv.get(causalityToken).thenApply(mvTableStoragesById -> 
mvTableStoragesById.get(tableId));
+        return tableStoragesVv.get(causalityToken).thenApply(ignore -> 
tableStoragesById.get(tableId));
     }
 
     private CompletableFuture<Boolean> 
onIndexDestroy(DestroyIndexEventParameters parameters) {
@@ -190,16 +192,14 @@ public class IndexManager implements IgniteComponent {
 
         CompletableFuture<TableViewInternal> tableFuture = 
tableManager.tableAsync(causalityToken, tableId);
 
-        return inBusyLockAsync(busyLock, () -> mvTableStoragesByIdVv.update(
+        return inBusyLockAsync(busyLock, () -> tableStoragesVv.update(
                 causalityToken,
-                updater(mvTableStorageById -> tableFuture.thenApply(table -> 
inBusyLock(busyLock, () -> {
+                updater(ignore -> tableFuture.thenAccept(table -> 
inBusyLock(busyLock, () -> {
                     if (table != null) {
                         // In case of DROP TABLE the table will be removed 
first.
                         table.unregisterIndex(indexId);
-
-                        return mvTableStorageById;
                     } else {
-                        return 
removeMvTableStorageIfPresent(mvTableStorageById, tableId);
+                        tableStoragesById.remove(tableId);
                     }
                 })))
         )).thenApply(unused -> false);
@@ -349,13 +349,17 @@ public class IndexManager implements IgniteComponent {
 
         CompletableFuture<SchemaRegistry> schemaRegistryFuture = 
schemaManager.schemaRegistry(causalityToken, tableId);
 
-        return mvTableStoragesByIdVv.update(
+        return tableStoragesVv.update(
                 causalityToken,
-                updater(mvTableStorageById -> 
tablePartitionFuture.thenCombineAsync(schemaRegistryFuture,
+                updater(ignore -> 
tablePartitionFuture.thenCombineAsync(schemaRegistryFuture,
                         (partitionSet, schemaRegistry) -> inBusyLock(busyLock, 
() -> {
                             registerIndex(table, index, partitionSet, 
schemaRegistry);
 
-                            return 
addMvTableStorageIfAbsent(mvTableStorageById, 
getTableViewStrict(tableId).internalTable().storage());
+                            MvTableStorage storage = 
getTableViewStrict(tableId).internalTable().storage();
+
+                            tableStoragesById.putIfAbsent(tableId, storage);
+
+                            return null;
                         }), ioExecutor))
         );
     }
@@ -393,38 +397,6 @@ public class IndexManager implements IgniteComponent {
         }
     }
 
-    private static Int2ObjectMap<MvTableStorage> addMvTableStorageIfAbsent(
-            Int2ObjectMap<MvTableStorage> mvTableStorageById,
-            MvTableStorage mvTableStorage
-    ) {
-        int tableId = mvTableStorage.getTableDescriptor().getId();
-
-        if (mvTableStorageById.containsKey(tableId)) {
-            return mvTableStorageById;
-        }
-
-        Int2ObjectMap<MvTableStorage> newMap = new 
Int2ObjectOpenHashMap<>(mvTableStorageById);
-
-        newMap.put(tableId, mvTableStorage);
-
-        return newMap;
-    }
-
-    private static Int2ObjectMap<MvTableStorage> removeMvTableStorageIfPresent(
-            Int2ObjectMap<MvTableStorage> mvTableStorageById,
-            int tableId
-    ) {
-        if (!mvTableStorageById.containsKey(tableId)) {
-            return mvTableStorageById;
-        }
-
-        Int2ObjectMap<MvTableStorage> newMap = new 
Int2ObjectOpenHashMap<>(mvTableStorageById);
-
-        newMap.remove(tableId);
-
-        return newMap;
-    }
-
     private TableViewInternal getTableViewStrict(int tableId) {
         TableViewInternal table = tableManager.cachedTable(tableId);
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 4dde7bce06..f083095297 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -496,7 +496,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 partitionIdleSafeTimePropagationPeriodMsSupplier
         );
 
-        SchemaManager schemaManager = new SchemaManager(registry, 
catalogManager, metaStorageMgr);
+        SchemaManager schemaManager = new SchemaManager(registry, 
catalogManager);
 
         var dataNodesMock = dataNodesMockByNode.get(idx);
 
@@ -1069,7 +1069,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
      * @param tableName Table name.
      */
     private void assertTablePresent(TableManager tableManager, String 
tableName) {
-        Collection<TableImpl> tables = tableManager.latestTables().values();
+        Collection<TableImpl> tables = tableManager.startedTables().values();
 
         boolean isPresent = false;
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index d1103775b4..5dc2b7ea5f 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -641,7 +641,7 @@ public class IgniteImpl implements Ignite {
 
         SchemaSyncService schemaSyncService = new 
SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier);
 
-        schemaManager = new SchemaManager(registry, catalogManager, 
metaStorageMgr);
+        schemaManager = new SchemaManager(registry, catalogManager);
 
         ScheduledExecutorService rebalanceScheduler = new 
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
                 NamedThreadFactory.create(name, "rebalance-scheduler", LOG));
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 5764bcc189..3e9a6f7251 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.schema;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -26,9 +25,9 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
@@ -45,7 +44,6 @@ import 
org.apache.ignite.internal.causality.IncrementalVersionedValue;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.schema.catalog.CatalogToSchemaDescriptorConverter;
 import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -65,21 +63,16 @@ public class SchemaManager implements IgniteComponent {
 
     private final CatalogService catalogService;
 
-    /** Versioned store for tables by ID. */
-    private final IncrementalVersionedValue<Map<Integer, SchemaRegistryImpl>> 
registriesVv;
+    /** Versioned value for linearizing schema registry changing events. */
+    private final IncrementalVersionedValue<Void> registriesVv;
 
-    /** Meta storage manager. */
-    private final MetaStorageManager metastorageMgr;
+    /** Schema registries by table ID. */
+    private final Map<Integer, SchemaRegistryImpl> registriesById = new 
ConcurrentHashMap<>();
 
     /** Constructor. */
-    public SchemaManager(
-            Consumer<LongFunction<CompletableFuture<?>>> registry,
-            CatalogService catalogService,
-            MetaStorageManager metastorageMgr
-    ) {
-        this.registriesVv = new IncrementalVersionedValue<>(registry, 
HashMap::new);
+    public SchemaManager(Consumer<LongFunction<CompletableFuture<?>>> 
registry, CatalogService catalogService) {
+        this.registriesVv = new IncrementalVersionedValue<>(registry);
         this.catalogService = catalogService;
-        this.metastorageMgr = metastorageMgr;
     }
 
     @Override
@@ -94,39 +87,30 @@ public class SchemaManager implements IgniteComponent {
     }
 
     private void registerExistingTables() {
-        CompletableFuture<Long> recoveryFinishFuture = 
metastorageMgr.recoveryFinishedFuture();
-
-        assert recoveryFinishFuture.isDone();
-
-        long causalityToken = recoveryFinishFuture.join();
-
         for (int catalogVer = catalogService.latestCatalogVersion(); 
catalogVer >= catalogService.earliestCatalogVersion(); catalogVer--) {
             Collection<CatalogTableDescriptor> tables = 
catalogService.tables(catalogVer);
 
-            registriesVv.update(causalityToken, (registries, throwable) -> {
-                for (CatalogTableDescriptor tableDescriptor : tables) {
-                    int tableId = tableDescriptor.id();
+            for (CatalogTableDescriptor tableDescriptor : tables) {
+                int tableId = tableDescriptor.id();
 
-                    if (registries.containsKey(tableId)) {
-                        continue;
-                    }
-
-                    SchemaDescriptor prevSchema = null;
-                    CatalogTableSchemaVersions schemaVersions = 
tableDescriptor.schemaVersions();
-                    for (int tableVer = schemaVersions.earliestVersion(); 
tableVer <= schemaVersions.latestVersion(); tableVer++) {
-                        SchemaDescriptor newSchema = 
CatalogToSchemaDescriptorConverter.convert(tableDescriptor, tableVer);
+                if (registriesById.containsKey(tableId)) {
+                    continue;
+                }
 
-                        if (prevSchema != null) {
-                            
newSchema.columnMapping(SchemaUtils.columnMapper(prevSchema, newSchema));
-                        }
+                SchemaDescriptor prevSchema = null;
+                CatalogTableSchemaVersions schemaVersions = 
tableDescriptor.schemaVersions();
+                for (int tableVer = schemaVersions.earliestVersion(); tableVer 
<= schemaVersions.latestVersion(); tableVer++) {
+                    SchemaDescriptor newSchema = 
CatalogToSchemaDescriptorConverter.convert(tableDescriptor, tableVer);
 
-                        prevSchema = newSchema;
-                        registries = registerSchema(registries, tableId, 
newSchema);
+                    if (prevSchema != null) {
+                        
newSchema.columnMapping(SchemaUtils.columnMapper(prevSchema, newSchema));
                     }
-                }
 
-                return completedFuture(registries);
-            });
+                    prevSchema = newSchema;
+
+                    registerSchema(tableId, newSchema);
+                }
+            }
         }
     }
 
@@ -180,7 +164,9 @@ public class SchemaManager implements IgniteComponent {
                     );
                 }
 
-                return completedFuture(registerSchema(registries, tableId, 
newSchema));
+                registerSchema(tableId, newSchema);
+
+                return nullCompletedFuture();
             })).thenApply(ignored -> false);
         } finally {
             busyLock.leaveBusy();
@@ -239,29 +225,22 @@ public class SchemaManager implements IgniteComponent {
     /**
      * Registers the new schema in the registries.
      *
-     * @param registries Registries before registering this schema.
      * @param tableId ID of the table to which the schema belongs.
      * @param schema The schema to register.
-     * @return Registries after registering this schema.
      */
-    private Map<Integer, SchemaRegistryImpl> registerSchema(
-            Map<Integer, SchemaRegistryImpl> registries,
+    private void registerSchema(
             int tableId,
             SchemaDescriptor schema
     ) {
-        SchemaRegistryImpl reg = registries.get(tableId);
-
-        if (reg == null) {
-            Map<Integer, SchemaRegistryImpl> copy = new HashMap<>(registries);
-
-            copy.put(tableId, createSchemaRegistry(tableId, schema));
+        registriesById.compute(tableId, (tableId0, reg) -> {
+            if (reg == null) {
+                return createSchemaRegistry(tableId0, schema);
+            }
 
-            return copy;
-        } else {
             reg.onSchemaRegistered(schema);
 
-            return registries;
-        }
+            return reg;
+        });
     }
 
     /**
@@ -286,7 +265,7 @@ public class SchemaManager implements IgniteComponent {
      * @return Descriptor if required schema found, or {@code null} otherwise.
      */
     private @Nullable SchemaDescriptor searchSchemaByVersion(int tblId, int 
schemaVer) {
-        SchemaRegistry registry = registriesVv.latest().get(tblId);
+        SchemaRegistry registry = registriesById.get(tblId);
 
         if (registry != null && schemaVer <= 
registry.lastKnownSchemaVersion()) {
             return registry.schema(schemaVer);
@@ -311,7 +290,7 @@ public class SchemaManager implements IgniteComponent {
 
         try {
             return registriesVv.get(causalityToken)
-                    .thenApply(regs -> inBusyLock(busyLock, () -> 
regs.get(tableId)));
+                    .thenApply(unused -> inBusyLock(busyLock, () -> 
registriesById.get(tableId)));
         } finally {
             busyLock.leaveBusy();
         }
@@ -324,7 +303,7 @@ public class SchemaManager implements IgniteComponent {
      * @return Schema registry.
      */
     public SchemaRegistry schemaRegistry(int tableId) {
-        return registriesVv.latest().get(tableId);
+        return registriesById.get(tableId);
     }
 
     /**
@@ -345,12 +324,10 @@ public class SchemaManager implements IgniteComponent {
                             format("Cannot remove a schema registry for the 
table [tblId={}]", tableId), e));
                 }
 
-                Map<Integer, SchemaRegistryImpl> regs = new 
HashMap<>(registries);
-
-                SchemaRegistryImpl removedRegistry = regs.remove(tableId);
+                SchemaRegistryImpl removedRegistry = 
registriesById.remove(tableId);
                 removedRegistry.close();
 
-                return completedFuture(regs);
+                return nullCompletedFuture();
             }));
         } finally {
             busyLock.leaveBusy();
@@ -366,6 +343,6 @@ public class SchemaManager implements IgniteComponent {
         busyLock.block();
 
         // noinspection ConstantConditions
-        IgniteUtils.closeAllManually(registriesVv.latest().values());
+        IgniteUtils.closeAllManually(registriesById.values());
     }
 }
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
index 83013878a5..16a19ec9e9 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
@@ -105,7 +105,7 @@ class SchemaManagerTest extends BaseIgniteAbstractTest {
         doNothing().when(catalogService).listen(eq(CatalogEvent.TABLE_ALTER), 
tableAlteredListener.capture());
         
doNothing().when(catalogService).listen(eq(CatalogEvent.TABLE_DESTROY), 
tableDestroyedListener.capture());
 
-        schemaManager = new SchemaManager(registry, catalogService, 
metaStorageManager);
+        schemaManager = new SchemaManager(registry, catalogService);
         schemaManager.start();
 
         assertThat("Watches were not deployed", 
metaStorageManager.deployWatches(), willCompleteSuccessfully());
@@ -281,7 +281,7 @@ class SchemaManagerTest extends BaseIgniteAbstractTest {
         
when(catalogService.tables(anyInt())).thenReturn(List.of(tableDescriptorAfterColumnAddition()));
         
doReturn(CompletableFuture.completedFuture(CAUSALITY_TOKEN_2)).when(metaStorageManager).recoveryFinishedFuture();
 
-        schemaManager = new SchemaManager(registry, catalogService, 
metaStorageManager);
+        schemaManager = new SchemaManager(registry, catalogService);
         schemaManager.start();
 
         completeCausalityToken(CAUSALITY_TOKEN_2);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 6be1f738c4..6c25e6a32f 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -683,7 +683,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
         assertTrue(waitForCondition(
                 () -> nodes.stream().allMatch(n ->
                         n.tableManager
-                                .latestTables()
+                                .startedTables()
                                 .get(getTableId(node, TABLE_NAME))
                                 .internalTable()
                                 .tableRaftService()
@@ -739,7 +739,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
         assertTrue(waitForCondition(
                 () -> nodes.stream().allMatch(n ->
                         n.tableManager
-                                .latestTables()
+                                .startedTables()
                                 .get(getTableId(node, TABLE_NAME))
                                 .internalTable()
                                 .tableRaftService()
@@ -824,7 +824,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     try {
                         return nodes.stream().allMatch(n ->
                                 n.tableManager
-                                        .latestTables()
+                                        .startedTables()
                                         .get(getTableId(n, tableName))
                                         .internalTable()
                                         .tableRaftService()
@@ -1153,7 +1153,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     partitionIdleSafeTimePropagationPeriodMsSupplier
             );
 
-            schemaManager = new SchemaManager(registry, catalogManager, 
metaStorageManager);
+            schemaManager = new SchemaManager(registry, catalogManager);
 
             schemaSyncService = new 
SchemaSyncServiceImpl(metaStorageManager.clusterTime(), 
delayDurationMsSupplier);
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 3df5f576f1..5b1820fc91 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.table.distributed;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.unmodifiableMap;
 import static java.util.concurrent.CompletableFuture.allOf;
@@ -69,7 +68,6 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -261,38 +259,38 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     private final TransactionStateResolver transactionStateResolver;
 
     /**
-     * Versioned store for tables by id. Only table instances are created 
here, local storages and RAFT groups may not be initialized yet.
+     * Versioned value for linearizing table changing events.
      *
-     * @see #localPartsByTableIdVv
+     * @see #localPartitionsVv
      * @see #assignmentsUpdatedVv
      */
-    private final IncrementalVersionedValue<Map<Integer, TableImpl>> 
tablesByIdVv;
+    private final IncrementalVersionedValue<Void> tablesVv;
 
     /**
-     * Versioned store for local partition set by table id.
+     * Versioned value for linearizing table partitions changing events.
      *
-     * <p>Completed strictly after {@link #tablesByIdVv} and strictly before 
{@link #assignmentsUpdatedVv}.
+     * <p>Completed strictly after {@link #tablesVv} and strictly before 
{@link #assignmentsUpdatedVv}.
      */
-    private final IncrementalVersionedValue<Map<Integer, PartitionSet>> 
localPartsByTableIdVv;
+    private final IncrementalVersionedValue<Void> localPartitionsVv;
 
     /**
-     * Versioned store for tracking RAFT groups initialization and starting 
completion.
+     * Versioned value for tracking RAFT groups initialization and starting 
completion.
      *
      * <p>Only explicitly updated in {@link 
#startLocalPartitionsAndClients(CompletableFuture, TableImpl, int)}.
      *
-     * <p>Completed strictly after {@link #localPartsByTableIdVv}.
+     * <p>Completed strictly after {@link #localPartitionsVv}.
      */
     private final IncrementalVersionedValue<Void> assignmentsUpdatedVv;
 
-    /**
-     * {@link TableImpl} is created during update of tablesByIdVv, we store 
reference to it in case of updating of tablesByIdVv fails, so we
-     * can stop resources associated with the table or to clean up table 
resources on {@code TableManager#stop()}.
-     */
-    private final Map<Integer, TableImpl> pendingTables = new 
ConcurrentHashMap<>();
+    /** Registered tables. */
+    private final Map<Integer, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Started tables. */
     private final Map<Integer, TableImpl> startedTables = new 
ConcurrentHashMap<>();
 
+    /** Local partitions. */
+    private final Map<Integer, PartitionSet> localPartsByTableId = new 
ConcurrentHashMap<>();
+
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -488,11 +486,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         schemaVersions = new 
SchemaVersionsImpl(executorInclinedSchemaSyncService, catalogService, clock);
 
-        tablesByIdVv = new IncrementalVersionedValue<>(registry, HashMap::new);
+        tablesVv = new IncrementalVersionedValue<>(registry);
 
-        localPartsByTableIdVv = new 
IncrementalVersionedValue<>(dependingOn(tablesByIdVv), HashMap::new);
+        localPartitionsVv = new 
IncrementalVersionedValue<>(dependingOn(tablesVv));
 
-        assignmentsUpdatedVv = new 
IncrementalVersionedValue<>(dependingOn(localPartsByTableIdVv));
+        assignmentsUpdatedVv = new 
IncrementalVersionedValue<>(dependingOn(localPartitionsVv));
 
         txStateStorageScheduledPool = 
Executors.newSingleThreadScheduledExecutor(
                 NamedThreadFactory.create(nodeName, 
"tx-state-storage-scheduled-pool", LOG));
@@ -528,7 +526,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 metaStorageMgr,
                 messagingService,
                 topologyService,
-                tableId -> latestTablesById().get(tableId)
+                tableId -> tablesById().get(tableId)
         );
 
         startVv = new IncrementalVersionedValue<>(registry);
@@ -726,19 +724,19 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     }
 
     private CompletableFuture<?> onTableRename(RenameTableEventParameters 
parameters) {
-        return inBusyLockAsync(busyLock, () -> tablesByIdVv.update(
+        return inBusyLockAsync(busyLock, () -> tablesVv.update(
                 parameters.causalityToken(),
-                (tablesById, e) -> {
+                (ignore, e) -> {
                     if (e != null) {
                         return failedFuture(e);
                     }
 
-                    TableImpl table = tablesById.get(parameters.tableId());
+                    TableImpl table = tables.get(parameters.tableId());
 
                     // TODO: revisit this approach, see 
https://issues.apache.org/jira/browse/IGNITE-21235.
                     table.name(parameters.newTableName());
 
-                    return completedFuture(tablesById);
+                    return nullCompletedFuture();
                 })
         );
     }
@@ -1074,12 +1072,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         lowWatermark.removeUpdateListener(mvGc);
 
-        var tablesToStop = new HashMap<Integer, TableImpl>();
-
-        tablesToStop.putAll(latestTablesById());
-        tablesToStop.putAll(pendingTables);
-
-        cleanUpTablesResources(tablesToStop);
+        cleanUpTablesResources(tables);
     }
 
     @Override
@@ -1249,26 +1242,17 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         var table = new TableImpl(internalTable, lockMgr, schemaVersions, 
marshallers, sql.get(), tableDescriptor.primaryKeyIndexId());
 
-        tablesByIdVv.update(causalityToken, (previous, e) -> 
inBusyLock(busyLock, () -> {
+        tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () 
-> {
             if (e != null) {
                 return failedFuture(e);
             }
 
-            return schemaManager.schemaRegistry(causalityToken, tableId)
-                    .thenApply(schema -> {
-                        table.schemaView(schema);
-
-                        var val = new HashMap<>(previous);
-
-                        val.put(tableId, table);
-
-                        return val;
-                    });
+            return schemaManager.schemaRegistry(causalityToken, 
tableId).thenAccept(table::schemaView);
         }));
 
         // NB: all vv.update() calls must be made from the synchronous part of 
the method (not in thenCompose()/etc!).
-        CompletableFuture<?> localPartsUpdateFuture = 
localPartsByTableIdVv.update(causalityToken,
-                (previous, throwable) -> inBusyLock(busyLock, () -> 
assignmentsFuture.thenComposeAsync(newAssignments -> {
+        CompletableFuture<?> localPartsUpdateFuture = 
localPartitionsVv.update(causalityToken,
+                (ignore, throwable) -> inBusyLock(busyLock, () -> 
assignmentsFuture.thenComposeAsync(newAssignments -> {
                     PartitionSet parts = new BitSetPartitionSet();
 
                     // TODO: 
https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set 
partitions only for
@@ -1277,39 +1261,28 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                         parts.set(i);
                     }
 
-                    return getOrCreatePartitionStorages(table, 
parts).thenApply(u -> {
-                        var newValue = new HashMap<>(previous);
-
-                        newValue.put(tableId, parts);
-
-                        return newValue;
-                    });
+                    return getOrCreatePartitionStorages(table, 
parts).thenAccept(u -> localPartsByTableId.put(tableId, parts));
                 }, ioExecutor)));
 
-        // We bring the future outside to avoid OutdatedTokenException.
-        CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture = 
tablesByIdVv.get(causalityToken);
+        CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
 
         // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions 
should be started only on the assignments change
-        // TODO event triggered by zone create or alter.
+        //  event triggered by zone create or alter.
         CompletableFuture<?> createPartsFut = 
assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
             if (e != null) {
                 return failedFuture(e);
             }
 
-            return localPartsUpdateFuture.thenCompose(unused ->
-                    tablesByIdFuture.thenComposeAsync(tablesById -> inBusyLock(
+            startedTables.put(tableId, table);
+
+            return allOf(localPartsUpdateFuture, tablesByIdFuture)
+                    .thenComposeAsync(ignore -> inBusyLock(
                             busyLock,
                             () -> 
startLocalPartitionsAndClients(assignmentsFuture, table, zoneDescriptor.id())
-                    ), ioExecutor)
-            );
+                    ), ioExecutor);
         });
 
-        pendingTables.put(tableId, table);
-        startedTables.put(tableId, table);
-
-        tablesById(causalityToken).thenAccept(ignored -> inBusyLock(busyLock, 
() -> {
-            pendingTables.remove(tableId);
-        }));
+        tables.put(tableId, table);
 
         // TODO should be reworked in IGNITE-16763
 
@@ -1381,27 +1354,25 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
      */
     private void dropTableLocally(long causalityToken, 
DestroyTableEventParameters parameters) {
         int tableId = parameters.tableId();
+        // TODO Drop partitions from parameters and use from storage.
         int partitions = parameters.partitions();
 
-        localPartsByTableIdVv.update(causalityToken, (previousVal, e) -> 
inBusyLock(busyLock, () -> {
+        localPartitionsVv.update(causalityToken, (previousVal, e) -> 
inBusyLock(busyLock, () -> {
             if (e != null) {
                 return failedFuture(e);
             }
 
-            var newMap = new HashMap<>(previousVal);
-            newMap.remove(tableId);
+            localPartsByTableId.remove(tableId);
 
-            return completedFuture(newMap);
+            return nullCompletedFuture();
         }));
 
-        tablesByIdVv.update(causalityToken, (previousVal, e) -> 
inBusyLock(busyLock, () -> {
+        tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () 
-> {
             if (e != null) {
                 return failedFuture(e);
             }
 
-            var map = new HashMap<>(previousVal);
-
-            TableImpl table = map.remove(tableId);
+            TableImpl table = tables.get(tableId);
 
             assert table != null : tableId;
 
@@ -1410,7 +1381,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
             CompletableFuture<?>[] stopReplicaFutures = new 
CompletableFuture<?>[partitions];
 
             // TODO https://issues.apache.org/jira/browse/IGNITE-19170 
Partitions should be stopped on the assignments change
-            // TODO event triggered by zone drop or alter.
+            //  event triggered by zone drop or alter. Stop replica 
asynchronously, out of metastorage event pipeline.
             for (int partitionId = 0; partitionId < partitions; partitionId++) 
{
                 var replicationGroupId = new TablePartitionId(tableId, 
partitionId);
 
@@ -1425,7 +1396,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                                     runAsync(() -> 
internalTable.txStateStorage().destroy(), ioExecutor)
                             ),
                             ioExecutor
-                    ).thenApply(v -> map);
+                    ).thenAccept(ignore0 -> tables.remove(tableId));
         }));
 
         startedTables.remove(tableId);
@@ -1478,37 +1449,22 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
      * @see #assignmentsUpdatedVv
      */
     private CompletableFuture<Map<Integer, TableImpl>> tablesById(long 
causalityToken) {
-        // We bring the future outside to avoid OutdatedTokenException.
-        CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture = 
tablesByIdVv.get(causalityToken);
-
-        return assignmentsUpdatedVv.get(causalityToken).thenCompose(v -> 
tablesByIdFuture);
+        return assignmentsUpdatedVv.get(causalityToken).thenApply(v -> 
unmodifiableMap(startedTables));
     }
 
     /**
-     * Returns the latest tables by ID map, for which all assignment updates 
have been completed.
+     * Returns an internal map, which contains all managed tables by their ID.
      */
-    private Map<Integer, TableImpl> latestTablesById() {
-        // TODO https://issues.apache.org/jira/browse/IGNITE-20915 fix this.
-        if (assignmentsUpdatedVv.latestCausalityToken() < 0L) {
-            // No tables at all in case of empty causality token.
-            return emptyMap();
-        } else {
-            CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture = 
tablesByIdVv.get(tablesByIdVv.latestCausalityToken());
-
-            assert tablesByIdFuture.isDone() : "'tablesByIdVv' is always 
completed strictly before the 'assignmentsUpdatedVv'";
-
-            return tablesByIdFuture.join();
-        }
+    private Map<Integer, TableImpl> tablesById() {
+        return unmodifiableMap(tables);
     }
 
     /**
-     * Actual tables map.
-     *
-     * @return Actual tables map.
+     * Returns a map with started tables.
      */
     @TestOnly
-    public Map<Integer, TableImpl> latestTables() {
-        return unmodifiableMap(latestTablesById());
+    public Map<Integer, TableImpl> startedTables() {
+        return unmodifiableMap(startedTables);
     }
 
     @Override
@@ -1576,7 +1532,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         }
 
         try {
-            return 
localPartsByTableIdVv.get(causalityToken).thenApply(partitionSetById -> 
partitionSetById.get(tableId));
+            return localPartitionsVv.get(causalityToken).thenApply(unused -> 
localPartsByTableId.get(tableId));
         } finally {
             busyLock.leaveBusy();
         }
@@ -1617,7 +1573,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
     }
 
     private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int 
tableId) {
-        TableImpl tableImpl = latestTablesById().get(tableId);
+        TableImpl tableImpl = startedTables.get(tableId);
 
         if (tableImpl != null) {
             return completedFuture(tableImpl);
@@ -1627,17 +1583,13 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
         CompletionListener<Void> tablesListener = (token, v, th) -> {
             if (th == null) {
-                CompletableFuture<Map<Integer, TableImpl>> tablesFuture = 
tablesByIdVv.get(token);
+                CompletableFuture<?> tablesFuture = tablesVv.get(token);
 
                 tablesFuture.whenComplete((tables, e) -> {
                     if (e != null) {
                         getLatestTableFuture.completeExceptionally(e);
                     } else {
-                        TableImpl table = tables.get(tableId);
-
-                        if (table != null) {
-                            getLatestTableFuture.complete(table);
-                        }
+                        
getLatestTableFuture.complete(startedTables.get(tableId));
                     }
                 });
             } else {
@@ -1648,8 +1600,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         assignmentsUpdatedVv.whenComplete(tablesListener);
 
         // This check is needed for the case when we have registered 
tablesListener,
-        // but tablesByIdVv has already been completed, so listener would be 
triggered only for the next versioned value update.
-        tableImpl = latestTablesById().get(tableId);
+        // but tablesVv has already been completed, so listener would be 
triggered only for the next versioned value update.
+        tableImpl = startedTables.get(tableId);
 
         if (tableImpl != null) {
             assignmentsUpdatedVv.removeWhenComplete(tablesListener);
@@ -1737,8 +1689,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
         Assignments pendingAssignments = 
Assignments.fromBytes(pendingAssignmentsEntry.value());
 
-        return tablesByIdVv.get(revision)
-                .thenApply(tables -> {
+        return tablesVv.get(revision)
+                .thenApply(ignore -> {
                     if (!busyLock.enterBusy()) {
                         return CompletableFuture.<Void>failedFuture(new 
NodeStoppingException());
                     }
@@ -1809,20 +1761,12 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         int zoneId = getTableDescriptor(tableId, 
catalogService.latestCatalogVersion()).zoneId();
 
         if (shouldStartLocalGroupNode) {
-            localServicesStartFuture = localPartsByTableIdVv.get(revision)
-                    .thenComposeAsync(oldMap -> {
-                        // TODO 
https://issues.apache.org/jira/browse/IGNITE-20957 This is incorrect usage of 
the value stored in
-                        // TODO versioned value. See ticket for the details.
-                        PartitionSet partitionSet = oldMap.get(tableId).copy();
-
-                        return getOrCreatePartitionStorages(tbl, 
partitionSet).thenApply(u -> {
-                            var newMap = new HashMap<>(oldMap);
-
-                            newMap.put(tableId, partitionSet);
-
-                            return newMap;
-                        });
-                    }, ioExecutor)
+            localServicesStartFuture = localPartitionsVv.get(revision)
+                    // TODO https://issues.apache.org/jira/browse/IGNITE-20957 
Revisit this code
+                    .thenApply(unused -> 
localPartsByTableId.get(tableId).copy())
+                    .thenComposeAsync(partitionSet -> inBusyLock(busyLock,
+                            () -> getOrCreatePartitionStorages(tbl, 
partitionSet)
+                    ), ioExecutor)
                     .thenComposeAsync(unused -> inBusyLock(busyLock, () -> 
startPartitionAndStartClient(
                             tbl,
                             replicaGrpId.partitionId(),
@@ -2247,8 +2191,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
     }
 
     private CompletableFuture<Void> stopAndDestroyPartition(TablePartitionId 
tablePartitionId, long causalityToken) {
-        return tablesByIdVv.get(causalityToken)
-                .thenCompose(tables -> {
+        return tablesVv.get(causalityToken)
+                .thenCompose(ignore -> {
                     TableImpl table = tables.get(tablePartitionId.tableId());
 
                     return stopPartition(tablePartitionId, table)
@@ -2358,7 +2302,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
      */
     @Override
     public @Nullable TableViewInternal cachedTable(int tableId) {
-        return startedTables.get(tableId);
+        return tables.get(tableId);
     }
 
     /**
@@ -2368,7 +2312,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
      */
     @TestOnly
     public @Nullable TableViewInternal cachedTable(String name) {
-        return findTableImplByName(startedTables.values(), name);
+        return findTableImplByName(tables.values(), name);
     }
 
     private CatalogTableDescriptor getTableDescriptor(int tableId, int 
catalogVersion) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index f03fa9043c..7b041ea1a6 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -762,7 +762,7 @@ public class TableManagerTest extends IgniteAbstractTest {
                 dsm = createDataStorageManager(configRegistry, workDir, 
storageEngineConfig),
                 workDir,
                 msm,
-                sm = new SchemaManager(revisionUpdater, catalogManager, msm),
+                sm = new SchemaManager(revisionUpdater, catalogManager),
                 budgetView -> new LocalLogStorageFactory(),
                 partitionOperationsExecutor,
                 partitionOperationsExecutor,


Reply via email to