This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 90894e3651 IGNITE-21609 Avoid using versioned values for non-versioned objects (#3310) 90894e3651 is described below commit 90894e365195cbfe3029ee584ef41a5480e6e649 Author: Andrew V. Mashenkov <amashen...@users.noreply.github.com> AuthorDate: Mon Mar 4 16:55:38 2024 +0300 IGNITE-21609 Avoid using versioned values for non-versioned objects (#3310) --- .../apache/ignite/internal/index/IndexManager.java | 66 ++----- .../runner/app/ItIgniteNodeRestartTest.java | 4 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 2 +- .../ignite/internal/schema/SchemaManager.java | 101 +++++------ .../ignite/internal/schema/SchemaManagerTest.java | 4 +- .../rebalance/ItRebalanceDistributedTest.java | 8 +- .../internal/table/distributed/TableManager.java | 192 ++++++++------------- .../table/distributed/TableManagerTest.java | 2 +- 8 files changed, 136 insertions(+), 243 deletions(-) diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java index bd485017d0..069161857f 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java @@ -25,14 +25,13 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectMaps; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntSet; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -106,8 +105,11 @@ public class IndexManager implements IgniteComponent { /** Versioned value used only at the start of the manager. */ private final IncrementalVersionedValue<Void> startVv; - /** Version value of multi-version table storages by ID for which indexes were created. */ - private final IncrementalVersionedValue<Int2ObjectMap<MvTableStorage>> mvTableStoragesByIdVv; + /** Versioned value for linearizing index partition changing events. */ + private final IncrementalVersionedValue<Void> tableStoragesVv; + + /** Table storages by ID for which indexes were created. */ + private final Map<Integer, MvTableStorage> tableStoragesById = new ConcurrentHashMap<>(); /** * Constructor. @@ -132,7 +134,7 @@ public class IndexManager implements IgniteComponent { this.ioExecutor = ioExecutor; startVv = new IncrementalVersionedValue<>(registry); - mvTableStoragesByIdVv = new IncrementalVersionedValue<>(registry, Int2ObjectMaps::emptyMap); + tableStoragesVv = new IncrementalVersionedValue<>(registry); } @Override @@ -179,7 +181,7 @@ public class IndexManager implements IgniteComponent { * parameters. */ CompletableFuture<MvTableStorage> getMvTableStorage(long causalityToken, int tableId) { - return mvTableStoragesByIdVv.get(causalityToken).thenApply(mvTableStoragesById -> mvTableStoragesById.get(tableId)); + return tableStoragesVv.get(causalityToken).thenApply(ignore -> tableStoragesById.get(tableId)); } private CompletableFuture<Boolean> onIndexDestroy(DestroyIndexEventParameters parameters) { @@ -190,16 +192,14 @@ public class IndexManager implements IgniteComponent { CompletableFuture<TableViewInternal> tableFuture = tableManager.tableAsync(causalityToken, tableId); - return inBusyLockAsync(busyLock, () -> mvTableStoragesByIdVv.update( + return inBusyLockAsync(busyLock, () -> tableStoragesVv.update( causalityToken, - updater(mvTableStorageById -> tableFuture.thenApply(table -> inBusyLock(busyLock, () -> { + updater(ignore -> tableFuture.thenAccept(table -> inBusyLock(busyLock, () -> { if (table != null) { // In case of DROP TABLE the table will be removed first. table.unregisterIndex(indexId); - - return mvTableStorageById; } else { - return removeMvTableStorageIfPresent(mvTableStorageById, tableId); + tableStoragesById.remove(tableId); } }))) )).thenApply(unused -> false); @@ -349,13 +349,17 @@ public class IndexManager implements IgniteComponent { CompletableFuture<SchemaRegistry> schemaRegistryFuture = schemaManager.schemaRegistry(causalityToken, tableId); - return mvTableStoragesByIdVv.update( + return tableStoragesVv.update( causalityToken, - updater(mvTableStorageById -> tablePartitionFuture.thenCombineAsync(schemaRegistryFuture, + updater(ignore -> tablePartitionFuture.thenCombineAsync(schemaRegistryFuture, (partitionSet, schemaRegistry) -> inBusyLock(busyLock, () -> { registerIndex(table, index, partitionSet, schemaRegistry); - return addMvTableStorageIfAbsent(mvTableStorageById, getTableViewStrict(tableId).internalTable().storage()); + MvTableStorage storage = getTableViewStrict(tableId).internalTable().storage(); + + tableStoragesById.putIfAbsent(tableId, storage); + + return null; }), ioExecutor)) ); } @@ -393,38 +397,6 @@ public class IndexManager implements IgniteComponent { } } - private static Int2ObjectMap<MvTableStorage> addMvTableStorageIfAbsent( - Int2ObjectMap<MvTableStorage> mvTableStorageById, - MvTableStorage mvTableStorage - ) { - int tableId = mvTableStorage.getTableDescriptor().getId(); - - if (mvTableStorageById.containsKey(tableId)) { - return mvTableStorageById; - } - - Int2ObjectMap<MvTableStorage> newMap = new Int2ObjectOpenHashMap<>(mvTableStorageById); - - newMap.put(tableId, mvTableStorage); - - return newMap; - } - - private static Int2ObjectMap<MvTableStorage> removeMvTableStorageIfPresent( - Int2ObjectMap<MvTableStorage> mvTableStorageById, - int tableId - ) { - if (!mvTableStorageById.containsKey(tableId)) { - return mvTableStorageById; - } - - Int2ObjectMap<MvTableStorage> newMap = new Int2ObjectOpenHashMap<>(mvTableStorageById); - - newMap.remove(tableId); - - return newMap; - } - private TableViewInternal getTableViewStrict(int tableId) { TableViewInternal table = tableManager.cachedTable(tableId); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 4dde7bce06..f083095297 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -496,7 +496,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { partitionIdleSafeTimePropagationPeriodMsSupplier ); - SchemaManager schemaManager = new SchemaManager(registry, catalogManager, metaStorageMgr); + SchemaManager schemaManager = new SchemaManager(registry, catalogManager); var dataNodesMock = dataNodesMockByNode.get(idx); @@ -1069,7 +1069,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { * @param tableName Table name. */ private void assertTablePresent(TableManager tableManager, String tableName) { - Collection<TableImpl> tables = tableManager.latestTables().values(); + Collection<TableImpl> tables = tableManager.startedTables().values(); boolean isPresent = false; diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index d1103775b4..5dc2b7ea5f 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -641,7 +641,7 @@ public class IgniteImpl implements Ignite { SchemaSyncService schemaSyncService = new SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier); - schemaManager = new SchemaManager(registry, catalogManager, metaStorageMgr); + schemaManager = new SchemaManager(registry, catalogManager); ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, NamedThreadFactory.create(name, "rebalance-scheduler", LOG)); diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java index 5764bcc189..3e9a6f7251 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.schema; -import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; @@ -26,9 +25,9 @@ import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; import java.util.Collection; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -45,7 +44,6 @@ import org.apache.ignite.internal.causality.IncrementalVersionedValue; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.manager.IgniteComponent; -import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.schema.catalog.CatalogToSchemaDescriptorConverter; import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -65,21 +63,16 @@ public class SchemaManager implements IgniteComponent { private final CatalogService catalogService; - /** Versioned store for tables by ID. */ - private final IncrementalVersionedValue<Map<Integer, SchemaRegistryImpl>> registriesVv; + /** Versioned value for linearizing index partition changing events. */ + private final IncrementalVersionedValue<Void> registriesVv; - /** Meta storage manager. */ - private final MetaStorageManager metastorageMgr; + /** Schema registries by table ID. */ + private final Map<Integer, SchemaRegistryImpl> registriesById = new ConcurrentHashMap<>(); /** Constructor. */ - public SchemaManager( - Consumer<LongFunction<CompletableFuture<?>>> registry, - CatalogService catalogService, - MetaStorageManager metastorageMgr - ) { - this.registriesVv = new IncrementalVersionedValue<>(registry, HashMap::new); + public SchemaManager(Consumer<LongFunction<CompletableFuture<?>>> registry, CatalogService catalogService) { + this.registriesVv = new IncrementalVersionedValue<>(registry); this.catalogService = catalogService; - this.metastorageMgr = metastorageMgr; } @Override @@ -94,39 +87,30 @@ public class SchemaManager implements IgniteComponent { } private void registerExistingTables() { - CompletableFuture<Long> recoveryFinishFuture = metastorageMgr.recoveryFinishedFuture(); - - assert recoveryFinishFuture.isDone(); - - long causalityToken = recoveryFinishFuture.join(); - for (int catalogVer = catalogService.latestCatalogVersion(); catalogVer >= catalogService.earliestCatalogVersion(); catalogVer--) { Collection<CatalogTableDescriptor> tables = catalogService.tables(catalogVer); - registriesVv.update(causalityToken, (registries, throwable) -> { - for (CatalogTableDescriptor tableDescriptor : tables) { - int tableId = tableDescriptor.id(); + for (CatalogTableDescriptor tableDescriptor : tables) { + int tableId = tableDescriptor.id(); - if (registries.containsKey(tableId)) { - continue; - } - - SchemaDescriptor prevSchema = null; - CatalogTableSchemaVersions schemaVersions = tableDescriptor.schemaVersions(); - for (int tableVer = schemaVersions.earliestVersion(); tableVer <= schemaVersions.latestVersion(); tableVer++) { - SchemaDescriptor newSchema = CatalogToSchemaDescriptorConverter.convert(tableDescriptor, tableVer); + if (registriesById.containsKey(tableId)) { + continue; + } - if (prevSchema != null) { - newSchema.columnMapping(SchemaUtils.columnMapper(prevSchema, newSchema)); - } + SchemaDescriptor prevSchema = null; + CatalogTableSchemaVersions schemaVersions = tableDescriptor.schemaVersions(); + for (int tableVer = schemaVersions.earliestVersion(); tableVer <= schemaVersions.latestVersion(); tableVer++) { + SchemaDescriptor newSchema = CatalogToSchemaDescriptorConverter.convert(tableDescriptor, tableVer); - prevSchema = newSchema; - registries = registerSchema(registries, tableId, newSchema); + if (prevSchema != null) { + newSchema.columnMapping(SchemaUtils.columnMapper(prevSchema, newSchema)); } - } - return completedFuture(registries); - }); + prevSchema = newSchema; + + registerSchema(tableId, newSchema); + } + } } } @@ -180,7 +164,9 @@ public class SchemaManager implements IgniteComponent { ); } - return completedFuture(registerSchema(registries, tableId, newSchema)); + registerSchema(tableId, newSchema); + + return nullCompletedFuture(); })).thenApply(ignored -> false); } finally { busyLock.leaveBusy(); @@ -239,29 +225,22 @@ public class SchemaManager implements IgniteComponent { /** * Registers the new schema in the registries. * - * @param registries Registries before registering this schema. * @param tableId ID of the table to which the schema belongs. * @param schema The schema to register. - * @return Registries after registering this schema. */ - private Map<Integer, SchemaRegistryImpl> registerSchema( - Map<Integer, SchemaRegistryImpl> registries, + private void registerSchema( int tableId, SchemaDescriptor schema ) { - SchemaRegistryImpl reg = registries.get(tableId); - - if (reg == null) { - Map<Integer, SchemaRegistryImpl> copy = new HashMap<>(registries); - - copy.put(tableId, createSchemaRegistry(tableId, schema)); + registriesById.compute(tableId, (tableId0, reg) -> { + if (reg == null) { + return createSchemaRegistry(tableId0, schema); + } - return copy; - } else { reg.onSchemaRegistered(schema); - return registries; - } + return reg; + }); } /** @@ -286,7 +265,7 @@ public class SchemaManager implements IgniteComponent { * @return Descriptor if required schema found, or {@code null} otherwise. */ private @Nullable SchemaDescriptor searchSchemaByVersion(int tblId, int schemaVer) { - SchemaRegistry registry = registriesVv.latest().get(tblId); + SchemaRegistry registry = registriesById.get(tblId); if (registry != null && schemaVer <= registry.lastKnownSchemaVersion()) { return registry.schema(schemaVer); @@ -311,7 +290,7 @@ public class SchemaManager implements IgniteComponent { try { return registriesVv.get(causalityToken) - .thenApply(regs -> inBusyLock(busyLock, () -> regs.get(tableId))); + .thenApply(unused -> inBusyLock(busyLock, () -> registriesById.get(tableId))); } finally { busyLock.leaveBusy(); } @@ -324,7 +303,7 @@ public class SchemaManager implements IgniteComponent { * @return Schema registry. */ public SchemaRegistry schemaRegistry(int tableId) { - return registriesVv.latest().get(tableId); + return registriesById.get(tableId); } /** @@ -345,12 +324,10 @@ public class SchemaManager implements IgniteComponent { format("Cannot remove a schema registry for the table [tblId={}]", tableId), e)); } - Map<Integer, SchemaRegistryImpl> regs = new HashMap<>(registries); - - SchemaRegistryImpl removedRegistry = regs.remove(tableId); + SchemaRegistryImpl removedRegistry = registriesById.remove(tableId); removedRegistry.close(); - return completedFuture(regs); + return nullCompletedFuture(); })); } finally { busyLock.leaveBusy(); @@ -366,6 +343,6 @@ public class SchemaManager implements IgniteComponent { busyLock.block(); // noinspection ConstantConditions - IgniteUtils.closeAllManually(registriesVv.latest().values()); + IgniteUtils.closeAllManually(registriesById.values()); } } diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java index 83013878a5..16a19ec9e9 100644 --- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java +++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java @@ -105,7 +105,7 @@ class SchemaManagerTest extends BaseIgniteAbstractTest { doNothing().when(catalogService).listen(eq(CatalogEvent.TABLE_ALTER), tableAlteredListener.capture()); doNothing().when(catalogService).listen(eq(CatalogEvent.TABLE_DESTROY), tableDestroyedListener.capture()); - schemaManager = new SchemaManager(registry, catalogService, metaStorageManager); + schemaManager = new SchemaManager(registry, catalogService); schemaManager.start(); assertThat("Watches were not deployed", metaStorageManager.deployWatches(), willCompleteSuccessfully()); @@ -281,7 +281,7 @@ class SchemaManagerTest extends BaseIgniteAbstractTest { when(catalogService.tables(anyInt())).thenReturn(List.of(tableDescriptorAfterColumnAddition())); doReturn(CompletableFuture.completedFuture(CAUSALITY_TOKEN_2)).when(metaStorageManager).recoveryFinishedFuture(); - schemaManager = new SchemaManager(registry, catalogService, metaStorageManager); + schemaManager = new SchemaManager(registry, catalogService); schemaManager.start(); completeCausalityToken(CAUSALITY_TOKEN_2); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 6be1f738c4..6c25e6a32f 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -683,7 +683,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { assertTrue(waitForCondition( () -> nodes.stream().allMatch(n -> n.tableManager - .latestTables() + .startedTables() .get(getTableId(node, TABLE_NAME)) .internalTable() .tableRaftService() @@ -739,7 +739,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { assertTrue(waitForCondition( () -> nodes.stream().allMatch(n -> n.tableManager - .latestTables() + .startedTables() .get(getTableId(node, TABLE_NAME)) .internalTable() .tableRaftService() @@ -824,7 +824,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { try { return nodes.stream().allMatch(n -> n.tableManager - .latestTables() + .startedTables() .get(getTableId(n, tableName)) .internalTable() .tableRaftService() @@ -1153,7 +1153,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { partitionIdleSafeTimePropagationPeriodMsSupplier ); - schemaManager = new SchemaManager(registry, catalogManager, metaStorageManager); + schemaManager = new SchemaManager(registry, catalogManager); schemaSyncService = new SchemaSyncServiceImpl(metaStorageManager.clusterTime(), delayDurationMsSupplier); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 3df5f576f1..5b1820fc91 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.table.distributed; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableMap; import static java.util.concurrent.CompletableFuture.allOf; @@ -69,7 +68,6 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -261,38 +259,38 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private final TransactionStateResolver transactionStateResolver; /** - * Versioned store for tables by id. Only table instances are created here, local storages and RAFT groups may not be initialized yet. + * Versioned value for linearizing table changing events. * - * @see #localPartsByTableIdVv + * @see #localPartitionsVv * @see #assignmentsUpdatedVv */ - private final IncrementalVersionedValue<Map<Integer, TableImpl>> tablesByIdVv; + private final IncrementalVersionedValue<Void> tablesVv; /** - * Versioned store for local partition set by table id. + * Versioned value for linearizing table partitions changing events. * - * <p>Completed strictly after {@link #tablesByIdVv} and strictly before {@link #assignmentsUpdatedVv}. + * <p>Completed strictly after {@link #tablesVv} and strictly before {@link #assignmentsUpdatedVv}. */ - private final IncrementalVersionedValue<Map<Integer, PartitionSet>> localPartsByTableIdVv; + private final IncrementalVersionedValue<Void> localPartitionsVv; /** - * Versioned store for tracking RAFT groups initialization and starting completion. + * Versioned value for tracking RAFT groups initialization and starting completion. * * <p>Only explicitly updated in {@link #startLocalPartitionsAndClients(CompletableFuture, TableImpl, int)}. * - * <p>Completed strictly after {@link #localPartsByTableIdVv}. + * <p>Completed strictly after {@link #localPartitionsVv}. */ private final IncrementalVersionedValue<Void> assignmentsUpdatedVv; - /** - * {@link TableImpl} is created during update of tablesByIdVv, we store reference to it in case of updating of tablesByIdVv fails, so we - * can stop resources associated with the table or to clean up table resources on {@code TableManager#stop()}. - */ - private final Map<Integer, TableImpl> pendingTables = new ConcurrentHashMap<>(); + /** Registered tables. */ + private final Map<Integer, TableImpl> tables = new ConcurrentHashMap<>(); /** Started tables. */ private final Map<Integer, TableImpl> startedTables = new ConcurrentHashMap<>(); + /** Local partitions. */ + private final Map<Integer, PartitionSet> localPartsByTableId = new ConcurrentHashMap<>(); + /** Busy lock to stop synchronously. */ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); @@ -488,11 +486,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { schemaVersions = new SchemaVersionsImpl(executorInclinedSchemaSyncService, catalogService, clock); - tablesByIdVv = new IncrementalVersionedValue<>(registry, HashMap::new); + tablesVv = new IncrementalVersionedValue<>(registry); - localPartsByTableIdVv = new IncrementalVersionedValue<>(dependingOn(tablesByIdVv), HashMap::new); + localPartitionsVv = new IncrementalVersionedValue<>(dependingOn(tablesVv)); - assignmentsUpdatedVv = new IncrementalVersionedValue<>(dependingOn(localPartsByTableIdVv)); + assignmentsUpdatedVv = new IncrementalVersionedValue<>(dependingOn(localPartitionsVv)); txStateStorageScheduledPool = Executors.newSingleThreadScheduledExecutor( NamedThreadFactory.create(nodeName, "tx-state-storage-scheduled-pool", LOG)); @@ -528,7 +526,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { metaStorageMgr, messagingService, topologyService, - tableId -> latestTablesById().get(tableId) + tableId -> tablesById().get(tableId) ); startVv = new IncrementalVersionedValue<>(registry); @@ -726,19 +724,19 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters) { - return inBusyLockAsync(busyLock, () -> tablesByIdVv.update( + return inBusyLockAsync(busyLock, () -> tablesVv.update( parameters.causalityToken(), - (tablesById, e) -> { + (ignore, e) -> { if (e != null) { return failedFuture(e); } - TableImpl table = tablesById.get(parameters.tableId()); + TableImpl table = tables.get(parameters.tableId()); // TODO: revisit this approach, see https://issues.apache.org/jira/browse/IGNITE-21235. table.name(parameters.newTableName()); - return completedFuture(tablesById); + return nullCompletedFuture(); }) ); } @@ -1074,12 +1072,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { lowWatermark.removeUpdateListener(mvGc); - var tablesToStop = new HashMap<Integer, TableImpl>(); - - tablesToStop.putAll(latestTablesById()); - tablesToStop.putAll(pendingTables); - - cleanUpTablesResources(tablesToStop); + cleanUpTablesResources(tables); } @Override @@ -1249,26 +1242,17 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { var table = new TableImpl(internalTable, lockMgr, schemaVersions, marshallers, sql.get(), tableDescriptor.primaryKeyIndexId()); - tablesByIdVv.update(causalityToken, (previous, e) -> inBusyLock(busyLock, () -> { + tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () -> { if (e != null) { return failedFuture(e); } - return schemaManager.schemaRegistry(causalityToken, tableId) - .thenApply(schema -> { - table.schemaView(schema); - - var val = new HashMap<>(previous); - - val.put(tableId, table); - - return val; - }); + return schemaManager.schemaRegistry(causalityToken, tableId).thenAccept(table::schemaView); })); // NB: all vv.update() calls must be made from the synchronous part of the method (not in thenCompose()/etc!). - CompletableFuture<?> localPartsUpdateFuture = localPartsByTableIdVv.update(causalityToken, - (previous, throwable) -> inBusyLock(busyLock, () -> assignmentsFuture.thenComposeAsync(newAssignments -> { + CompletableFuture<?> localPartsUpdateFuture = localPartitionsVv.update(causalityToken, + (ignore, throwable) -> inBusyLock(busyLock, () -> assignmentsFuture.thenComposeAsync(newAssignments -> { PartitionSet parts = new BitSetPartitionSet(); // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set partitions only for @@ -1277,39 +1261,28 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { parts.set(i); } - return getOrCreatePartitionStorages(table, parts).thenApply(u -> { - var newValue = new HashMap<>(previous); - - newValue.put(tableId, parts); - - return newValue; - }); + return getOrCreatePartitionStorages(table, parts).thenAccept(u -> localPartsByTableId.put(tableId, parts)); }, ioExecutor))); - // We bring the future outside to avoid OutdatedTokenException. - CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture = tablesByIdVv.get(causalityToken); + CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken); // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions should be started only on the assignments change - // TODO event triggered by zone create or alter. + // event triggered by zone create or alter. CompletableFuture<?> createPartsFut = assignmentsUpdatedVv.update(causalityToken, (token, e) -> { if (e != null) { return failedFuture(e); } - return localPartsUpdateFuture.thenCompose(unused -> - tablesByIdFuture.thenComposeAsync(tablesById -> inBusyLock( + startedTables.put(tableId, table); + + return allOf(localPartsUpdateFuture, tablesByIdFuture) + .thenComposeAsync(ignore -> inBusyLock( busyLock, () -> startLocalPartitionsAndClients(assignmentsFuture, table, zoneDescriptor.id()) - ), ioExecutor) - ); + ), ioExecutor); }); - pendingTables.put(tableId, table); - startedTables.put(tableId, table); - - tablesById(causalityToken).thenAccept(ignored -> inBusyLock(busyLock, () -> { - pendingTables.remove(tableId); - })); + tables.put(tableId, table); // TODO should be reworked in IGNITE-16763 @@ -1381,27 +1354,25 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { */ private void dropTableLocally(long causalityToken, DestroyTableEventParameters parameters) { int tableId = parameters.tableId(); + // TODO Drop partitions from parameters and use from storage. int partitions = parameters.partitions(); - localPartsByTableIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> { + localPartitionsVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> { if (e != null) { return failedFuture(e); } - var newMap = new HashMap<>(previousVal); - newMap.remove(tableId); + localPartsByTableId.remove(tableId); - return completedFuture(newMap); + return nullCompletedFuture(); })); - tablesByIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> { + tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () -> { if (e != null) { return failedFuture(e); } - var map = new HashMap<>(previousVal); - - TableImpl table = map.remove(tableId); + TableImpl table = tables.get(tableId); assert table != null : tableId; @@ -1410,7 +1381,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { CompletableFuture<?>[] stopReplicaFutures = new CompletableFuture<?>[partitions]; // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions should be stopped on the assignments change - // TODO event triggered by zone drop or alter. + // event triggered by zone drop or alter. Stop replica asynchronously, out of metastorage event pipeline. for (int partitionId = 0; partitionId < partitions; partitionId++) { var replicationGroupId = new TablePartitionId(tableId, partitionId); @@ -1425,7 +1396,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { runAsync(() -> internalTable.txStateStorage().destroy(), ioExecutor) ), ioExecutor - ).thenApply(v -> map); + ).thenAccept(ignore0 -> tables.remove(tableId)); })); startedTables.remove(tableId); @@ -1478,37 +1449,22 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { * @see #assignmentsUpdatedVv */ private CompletableFuture<Map<Integer, TableImpl>> tablesById(long causalityToken) { - // We bring the future outside to avoid OutdatedTokenException. - CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture = tablesByIdVv.get(causalityToken); - - return assignmentsUpdatedVv.get(causalityToken).thenCompose(v -> tablesByIdFuture); + return assignmentsUpdatedVv.get(causalityToken).thenApply(v -> unmodifiableMap(startedTables)); } /** - * Returns the latest tables by ID map, for which all assignment updates have been completed. + * Returns an internal map, which contains all managed tables by their ID. */ - private Map<Integer, TableImpl> latestTablesById() { - // TODO https://issues.apache.org/jira/browse/IGNITE-20915 fix this. - if (assignmentsUpdatedVv.latestCausalityToken() < 0L) { - // No tables at all in case of empty causality token. - return emptyMap(); - } else { - CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture = tablesByIdVv.get(tablesByIdVv.latestCausalityToken()); - - assert tablesByIdFuture.isDone() : "'tablesByIdVv' is always completed strictly before the 'assignmentsUpdatedVv'"; - - return tablesByIdFuture.join(); - } + private Map<Integer, TableImpl> tablesById() { + return unmodifiableMap(tables); } /** - * Actual tables map. - * - * @return Actual tables map. + * Returns a map with started tables. */ @TestOnly - public Map<Integer, TableImpl> latestTables() { - return unmodifiableMap(latestTablesById()); + public Map<Integer, TableImpl> startedTables() { + return unmodifiableMap(startedTables); } @Override @@ -1576,7 +1532,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } try { - return localPartsByTableIdVv.get(causalityToken).thenApply(partitionSetById -> partitionSetById.get(tableId)); + return localPartitionsVv.get(causalityToken).thenApply(unused -> localPartsByTableId.get(tableId)); } finally { busyLock.leaveBusy(); } @@ -1617,7 +1573,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int tableId) { - TableImpl tableImpl = latestTablesById().get(tableId); + TableImpl tableImpl = startedTables.get(tableId); if (tableImpl != null) { return completedFuture(tableImpl); @@ -1627,17 +1583,13 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { CompletionListener<Void> tablesListener = (token, v, th) -> { if (th == null) { - CompletableFuture<Map<Integer, TableImpl>> tablesFuture = tablesByIdVv.get(token); + CompletableFuture<?> tablesFuture = tablesVv.get(token); tablesFuture.whenComplete((tables, e) -> { if (e != null) { getLatestTableFuture.completeExceptionally(e); } else { - TableImpl table = tables.get(tableId); - - if (table != null) { - getLatestTableFuture.complete(table); - } + getLatestTableFuture.complete(startedTables.get(tableId)); } }); } else { @@ -1648,8 +1600,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { assignmentsUpdatedVv.whenComplete(tablesListener); // This check is needed for the case when we have registered tablesListener, - // but tablesByIdVv has already been completed, so listener would be triggered only for the next versioned value update. - tableImpl = latestTablesById().get(tableId); + // but tablesVv has already been completed, so listener would be triggered only for the next versioned value update. + tableImpl = startedTables.get(tableId); if (tableImpl != null) { assignmentsUpdatedVv.removeWhenComplete(tablesListener); @@ -1737,8 +1689,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { Assignments pendingAssignments = Assignments.fromBytes(pendingAssignmentsEntry.value()); - return tablesByIdVv.get(revision) - .thenApply(tables -> { + return tablesVv.get(revision) + .thenApply(ignore -> { if (!busyLock.enterBusy()) { return CompletableFuture.<Void>failedFuture(new NodeStoppingException()); } @@ -1809,20 +1761,12 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { int zoneId = getTableDescriptor(tableId, catalogService.latestCatalogVersion()).zoneId(); if (shouldStartLocalGroupNode) { - localServicesStartFuture = localPartsByTableIdVv.get(revision) - .thenComposeAsync(oldMap -> { - // TODO https://issues.apache.org/jira/browse/IGNITE-20957 This is incorrect usage of the value stored in - // TODO versioned value. See ticket for the details. - PartitionSet partitionSet = oldMap.get(tableId).copy(); - - return getOrCreatePartitionStorages(tbl, partitionSet).thenApply(u -> { - var newMap = new HashMap<>(oldMap); - - newMap.put(tableId, partitionSet); - - return newMap; - }); - }, ioExecutor) + localServicesStartFuture = localPartitionsVv.get(revision) + // TODO https://issues.apache.org/jira/browse/IGNITE-20957 Revisit this code + .thenApply(unused -> localPartsByTableId.get(tableId).copy()) + .thenComposeAsync(partitionSet -> inBusyLock(busyLock, + () -> getOrCreatePartitionStorages(tbl, partitionSet) + ), ioExecutor) .thenComposeAsync(unused -> inBusyLock(busyLock, () -> startPartitionAndStartClient( tbl, replicaGrpId.partitionId(), @@ -2247,8 +2191,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } private CompletableFuture<Void> stopAndDestroyPartition(TablePartitionId tablePartitionId, long causalityToken) { - return tablesByIdVv.get(causalityToken) - .thenCompose(tables -> { + return tablesVv.get(causalityToken) + .thenCompose(ignore -> { TableImpl table = tables.get(tablePartitionId.tableId()); return stopPartition(tablePartitionId, table) @@ -2358,7 +2302,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { */ @Override public @Nullable TableViewInternal cachedTable(int tableId) { - return startedTables.get(tableId); + return tables.get(tableId); } /** @@ -2368,7 +2312,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { */ @TestOnly public @Nullable TableViewInternal cachedTable(String name) { - return findTableImplByName(startedTables.values(), name); + return findTableImplByName(tables.values(), name); } private CatalogTableDescriptor getTableDescriptor(int tableId, int catalogVersion) { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index f03fa9043c..7b041ea1a6 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -762,7 +762,7 @@ public class TableManagerTest extends IgniteAbstractTest { dsm = createDataStorageManager(configRegistry, workDir, storageEngineConfig), workDir, msm, - sm = new SchemaManager(revisionUpdater, catalogManager, msm), + sm = new SchemaManager(revisionUpdater, catalogManager), budgetView -> new LocalLogStorageFactory(), partitionOperationsExecutor, partitionOperationsExecutor,