This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 06a65d52bcf IGNITE-28327 Improve TableRegistry api (#7946)
06a65d52bcf is described below
commit 06a65d52bcf6efe90970a73f3afbb52cf35b9826
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Thu Apr 9 00:00:37 2026 +0300
IGNITE-28327 Improve TableRegistry api (#7946)
---
.../internal/table/distributed/TableManager.java | 39 ++--
.../internal/table/distributed/TableRegistry.java | 77 +++++--
.../table/distributed/TableZoneCoordinator.java | 25 +--
.../table/distributed/TableRegistryTest.java | 226 +++++++++++++++++++++
4 files changed, 316 insertions(+), 51 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 81bf849dab2..21002594b3d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.table.distributed;
-import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.anyOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -464,11 +463,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
schemaRegistry
);
- tableRegistry.tables().put(tableId, table);
+ tableRegistry.register(tableId, table);
zoneCoordinator.addTableToZone(zoneDescriptor.id(),
table);
- tableRegistry.startedTables().put(tableId, table);
+ tableRegistry.markStarted(tableId);
}));
});
}
@@ -499,7 +498,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return schemaManager.schemaRegistry(causalityToken,
tableId).thenAccept(schemaRegistry -> {
TableImpl table = createTableImpl(causalityToken,
tableDescriptor, zoneDescriptor, schemaDescriptor, schemaRegistry);
- tableRegistry.tables().put(tableId, table);
+ tableRegistry.register(tableId, table);
});
}));
@@ -524,7 +523,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private void onTableDrop(DropTableEventParameters parameters) {
inBusyLock(busyLock, () -> {
-
unregisterMetricsSource(tableRegistry.startedTables().get(parameters.tableId()));
+
unregisterMetricsSource(tableRegistry.startedTable(parameters.tableId()));
destructionEventsQueue.enqueue(new
DestroyTableEvent(parameters.catalogVersion(), parameters.tableId()));
});
@@ -569,7 +568,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return failedFuture(e);
}
- TableViewInternal table =
tableRegistry.tables().get(parameters.tableId());
+ TableViewInternal table =
tableRegistry.table(parameters.tableId());
table.updateStalenessConfiguration(parameters.staleRowsFraction(),
parameters.minStaleRowsCount());
@@ -586,7 +585,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return failedFuture(e);
}
- TableViewInternal table =
tableRegistry.tables().get(parameters.tableId());
+ TableViewInternal table =
tableRegistry.table(parameters.tableId());
// TODO: revisit this approach, see
https://issues.apache.org/jira/browse/IGNITE-21235.
((TableImpl) table).name(parameters.newTableName());
@@ -633,7 +632,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
try {
closeAllManually(
zoneCoordinator::stop,
- () ->
closeAllManually(tableRegistry.tables().values().stream().map(table -> () ->
closeTable(table))),
+ () ->
closeAllManually(tableRegistry.allRegisteredTables().values().stream().map(table
-> () -> closeTable(table))),
() -> shutdownAndAwaitTermination(scanRequestExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
() ->
streamerFlushExecutorFactory.stop(shutdownTimeoutSeconds)
);
@@ -688,9 +687,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
* @param tableId Table id to destroy.
*/
private CompletableFuture<Void> destroyTableLocally(int tableId) {
- TableViewInternal table =
tableRegistry.startedTables().remove(tableId);
-
- tableRegistry.localPartsByTableId().remove(tableId);
+ TableViewInternal table = tableRegistry.removeStarted(tableId);
assert table != null : tableId;
@@ -699,7 +696,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return zoneCoordinator.stopAndDestroyTableProcessors(table)
.thenComposeAsync(unused -> inBusyLockAsync(busyLock, () ->
internalTable.storage().destroy()), ioExecutor)
.thenAccept(unused -> inBusyLock(busyLock, () -> {
- tableRegistry.tables().remove(tableId);
+ tableRegistry.unregister(tableId);
schemaManager.dropRegistry(tableId);
}))
.whenComplete((v, e) -> {
@@ -751,14 +748,14 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @see #assignmentsUpdatedVv
*/
private CompletableFuture<Map<Integer, TableViewInternal>> tablesById(long
causalityToken) {
- return assignmentsUpdatedVv.get(causalityToken).thenApply(v ->
unmodifiableMap(tableRegistry.startedTables()));
+ return assignmentsUpdatedVv.get(causalityToken).thenApply(v ->
tableRegistry.allStartedTables());
}
/**
* Returns an internal map, which contains all managed tables by their ID.
*/
private Map<Integer, TableViewInternal> tablesById() {
- return unmodifiableMap(tableRegistry.tables());
+ return tableRegistry.allRegisteredTables();
}
/**
@@ -766,7 +763,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
*/
@TestOnly
public Map<Integer, TableViewInternal> startedTables() {
- return unmodifiableMap(tableRegistry.startedTables());
+ return tableRegistry.allStartedTables();
}
@Override
@@ -828,7 +825,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
try {
return localPartitionsVv.get(causalityToken)
- .thenApply(unused ->
tableRegistry.localPartsByTableId().getOrDefault(tableId,
PartitionSet.EMPTY_SET));
+ .thenApply(unused ->
tableRegistry.localPartitions(tableId));
} finally {
busyLock.leaveBusy();
}
@@ -870,7 +867,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int
tableId) {
- TableViewInternal tableImpl =
tableRegistry.startedTables().get(tableId);
+ TableViewInternal tableImpl = tableRegistry.startedTable(tableId);
if (tableImpl != null) {
return completedFuture(tableImpl);
@@ -886,7 +883,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
if (e != null) {
getLatestTableFuture.completeExceptionally(e);
} else {
-
getLatestTableFuture.complete(tableRegistry.startedTables().get(tableId));
+
getLatestTableFuture.complete(tableRegistry.startedTable(tableId));
}
});
} else {
@@ -900,7 +897,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
// This check is needed for the case when we have registered
tablesListener,
// but tablesVv has already been completed, so listener would be
triggered only for the next versioned value update.
- tableImpl = tableRegistry.startedTables().get(tableId);
+ tableImpl = tableRegistry.startedTable(tableId);
if (tableImpl != null) {
assignmentsUpdatedVv.removeWhenComplete(tablesListener);
@@ -923,7 +920,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
*/
@Override
public @Nullable TableViewInternal cachedTable(int tableId) {
- return tableRegistry.tables().get(tableId);
+ return tableRegistry.table(tableId);
}
/**
@@ -933,7 +930,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
*/
@TestOnly
public @Nullable TableViewInternal cachedTable(String name) {
- return findTableImplByName(tableRegistry.tables().values(), name);
+ return
findTableImplByName(tableRegistry.allRegisteredTables().values(), name);
}
private CatalogZoneDescriptor getZoneDescriptor(CatalogTableDescriptor
tableDescriptor, int catalogVersion) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java
index 5194a951678..f6a23aceeb3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java
@@ -17,18 +17,18 @@
package org.apache.ignite.internal.table.distributed;
+import static java.util.Collections.unmodifiableMap;
+
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.table.TableViewInternal;
+import org.jetbrains.annotations.Nullable;
/**
- * Holds shared mutable state for table tracking, shared between {@link
TableManager} and {@link TableZoneCoordinator}.
+ * Tracks table lifecycle state shared between {@link TableManager} and {@link
TableZoneCoordinator}.
*
- * <ul>
- * <li>{@link #tables} — all registered tables by ID.</li>
- * <li>{@link #startedTables} — tables that are fully started (partition
resources prepared).</li>
- * <li>{@link #localPartsByTableId} — local partition sets per table.</li>
- * </ul>
+ * <p>A table progresses through: registered → started → (removed from
started) → unregistered.
*/
class TableRegistry {
/** All registered tables by ID. */
@@ -40,15 +40,68 @@ class TableRegistry {
/** Local partitions by table ID. */
private final Map<Integer, PartitionSet> localPartsByTableId = new
ConcurrentHashMap<>();
- Map<Integer, TableViewInternal> tables() {
- return tables;
+ /** Registers a newly created table. Does not mark it as started. */
+ void register(int tableId, TableViewInternal table) {
+ tables.put(tableId, table);
+ }
+
+ /** Promotes an already-registered table to the started state. */
+ void markStarted(int tableId) {
+ TableViewInternal table = tables.get(tableId);
+
+ assert table != null : "Table must be registered before marking as
started: tableId=" + tableId;
+
+ startedTables.put(tableId, table);
+ }
+
+ /** Returns a registered table by ID, or null if not found. */
+ @Nullable TableViewInternal table(int tableId) {
+ return tables.get(tableId);
+ }
+
+ /** Returns a started table by ID, or null if not started. */
+ @Nullable TableViewInternal startedTable(int tableId) {
+ return startedTables.get(tableId);
+ }
+
+ /** Returns an unmodifiable view of all registered tables. */
+ Map<Integer, TableViewInternal> allRegisteredTables() {
+ return unmodifiableMap(tables);
+ }
+
+ /** Returns an unmodifiable view of all started tables. */
+ Map<Integer, TableViewInternal> allStartedTables() {
+ return unmodifiableMap(startedTables);
+ }
+
+ /** Removes the table from started tables and clears its local partitions.
Returns the removed table, or null if it was not started. */
+ @Nullable TableViewInternal removeStarted(int tableId) {
+ TableViewInternal removed = startedTables.remove(tableId);
+ localPartsByTableId.remove(tableId);
+ return removed;
+ }
+
+ /** Removes the table from the registry entirely. */
+ void unregister(int tableId) {
+ tables.remove(tableId);
+ }
+
+ /** Sets the local partition set for a table. */
+ void setLocalPartitions(int tableId, PartitionSet partitions) {
+ localPartsByTableId.put(tableId, partitions);
}
- Map<Integer, TableViewInternal> startedTables() {
- return startedTables;
+ /** Atomically extends local partitions by adding a partition index.
Creates a new set if absent. */
+ void extendLocalPartitions(int tableId, int partitionIndex) {
+ localPartsByTableId.compute(tableId, (id, old) -> {
+ PartitionSet set = Objects.requireNonNullElseGet(old,
BitSetPartitionSet::new);
+ set.set(partitionIndex);
+ return set;
+ });
}
- Map<Integer, PartitionSet> localPartsByTableId() {
- return localPartsByTableId;
+ /** Returns local partitions for a table, or {@link
PartitionSet#EMPTY_SET} if absent. */
+ PartitionSet localPartitions(int tableId) {
+ return localPartsByTableId.getOrDefault(tableId,
PartitionSet.EMPTY_SET);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
index 9a2822d9be6..44a4986c38e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java
@@ -316,10 +316,10 @@ class TableZoneCoordinator {
}
}
- var table = (TableImpl)
tableRegistry.tables().get(tableId);
+ var table = (TableImpl) tableRegistry.table(tableId);
return createPartitionStoragesIfAbsent(table, parts)
- .thenRun(() ->
tableRegistry.localPartsByTableId().put(tableId, parts));
+ .thenRun(() ->
tableRegistry.setLocalPartitions(tableId, parts));
}, ioExecutor))
// If the table is already closed, it's not a problem
(probably the node is stopping).
.exceptionally(ignoreTableClosedException())
@@ -331,7 +331,7 @@ class TableZoneCoordinator {
}
return localPartsUpdateFuture.thenRunAsync(() ->
inBusyLock(busyLock, () -> {
- var table = (TableImpl) tableRegistry.tables().get(tableId);
+ var table = (TableImpl) tableRegistry.table(tableId);
for (int i = 0; i < zoneDescriptor.partitions(); i++) {
var zonePartitionId = new
ZonePartitionId(zoneDescriptor.id(), i);
@@ -350,11 +350,9 @@ class TableZoneCoordinator {
// TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible
performance degradation.
return createPartsFut.thenAccept(ignore -> {
- var table = (TableImpl) tableRegistry.tables().get(tableId);
+ tableRegistry.markStarted(tableId);
- tableRegistry.startedTables().put(tableId, table);
-
- addTableToZone(zoneDescriptor.id(), table);
+ addTableToZone(zoneDescriptor.id(), (TableImpl)
tableRegistry.startedTable(tableId));
});
}
@@ -458,10 +456,7 @@ class TableZoneCoordinator {
CompletableFuture<?>[] futures = zoneTables.stream()
.map(tbl -> inBusyLockAsync(busyLock, () -> {
return runAsync(() -> inBusyLock(busyLock, ()
-> {
-
tableRegistry.localPartsByTableId().compute(
- tbl.tableId(),
- (tableId, oldPartitionSet) ->
extendPartitionSet(oldPartitionSet, partitionIndex)
- );
+
tableRegistry.extendLocalPartitions(tbl.tableId(), partitionIndex);
lowWatermark.getLowWatermarkSafe(lwm ->
registerIndexesToTable(
@@ -667,7 +662,7 @@ class TableZoneCoordinator {
return tokenFuture
.thenCompose(ignore -> {
- TableViewInternal table =
tableRegistry.tables().get(tablePartitionId.tableId());
+ TableViewInternal table =
tableRegistry.table(tablePartitionId.tableId());
assert table != null : tablePartitionId;
return stopAndDestroyTablePartition(tablePartitionId,
table);
@@ -793,12 +788,6 @@ class TableZoneCoordinator {
return tablesPerZone.getOrDefault(zoneId, Set.of());
}
- private static PartitionSet extendPartitionSet(@Nullable PartitionSet
oldPartitionSet, int partitionId) {
- PartitionSet newPartitionSet =
Objects.requireNonNullElseGet(oldPartitionSet, BitSetPartitionSet::new);
- newPartitionSet.set(partitionId);
- return newPartitionSet;
- }
-
private static <T> Function<Throwable, T> ignoreTableClosedException() {
return ex -> {
if (hasCause(ex, TableClosedException.class)) {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableRegistryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableRegistryTest.java
new file mode 100644
index 00000000000..fdf87f122d5
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableRegistryTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link TableRegistry}. */
+class TableRegistryTest extends BaseIgniteAbstractTest {
+
+ private final TableRegistry registry = new TableRegistry();
+
+ @Test
+ void registerAddsToTablesOnly() {
+ TableViewInternal table = mock(TableViewInternal.class);
+
+ registry.register(1, table);
+
+ assertSame(table, registry.table(1));
+ assertNull(registry.startedTable(1));
+ }
+
+ @Test
+ void markStartedPromotesRegisteredTable() {
+ TableViewInternal table = mock(TableViewInternal.class);
+
+ registry.register(1, table);
+ assertNull(registry.startedTable(1));
+
+ registry.markStarted(1);
+ assertSame(table, registry.startedTable(1));
+ }
+
+ @Test
+ void tableReturnsNullForUnknownId() {
+ assertNull(registry.table(42));
+ }
+
+ @Test
+ void startedTableReturnsNullForNonStarted() {
+ registry.register(1, mock(TableViewInternal.class));
+
+ assertNull(registry.startedTable(1));
+ }
+
+ @Test
+ void allTablesReturnsUnmodifiableMap() {
+ registry.register(1, mock(TableViewInternal.class));
+
+ assertThrows(UnsupportedOperationException.class, () ->
+ registry.allRegisteredTables().put(2,
mock(TableViewInternal.class)));
+ }
+
+ @Test
+ void allStartedTablesReturnsUnmodifiableMap() {
+ registry.register(1, mock(TableViewInternal.class));
+ registry.markStarted(1);
+
+ assertThrows(UnsupportedOperationException.class, () ->
+ registry.allStartedTables().put(2,
mock(TableViewInternal.class)));
+ }
+
+ @Test
+ void allTablesReflectsRegistrations() {
+ assertTrue(registry.allRegisteredTables().isEmpty());
+
+ registry.register(1, mock(TableViewInternal.class));
+ registry.register(2, mock(TableViewInternal.class));
+
+ assertEquals(2, registry.allRegisteredTables().size());
+ }
+
+ @Test
+ void allStartedTablesReflectsState() {
+ assertTrue(registry.allStartedTables().isEmpty());
+
+ registry.register(1, mock(TableViewInternal.class));
+ registry.markStarted(1);
+
+ assertEquals(1, registry.allStartedTables().size());
+ }
+
+ @Test
+ void removeStartedReturnsTableAndClearsLocalPartitions() {
+ TableViewInternal table = mock(TableViewInternal.class);
+
+ registry.register(1, table);
+ registry.markStarted(1);
+ registry.setLocalPartitions(1, new BitSetPartitionSet());
+
+ TableViewInternal removed = registry.removeStarted(1);
+
+ assertSame(table, removed);
+ assertNull(registry.startedTable(1));
+ assertSame(PartitionSet.EMPTY_SET, registry.localPartitions(1));
+ // Table should still be in the tables map.
+ assertSame(table, registry.table(1));
+ }
+
+ @Test
+ void removeStartedOnUnknownIdReturnsNull() {
+ assertNull(registry.removeStarted(42));
+ }
+
+ @Test
+ void unregisterRemovesFromTablesMap() {
+ registry.register(1, mock(TableViewInternal.class));
+
+ registry.unregister(1);
+
+ assertNull(registry.table(1));
+ }
+
+ @Test
+ void setAndGetLocalPartitions() {
+ PartitionSet parts = new BitSetPartitionSet();
+ parts.set(0);
+ parts.set(3);
+
+ registry.setLocalPartitions(1, parts);
+
+ assertSame(parts, registry.localPartitions(1));
+ }
+
+ @Test
+ void extendLocalPartitionsOnAbsentEntry() {
+ registry.extendLocalPartitions(1, 5);
+
+ PartitionSet parts = registry.localPartitions(1);
+ assertNotNull(parts);
+ assertTrue(parts.get(5));
+ assertEquals(1, parts.size());
+ }
+
+ @Test
+ void extendLocalPartitionsOnExistingEntry() {
+ registry.extendLocalPartitions(1, 2);
+ registry.extendLocalPartitions(1, 7);
+
+ PartitionSet parts = registry.localPartitions(1);
+ assertTrue(parts.get(2));
+ assertTrue(parts.get(7));
+ assertEquals(2, parts.size());
+ }
+
+ @Test
+ void localPartitionsReturnsEmptySetIfAbsent() {
+ assertSame(PartitionSet.EMPTY_SET, registry.localPartitions(99));
+ }
+
+ @Test
+ void markStartedOnUnregisteredTableFails() {
+ assertThrows(AssertionError.class, () -> registry.markStarted(42));
+ }
+
+ @Test
+ void fullLifecycle() {
+ TableViewInternal table = mock(TableViewInternal.class);
+
+ // Register.
+ registry.register(1, table);
+ assertSame(table, registry.table(1));
+ assertNull(registry.startedTable(1));
+
+ // Mark started.
+ registry.markStarted(1);
+ assertSame(table, registry.startedTable(1));
+
+ // Add partitions.
+ registry.extendLocalPartitions(1, 0);
+ assertTrue(registry.localPartitions(1).get(0));
+
+ // Remove started (deactivate).
+ TableViewInternal removed = registry.removeStarted(1);
+ assertSame(table, removed);
+ assertNull(registry.startedTable(1));
+ assertSame(PartitionSet.EMPTY_SET, registry.localPartitions(1));
+ assertSame(table, registry.table(1)); // still registered
+
+ // Unregister (final cleanup).
+ registry.unregister(1);
+ assertNull(registry.table(1));
+ }
+
+ @Test
+ void recoveryLifecycle() {
+ TableViewInternal table = mock(TableViewInternal.class);
+
+ // Recovery registers and starts.
+ registry.register(1, table);
+ registry.markStarted(1);
+ assertSame(table, registry.table(1));
+ assertSame(table, registry.startedTable(1));
+
+ // Destroy.
+ registry.removeStarted(1);
+ registry.unregister(1);
+
+ assertNull(registry.table(1));
+ assertNull(registry.startedTable(1));
+ }
+}