This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 90894e3651 IGNITE-21609 Avoid using versioned values for non-versioned
objects (#3310)
90894e3651 is described below
commit 90894e365195cbfe3029ee584ef41a5480e6e649
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Mon Mar 4 16:55:38 2024 +0300
IGNITE-21609 Avoid using versioned values for non-versioned objects (#3310)
---
.../apache/ignite/internal/index/IndexManager.java | 66 ++-----
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
.../ignite/internal/schema/SchemaManager.java | 101 +++++------
.../ignite/internal/schema/SchemaManagerTest.java | 4 +-
.../rebalance/ItRebalanceDistributedTest.java | 8 +-
.../internal/table/distributed/TableManager.java | 192 ++++++++-------------
.../table/distributed/TableManagerTest.java | 2 +-
8 files changed, 136 insertions(+), 243 deletions(-)
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index bd485017d0..069161857f 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -25,14 +25,13 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
@@ -106,8 +105,11 @@ public class IndexManager implements IgniteComponent {
/** Versioned value used only at the start of the manager. */
private final IncrementalVersionedValue<Void> startVv;
- /** Version value of multi-version table storages by ID for which indexes
were created. */
- private final IncrementalVersionedValue<Int2ObjectMap<MvTableStorage>>
mvTableStoragesByIdVv;
+ /** Versioned value for linearizing index partition changing events. */
+ private final IncrementalVersionedValue<Void> tableStoragesVv;
+
+ /** Table storages by ID for which indexes were created. */
+ private final Map<Integer, MvTableStorage> tableStoragesById = new
ConcurrentHashMap<>();
/**
* Constructor.
@@ -132,7 +134,7 @@ public class IndexManager implements IgniteComponent {
this.ioExecutor = ioExecutor;
startVv = new IncrementalVersionedValue<>(registry);
- mvTableStoragesByIdVv = new IncrementalVersionedValue<>(registry,
Int2ObjectMaps::emptyMap);
+ tableStoragesVv = new IncrementalVersionedValue<>(registry);
}
@Override
@@ -179,7 +181,7 @@ public class IndexManager implements IgniteComponent {
* parameters.
*/
CompletableFuture<MvTableStorage> getMvTableStorage(long causalityToken,
int tableId) {
- return
mvTableStoragesByIdVv.get(causalityToken).thenApply(mvTableStoragesById ->
mvTableStoragesById.get(tableId));
+ return tableStoragesVv.get(causalityToken).thenApply(ignore ->
tableStoragesById.get(tableId));
}
private CompletableFuture<Boolean>
onIndexDestroy(DestroyIndexEventParameters parameters) {
@@ -190,16 +192,14 @@ public class IndexManager implements IgniteComponent {
CompletableFuture<TableViewInternal> tableFuture =
tableManager.tableAsync(causalityToken, tableId);
- return inBusyLockAsync(busyLock, () -> mvTableStoragesByIdVv.update(
+ return inBusyLockAsync(busyLock, () -> tableStoragesVv.update(
causalityToken,
- updater(mvTableStorageById -> tableFuture.thenApply(table ->
inBusyLock(busyLock, () -> {
+ updater(ignore -> tableFuture.thenAccept(table ->
inBusyLock(busyLock, () -> {
if (table != null) {
// In case of DROP TABLE the table will be removed
first.
table.unregisterIndex(indexId);
-
- return mvTableStorageById;
} else {
- return
removeMvTableStorageIfPresent(mvTableStorageById, tableId);
+ tableStoragesById.remove(tableId);
}
})))
)).thenApply(unused -> false);
@@ -349,13 +349,17 @@ public class IndexManager implements IgniteComponent {
CompletableFuture<SchemaRegistry> schemaRegistryFuture =
schemaManager.schemaRegistry(causalityToken, tableId);
- return mvTableStoragesByIdVv.update(
+ return tableStoragesVv.update(
causalityToken,
- updater(mvTableStorageById ->
tablePartitionFuture.thenCombineAsync(schemaRegistryFuture,
+ updater(ignore ->
tablePartitionFuture.thenCombineAsync(schemaRegistryFuture,
(partitionSet, schemaRegistry) -> inBusyLock(busyLock,
() -> {
registerIndex(table, index, partitionSet,
schemaRegistry);
- return
addMvTableStorageIfAbsent(mvTableStorageById,
getTableViewStrict(tableId).internalTable().storage());
+ MvTableStorage storage =
getTableViewStrict(tableId).internalTable().storage();
+
+ tableStoragesById.putIfAbsent(tableId, storage);
+
+ return null;
}), ioExecutor))
);
}
@@ -393,38 +397,6 @@ public class IndexManager implements IgniteComponent {
}
}
- private static Int2ObjectMap<MvTableStorage> addMvTableStorageIfAbsent(
- Int2ObjectMap<MvTableStorage> mvTableStorageById,
- MvTableStorage mvTableStorage
- ) {
- int tableId = mvTableStorage.getTableDescriptor().getId();
-
- if (mvTableStorageById.containsKey(tableId)) {
- return mvTableStorageById;
- }
-
- Int2ObjectMap<MvTableStorage> newMap = new
Int2ObjectOpenHashMap<>(mvTableStorageById);
-
- newMap.put(tableId, mvTableStorage);
-
- return newMap;
- }
-
- private static Int2ObjectMap<MvTableStorage> removeMvTableStorageIfPresent(
- Int2ObjectMap<MvTableStorage> mvTableStorageById,
- int tableId
- ) {
- if (!mvTableStorageById.containsKey(tableId)) {
- return mvTableStorageById;
- }
-
- Int2ObjectMap<MvTableStorage> newMap = new
Int2ObjectOpenHashMap<>(mvTableStorageById);
-
- newMap.remove(tableId);
-
- return newMap;
- }
-
private TableViewInternal getTableViewStrict(int tableId) {
TableViewInternal table = tableManager.cachedTable(tableId);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 4dde7bce06..f083095297 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -496,7 +496,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
partitionIdleSafeTimePropagationPeriodMsSupplier
);
- SchemaManager schemaManager = new SchemaManager(registry,
catalogManager, metaStorageMgr);
+ SchemaManager schemaManager = new SchemaManager(registry,
catalogManager);
var dataNodesMock = dataNodesMockByNode.get(idx);
@@ -1069,7 +1069,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
* @param tableName Table name.
*/
private void assertTablePresent(TableManager tableManager, String
tableName) {
- Collection<TableImpl> tables = tableManager.latestTables().values();
+ Collection<TableImpl> tables = tableManager.startedTables().values();
boolean isPresent = false;
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index d1103775b4..5dc2b7ea5f 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -641,7 +641,7 @@ public class IgniteImpl implements Ignite {
SchemaSyncService schemaSyncService = new
SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier);
- schemaManager = new SchemaManager(registry, catalogManager,
metaStorageMgr);
+ schemaManager = new SchemaManager(registry, catalogManager);
ScheduledExecutorService rebalanceScheduler = new
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
NamedThreadFactory.create(name, "rebalance-scheduler", LOG));
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 5764bcc189..3e9a6f7251 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.schema;
-import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -26,9 +25,9 @@ import static
org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -45,7 +44,6 @@ import
org.apache.ignite.internal.causality.IncrementalVersionedValue;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
import
org.apache.ignite.internal.schema.catalog.CatalogToSchemaDescriptorConverter;
import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -65,21 +63,16 @@ public class SchemaManager implements IgniteComponent {
private final CatalogService catalogService;
- /** Versioned store for tables by ID. */
- private final IncrementalVersionedValue<Map<Integer, SchemaRegistryImpl>>
registriesVv;
+ /** Versioned value for linearizing index partition changing events. */
+ private final IncrementalVersionedValue<Void> registriesVv;
- /** Meta storage manager. */
- private final MetaStorageManager metastorageMgr;
+ /** Schema registries by table ID. */
+ private final Map<Integer, SchemaRegistryImpl> registriesById = new
ConcurrentHashMap<>();
/** Constructor. */
- public SchemaManager(
- Consumer<LongFunction<CompletableFuture<?>>> registry,
- CatalogService catalogService,
- MetaStorageManager metastorageMgr
- ) {
- this.registriesVv = new IncrementalVersionedValue<>(registry,
HashMap::new);
+ public SchemaManager(Consumer<LongFunction<CompletableFuture<?>>>
registry, CatalogService catalogService) {
+ this.registriesVv = new IncrementalVersionedValue<>(registry);
this.catalogService = catalogService;
- this.metastorageMgr = metastorageMgr;
}
@Override
@@ -94,39 +87,30 @@ public class SchemaManager implements IgniteComponent {
}
private void registerExistingTables() {
- CompletableFuture<Long> recoveryFinishFuture =
metastorageMgr.recoveryFinishedFuture();
-
- assert recoveryFinishFuture.isDone();
-
- long causalityToken = recoveryFinishFuture.join();
-
for (int catalogVer = catalogService.latestCatalogVersion();
catalogVer >= catalogService.earliestCatalogVersion(); catalogVer--) {
Collection<CatalogTableDescriptor> tables =
catalogService.tables(catalogVer);
- registriesVv.update(causalityToken, (registries, throwable) -> {
- for (CatalogTableDescriptor tableDescriptor : tables) {
- int tableId = tableDescriptor.id();
+ for (CatalogTableDescriptor tableDescriptor : tables) {
+ int tableId = tableDescriptor.id();
- if (registries.containsKey(tableId)) {
- continue;
- }
-
- SchemaDescriptor prevSchema = null;
- CatalogTableSchemaVersions schemaVersions =
tableDescriptor.schemaVersions();
- for (int tableVer = schemaVersions.earliestVersion();
tableVer <= schemaVersions.latestVersion(); tableVer++) {
- SchemaDescriptor newSchema =
CatalogToSchemaDescriptorConverter.convert(tableDescriptor, tableVer);
+ if (registriesById.containsKey(tableId)) {
+ continue;
+ }
- if (prevSchema != null) {
-
newSchema.columnMapping(SchemaUtils.columnMapper(prevSchema, newSchema));
- }
+ SchemaDescriptor prevSchema = null;
+ CatalogTableSchemaVersions schemaVersions =
tableDescriptor.schemaVersions();
+ for (int tableVer = schemaVersions.earliestVersion(); tableVer
<= schemaVersions.latestVersion(); tableVer++) {
+ SchemaDescriptor newSchema =
CatalogToSchemaDescriptorConverter.convert(tableDescriptor, tableVer);
- prevSchema = newSchema;
- registries = registerSchema(registries, tableId,
newSchema);
+ if (prevSchema != null) {
+
newSchema.columnMapping(SchemaUtils.columnMapper(prevSchema, newSchema));
}
- }
- return completedFuture(registries);
- });
+ prevSchema = newSchema;
+
+ registerSchema(tableId, newSchema);
+ }
+ }
}
}
@@ -180,7 +164,9 @@ public class SchemaManager implements IgniteComponent {
);
}
- return completedFuture(registerSchema(registries, tableId,
newSchema));
+ registerSchema(tableId, newSchema);
+
+ return nullCompletedFuture();
})).thenApply(ignored -> false);
} finally {
busyLock.leaveBusy();
@@ -239,29 +225,22 @@ public class SchemaManager implements IgniteComponent {
/**
* Registers the new schema in the registries.
*
- * @param registries Registries before registering this schema.
* @param tableId ID of the table to which the schema belongs.
* @param schema The schema to register.
- * @return Registries after registering this schema.
*/
- private Map<Integer, SchemaRegistryImpl> registerSchema(
- Map<Integer, SchemaRegistryImpl> registries,
+ private void registerSchema(
int tableId,
SchemaDescriptor schema
) {
- SchemaRegistryImpl reg = registries.get(tableId);
-
- if (reg == null) {
- Map<Integer, SchemaRegistryImpl> copy = new HashMap<>(registries);
-
- copy.put(tableId, createSchemaRegistry(tableId, schema));
+ registriesById.compute(tableId, (tableId0, reg) -> {
+ if (reg == null) {
+ return createSchemaRegistry(tableId0, schema);
+ }
- return copy;
- } else {
reg.onSchemaRegistered(schema);
- return registries;
- }
+ return reg;
+ });
}
/**
@@ -286,7 +265,7 @@ public class SchemaManager implements IgniteComponent {
* @return Descriptor if required schema found, or {@code null} otherwise.
*/
private @Nullable SchemaDescriptor searchSchemaByVersion(int tblId, int
schemaVer) {
- SchemaRegistry registry = registriesVv.latest().get(tblId);
+ SchemaRegistry registry = registriesById.get(tblId);
if (registry != null && schemaVer <=
registry.lastKnownSchemaVersion()) {
return registry.schema(schemaVer);
@@ -311,7 +290,7 @@ public class SchemaManager implements IgniteComponent {
try {
return registriesVv.get(causalityToken)
- .thenApply(regs -> inBusyLock(busyLock, () ->
regs.get(tableId)));
+ .thenApply(unused -> inBusyLock(busyLock, () ->
registriesById.get(tableId)));
} finally {
busyLock.leaveBusy();
}
@@ -324,7 +303,7 @@ public class SchemaManager implements IgniteComponent {
* @return Schema registry.
*/
public SchemaRegistry schemaRegistry(int tableId) {
- return registriesVv.latest().get(tableId);
+ return registriesById.get(tableId);
}
/**
@@ -345,12 +324,10 @@ public class SchemaManager implements IgniteComponent {
format("Cannot remove a schema registry for the
table [tblId={}]", tableId), e));
}
- Map<Integer, SchemaRegistryImpl> regs = new
HashMap<>(registries);
-
- SchemaRegistryImpl removedRegistry = regs.remove(tableId);
+ SchemaRegistryImpl removedRegistry =
registriesById.remove(tableId);
removedRegistry.close();
- return completedFuture(regs);
+ return nullCompletedFuture();
}));
} finally {
busyLock.leaveBusy();
@@ -366,6 +343,6 @@ public class SchemaManager implements IgniteComponent {
busyLock.block();
// noinspection ConstantConditions
- IgniteUtils.closeAllManually(registriesVv.latest().values());
+ IgniteUtils.closeAllManually(registriesById.values());
}
}
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
index 83013878a5..16a19ec9e9 100644
---
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
@@ -105,7 +105,7 @@ class SchemaManagerTest extends BaseIgniteAbstractTest {
doNothing().when(catalogService).listen(eq(CatalogEvent.TABLE_ALTER),
tableAlteredListener.capture());
doNothing().when(catalogService).listen(eq(CatalogEvent.TABLE_DESTROY),
tableDestroyedListener.capture());
- schemaManager = new SchemaManager(registry, catalogService,
metaStorageManager);
+ schemaManager = new SchemaManager(registry, catalogService);
schemaManager.start();
assertThat("Watches were not deployed",
metaStorageManager.deployWatches(), willCompleteSuccessfully());
@@ -281,7 +281,7 @@ class SchemaManagerTest extends BaseIgniteAbstractTest {
when(catalogService.tables(anyInt())).thenReturn(List.of(tableDescriptorAfterColumnAddition()));
doReturn(CompletableFuture.completedFuture(CAUSALITY_TOKEN_2)).when(metaStorageManager).recoveryFinishedFuture();
- schemaManager = new SchemaManager(registry, catalogService,
metaStorageManager);
+ schemaManager = new SchemaManager(registry, catalogService);
schemaManager.start();
completeCausalityToken(CAUSALITY_TOKEN_2);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 6be1f738c4..6c25e6a32f 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -683,7 +683,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
assertTrue(waitForCondition(
() -> nodes.stream().allMatch(n ->
n.tableManager
- .latestTables()
+ .startedTables()
.get(getTableId(node, TABLE_NAME))
.internalTable()
.tableRaftService()
@@ -739,7 +739,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
assertTrue(waitForCondition(
() -> nodes.stream().allMatch(n ->
n.tableManager
- .latestTables()
+ .startedTables()
.get(getTableId(node, TABLE_NAME))
.internalTable()
.tableRaftService()
@@ -824,7 +824,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
try {
return nodes.stream().allMatch(n ->
n.tableManager
- .latestTables()
+ .startedTables()
.get(getTableId(n, tableName))
.internalTable()
.tableRaftService()
@@ -1153,7 +1153,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
partitionIdleSafeTimePropagationPeriodMsSupplier
);
- schemaManager = new SchemaManager(registry, catalogManager,
metaStorageManager);
+ schemaManager = new SchemaManager(registry, catalogManager);
schemaSyncService = new
SchemaSyncServiceImpl(metaStorageManager.clusterTime(),
delayDurationMsSupplier);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 3df5f576f1..5b1820fc91 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.table.distributed;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.CompletableFuture.allOf;
@@ -69,7 +68,6 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -261,38 +259,38 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
private final TransactionStateResolver transactionStateResolver;
/**
- * Versioned store for tables by id. Only table instances are created
here, local storages and RAFT groups may not be initialized yet.
+ * Versioned value for linearizing table changing events.
*
- * @see #localPartsByTableIdVv
+ * @see #localPartitionsVv
* @see #assignmentsUpdatedVv
*/
- private final IncrementalVersionedValue<Map<Integer, TableImpl>>
tablesByIdVv;
+ private final IncrementalVersionedValue<Void> tablesVv;
/**
- * Versioned store for local partition set by table id.
+ * Versioned value for linearizing table partitions changing events.
*
- * <p>Completed strictly after {@link #tablesByIdVv} and strictly before
{@link #assignmentsUpdatedVv}.
+ * <p>Completed strictly after {@link #tablesVv} and strictly before
{@link #assignmentsUpdatedVv}.
*/
- private final IncrementalVersionedValue<Map<Integer, PartitionSet>>
localPartsByTableIdVv;
+ private final IncrementalVersionedValue<Void> localPartitionsVv;
/**
- * Versioned store for tracking RAFT groups initialization and starting
completion.
+ * Versioned value for tracking RAFT groups initialization and starting
completion.
*
* <p>Only explicitly updated in {@link
#startLocalPartitionsAndClients(CompletableFuture, TableImpl, int)}.
*
- * <p>Completed strictly after {@link #localPartsByTableIdVv}.
+ * <p>Completed strictly after {@link #localPartitionsVv}.
*/
private final IncrementalVersionedValue<Void> assignmentsUpdatedVv;
- /**
- * {@link TableImpl} is created during update of tablesByIdVv, we store
reference to it in case of updating of tablesByIdVv fails, so we
- * can stop resources associated with the table or to clean up table
resources on {@code TableManager#stop()}.
- */
- private final Map<Integer, TableImpl> pendingTables = new
ConcurrentHashMap<>();
+ /** Registered tables. */
+ private final Map<Integer, TableImpl> tables = new ConcurrentHashMap<>();
/** Started tables. */
private final Map<Integer, TableImpl> startedTables = new
ConcurrentHashMap<>();
+ /** Local partitions. */
+ private final Map<Integer, PartitionSet> localPartsByTableId = new
ConcurrentHashMap<>();
+
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -488,11 +486,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
schemaVersions = new
SchemaVersionsImpl(executorInclinedSchemaSyncService, catalogService, clock);
- tablesByIdVv = new IncrementalVersionedValue<>(registry, HashMap::new);
+ tablesVv = new IncrementalVersionedValue<>(registry);
- localPartsByTableIdVv = new
IncrementalVersionedValue<>(dependingOn(tablesByIdVv), HashMap::new);
+ localPartitionsVv = new
IncrementalVersionedValue<>(dependingOn(tablesVv));
- assignmentsUpdatedVv = new
IncrementalVersionedValue<>(dependingOn(localPartsByTableIdVv));
+ assignmentsUpdatedVv = new
IncrementalVersionedValue<>(dependingOn(localPartitionsVv));
txStateStorageScheduledPool =
Executors.newSingleThreadScheduledExecutor(
NamedThreadFactory.create(nodeName,
"tx-state-storage-scheduled-pool", LOG));
@@ -528,7 +526,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
metaStorageMgr,
messagingService,
topologyService,
- tableId -> latestTablesById().get(tableId)
+ tableId -> tablesById().get(tableId)
);
startVv = new IncrementalVersionedValue<>(registry);
@@ -726,19 +724,19 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
private CompletableFuture<?> onTableRename(RenameTableEventParameters
parameters) {
- return inBusyLockAsync(busyLock, () -> tablesByIdVv.update(
+ return inBusyLockAsync(busyLock, () -> tablesVv.update(
parameters.causalityToken(),
- (tablesById, e) -> {
+ (ignore, e) -> {
if (e != null) {
return failedFuture(e);
}
- TableImpl table = tablesById.get(parameters.tableId());
+ TableImpl table = tables.get(parameters.tableId());
// TODO: revisit this approach, see
https://issues.apache.org/jira/browse/IGNITE-21235.
table.name(parameters.newTableName());
- return completedFuture(tablesById);
+ return nullCompletedFuture();
})
);
}
@@ -1074,12 +1072,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
lowWatermark.removeUpdateListener(mvGc);
- var tablesToStop = new HashMap<Integer, TableImpl>();
-
- tablesToStop.putAll(latestTablesById());
- tablesToStop.putAll(pendingTables);
-
- cleanUpTablesResources(tablesToStop);
+ cleanUpTablesResources(tables);
}
@Override
@@ -1249,26 +1242,17 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
var table = new TableImpl(internalTable, lockMgr, schemaVersions,
marshallers, sql.get(), tableDescriptor.primaryKeyIndexId());
- tablesByIdVv.update(causalityToken, (previous, e) ->
inBusyLock(busyLock, () -> {
+ tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, ()
-> {
if (e != null) {
return failedFuture(e);
}
- return schemaManager.schemaRegistry(causalityToken, tableId)
- .thenApply(schema -> {
- table.schemaView(schema);
-
- var val = new HashMap<>(previous);
-
- val.put(tableId, table);
-
- return val;
- });
+ return schemaManager.schemaRegistry(causalityToken,
tableId).thenAccept(table::schemaView);
}));
// NB: all vv.update() calls must be made from the synchronous part of
the method (not in thenCompose()/etc!).
- CompletableFuture<?> localPartsUpdateFuture =
localPartsByTableIdVv.update(causalityToken,
- (previous, throwable) -> inBusyLock(busyLock, () ->
assignmentsFuture.thenComposeAsync(newAssignments -> {
+ CompletableFuture<?> localPartsUpdateFuture =
localPartitionsVv.update(causalityToken,
+ (ignore, throwable) -> inBusyLock(busyLock, () ->
assignmentsFuture.thenComposeAsync(newAssignments -> {
PartitionSet parts = new BitSetPartitionSet();
// TODO:
https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set
partitions only for
@@ -1277,39 +1261,28 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
parts.set(i);
}
- return getOrCreatePartitionStorages(table,
parts).thenApply(u -> {
- var newValue = new HashMap<>(previous);
-
- newValue.put(tableId, parts);
-
- return newValue;
- });
+ return getOrCreatePartitionStorages(table,
parts).thenAccept(u -> localPartsByTableId.put(tableId, parts));
}, ioExecutor)));
- // We bring the future outside to avoid OutdatedTokenException.
- CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture =
tablesByIdVv.get(causalityToken);
+ CompletableFuture<?> tablesByIdFuture = tablesVv.get(causalityToken);
// TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions
should be started only on the assignments change
- // TODO event triggered by zone create or alter.
+ // event triggered by zone create or alter.
CompletableFuture<?> createPartsFut =
assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
if (e != null) {
return failedFuture(e);
}
- return localPartsUpdateFuture.thenCompose(unused ->
- tablesByIdFuture.thenComposeAsync(tablesById -> inBusyLock(
+ startedTables.put(tableId, table);
+
+ return allOf(localPartsUpdateFuture, tablesByIdFuture)
+ .thenComposeAsync(ignore -> inBusyLock(
busyLock,
() ->
startLocalPartitionsAndClients(assignmentsFuture, table, zoneDescriptor.id())
- ), ioExecutor)
- );
+ ), ioExecutor);
});
- pendingTables.put(tableId, table);
- startedTables.put(tableId, table);
-
- tablesById(causalityToken).thenAccept(ignored -> inBusyLock(busyLock,
() -> {
- pendingTables.remove(tableId);
- }));
+ tables.put(tableId, table);
// TODO should be reworked in IGNITE-16763
@@ -1381,27 +1354,25 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
*/
private void dropTableLocally(long causalityToken,
DestroyTableEventParameters parameters) {
int tableId = parameters.tableId();
+ // TODO Drop partitions from parameters and use from storage.
int partitions = parameters.partitions();
- localPartsByTableIdVv.update(causalityToken, (previousVal, e) ->
inBusyLock(busyLock, () -> {
+ localPartitionsVv.update(causalityToken, (previousVal, e) ->
inBusyLock(busyLock, () -> {
if (e != null) {
return failedFuture(e);
}
- var newMap = new HashMap<>(previousVal);
- newMap.remove(tableId);
+ localPartsByTableId.remove(tableId);
- return completedFuture(newMap);
+ return nullCompletedFuture();
}));
- tablesByIdVv.update(causalityToken, (previousVal, e) ->
inBusyLock(busyLock, () -> {
+ tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, ()
-> {
if (e != null) {
return failedFuture(e);
}
- var map = new HashMap<>(previousVal);
-
- TableImpl table = map.remove(tableId);
+ TableImpl table = tables.get(tableId);
assert table != null : tableId;
@@ -1410,7 +1381,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
CompletableFuture<?>[] stopReplicaFutures = new
CompletableFuture<?>[partitions];
// TODO https://issues.apache.org/jira/browse/IGNITE-19170
Partitions should be stopped on the assignments change
- // TODO event triggered by zone drop or alter.
+ // event triggered by zone drop or alter. Stop replica
asynchronously, out of metastorage event pipeline.
for (int partitionId = 0; partitionId < partitions; partitionId++)
{
var replicationGroupId = new TablePartitionId(tableId,
partitionId);
@@ -1425,7 +1396,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
runAsync(() ->
internalTable.txStateStorage().destroy(), ioExecutor)
),
ioExecutor
- ).thenApply(v -> map);
+ ).thenAccept(ignore0 -> tables.remove(tableId));
}));
startedTables.remove(tableId);
@@ -1478,37 +1449,22 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @see #assignmentsUpdatedVv
*/
private CompletableFuture<Map<Integer, TableImpl>> tablesById(long
causalityToken) {
- // We bring the future outside to avoid OutdatedTokenException.
- CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture =
tablesByIdVv.get(causalityToken);
-
- return assignmentsUpdatedVv.get(causalityToken).thenCompose(v ->
tablesByIdFuture);
+ return assignmentsUpdatedVv.get(causalityToken).thenApply(v ->
unmodifiableMap(startedTables));
}
/**
- * Returns the latest tables by ID map, for which all assignment updates
have been completed.
+ * Returns an internal map, which contains all managed tables by their ID.
*/
- private Map<Integer, TableImpl> latestTablesById() {
- // TODO https://issues.apache.org/jira/browse/IGNITE-20915 fix this.
- if (assignmentsUpdatedVv.latestCausalityToken() < 0L) {
- // No tables at all in case of empty causality token.
- return emptyMap();
- } else {
- CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture =
tablesByIdVv.get(tablesByIdVv.latestCausalityToken());
-
- assert tablesByIdFuture.isDone() : "'tablesByIdVv' is always
completed strictly before the 'assignmentsUpdatedVv'";
-
- return tablesByIdFuture.join();
- }
+ private Map<Integer, TableImpl> tablesById() {
+ return unmodifiableMap(tables);
}
/**
- * Actual tables map.
- *
- * @return Actual tables map.
+ * Returns a map with started tables.
*/
@TestOnly
- public Map<Integer, TableImpl> latestTables() {
- return unmodifiableMap(latestTablesById());
+ public Map<Integer, TableImpl> startedTables() {
+ return unmodifiableMap(startedTables);
}
@Override
@@ -1576,7 +1532,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
try {
- return
localPartsByTableIdVv.get(causalityToken).thenApply(partitionSetById ->
partitionSetById.get(tableId));
+ return localPartitionsVv.get(causalityToken).thenApply(unused ->
localPartsByTableId.get(tableId));
} finally {
busyLock.leaveBusy();
}
@@ -1617,7 +1573,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int
tableId) {
- TableImpl tableImpl = latestTablesById().get(tableId);
+ TableImpl tableImpl = startedTables.get(tableId);
if (tableImpl != null) {
return completedFuture(tableImpl);
@@ -1627,17 +1583,13 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
CompletionListener<Void> tablesListener = (token, v, th) -> {
if (th == null) {
- CompletableFuture<Map<Integer, TableImpl>> tablesFuture =
tablesByIdVv.get(token);
+ CompletableFuture<?> tablesFuture = tablesVv.get(token);
tablesFuture.whenComplete((tables, e) -> {
if (e != null) {
getLatestTableFuture.completeExceptionally(e);
} else {
- TableImpl table = tables.get(tableId);
-
- if (table != null) {
- getLatestTableFuture.complete(table);
- }
+
getLatestTableFuture.complete(startedTables.get(tableId));
}
});
} else {
@@ -1648,8 +1600,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
assignmentsUpdatedVv.whenComplete(tablesListener);
// This check is needed for the case when we have registered
tablesListener,
- // but tablesByIdVv has already been completed, so listener would be
triggered only for the next versioned value update.
- tableImpl = latestTablesById().get(tableId);
+ // but tablesVv has already been completed, so listener would be
triggered only for the next versioned value update.
+ tableImpl = startedTables.get(tableId);
if (tableImpl != null) {
assignmentsUpdatedVv.removeWhenComplete(tablesListener);
@@ -1737,8 +1689,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
Assignments pendingAssignments =
Assignments.fromBytes(pendingAssignmentsEntry.value());
- return tablesByIdVv.get(revision)
- .thenApply(tables -> {
+ return tablesVv.get(revision)
+ .thenApply(ignore -> {
if (!busyLock.enterBusy()) {
return CompletableFuture.<Void>failedFuture(new
NodeStoppingException());
}
@@ -1809,20 +1761,12 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
int zoneId = getTableDescriptor(tableId,
catalogService.latestCatalogVersion()).zoneId();
if (shouldStartLocalGroupNode) {
- localServicesStartFuture = localPartsByTableIdVv.get(revision)
- .thenComposeAsync(oldMap -> {
- // TODO
https://issues.apache.org/jira/browse/IGNITE-20957 This is incorrect usage of
the value stored in
- // TODO versioned value. See ticket for the details.
- PartitionSet partitionSet = oldMap.get(tableId).copy();
-
- return getOrCreatePartitionStorages(tbl,
partitionSet).thenApply(u -> {
- var newMap = new HashMap<>(oldMap);
-
- newMap.put(tableId, partitionSet);
-
- return newMap;
- });
- }, ioExecutor)
+ localServicesStartFuture = localPartitionsVv.get(revision)
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20957
Revisit this code
+ .thenApply(unused ->
localPartsByTableId.get(tableId).copy())
+ .thenComposeAsync(partitionSet -> inBusyLock(busyLock,
+ () -> getOrCreatePartitionStorages(tbl,
partitionSet)
+ ), ioExecutor)
.thenComposeAsync(unused -> inBusyLock(busyLock, () ->
startPartitionAndStartClient(
tbl,
replicaGrpId.partitionId(),
@@ -2247,8 +2191,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
private CompletableFuture<Void> stopAndDestroyPartition(TablePartitionId
tablePartitionId, long causalityToken) {
- return tablesByIdVv.get(causalityToken)
- .thenCompose(tables -> {
+ return tablesVv.get(causalityToken)
+ .thenCompose(ignore -> {
TableImpl table = tables.get(tablePartitionId.tableId());
return stopPartition(tablePartitionId, table)
@@ -2358,7 +2302,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
*/
@Override
public @Nullable TableViewInternal cachedTable(int tableId) {
- return startedTables.get(tableId);
+ return tables.get(tableId);
}
/**
@@ -2368,7 +2312,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
*/
@TestOnly
public @Nullable TableViewInternal cachedTable(String name) {
- return findTableImplByName(startedTables.values(), name);
+ return findTableImplByName(tables.values(), name);
}
private CatalogTableDescriptor getTableDescriptor(int tableId, int
catalogVersion) {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index f03fa9043c..7b041ea1a6 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -762,7 +762,7 @@ public class TableManagerTest extends IgniteAbstractTest {
dsm = createDataStorageManager(configRegistry, workDir,
storageEngineConfig),
workDir,
msm,
- sm = new SchemaManager(revisionUpdater, catalogManager, msm),
+ sm = new SchemaManager(revisionUpdater, catalogManager),
budgetView -> new LocalLogStorageFactory(),
partitionOperationsExecutor,
partitionOperationsExecutor,