sashapolo commented on code in PR #3446:
URL: https://github.com/apache/ignite-3/pull/3446#discussion_r1532261469
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -482,4 +484,25 @@ public void addStore(GroupPartitionId groupPartitionId,
FilePageStore filePageSt
return filePageStore;
});
}
+
+ /**
+ * Destroys the group directory with all sub-directories and files if it
exists.
+ *
+ * @throws IOException If there was an I/O error when deleting a directory.
+ */
+ public void destroyGroupIfExists(int groupId) throws IOException {
+ Path groupDir = groupDir(groupId);
+
+ try {
+ if (exists(groupDir)) {
+ deleteIfExistsThrowable(groupDir);
+ }
+ } catch (IOException e) {
+ throw new IOException("Failed to delete group directory: " +
groupDir, e);
Review Comment:
Do we really need to catch this exception? Usually an IO Exception contains
the path you gave it
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2495,4 +2497,31 @@ public int tableId() {
return tableId;
}
}
+
+ private void cleanUpResourcesForDroppedTablesOnRecoveryBusy() {
+ // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones
from volt and metastore
Review Comment:
Do we need to change the link to IGNITE-20287 and close IGNITE-20384?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java:
##########
@@ -81,4 +87,44 @@ public static int
findStartBuildingIndexCatalogVersion(CatalogService catalogSer
BUILDING, indexId, fromCatalogVersionIncluded,
latestCatalogVersion
));
}
+
+ /**
+ * Collects a list of tables that were removed from the catalog and should
have been dropped due to a low watermark (if the catalog
+ * version in which the table was removed is less than or equal to the
active catalog version of low watermark).
+ *
+ * @param catalogService Catalog service.
+ * @param lowWatermark Low watermark, {@code null} if it has never been
updated.
+ * @return Result is sorted by the catalog version in which the table was
removed from the catalog and by the table ID.
+ */
+ // TODO: IGNITE-21771 Process or check catalog compaction
+ static List<DroppedTableInfo> droppedTables(CatalogService catalogService,
@Nullable HybridTimestamp lowWatermark) {
+ if (lowWatermark == null) {
+ return List.of();
+ }
+
+ int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+ int lwmCatalogVersion =
catalogService.activeCatalogVersion(lowWatermark.longValue());
+
+ Set<Integer> previousCatalogVersionTableIds = Set.of();
+
+ var res = new ArrayList<DroppedTableInfo>();
+
+ for (int catalogVersion = earliestCatalogVersion; catalogVersion <=
lwmCatalogVersion; catalogVersion++) {
Review Comment:
Why do we start from the earliestCatalogVersoin and not from
lwmCatalogVersion? Aren't we interested in the most recent table version?
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableUtilsTest.java:
##########
@@ -35,95 +37,163 @@
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.util.List;
-import java.util.function.Consumer;
import org.apache.ignite.internal.catalog.CatalogManager;
-import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/** For {@link TableUtils} testing. */
public class TableUtilsTest extends IgniteAbstractTest {
private final HybridClock clock = new HybridClockImpl();
- @Test
- void testIndexIdsAtRwTxBeginTs() throws Exception {
- withCatalogManager(catalogManager -> {
- createSimpleTable(catalogManager, TABLE_NAME);
-
- String indexName0 = INDEX_NAME + 0;
- String indexName1 = INDEX_NAME + 1;
- String indexName2 = INDEX_NAME + 2;
- String indexName3 = INDEX_NAME + 3;
- String indexName4 = INDEX_NAME + 4;
-
- for (String indexName : List.of(indexName0, indexName1,
indexName2, indexName3, indexName4)) {
- createSimpleHashIndex(catalogManager, TABLE_NAME, indexName);
- }
-
- int indexId0 = indexId(catalogManager, indexName0);
- int indexId1 = indexId(catalogManager, indexName1);
- int indexId2 = indexId(catalogManager, indexName2);
- int indexId3 = indexId(catalogManager, indexName3);
- int indexId4 = indexId(catalogManager, indexName4);
-
- for (String indexName : List.of(indexName1, indexName2,
indexName3, indexName4)) {
- startBuildingIndex(catalogManager, indexId(catalogManager,
indexName));
- }
-
- for (String indexName : List.of(indexName2, indexName3,
indexName4)) {
- makeIndexAvailable(catalogManager, indexId(catalogManager,
indexName));
- }
-
- for (String indexName : List.of(indexName3, indexName4)) {
- dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, indexName);
- }
-
- removeIndex(catalogManager, indexId4);
-
- CatalogManager spy = spy(catalogManager);
-
- HybridTimestamp beginTs = clock.now();
-
- int tableId = getTableIdStrict(catalogManager, TABLE_NAME,
clock.nowLong());
-
- assertThat(
- indexIdsAtRwTxBeginTs(spy, transactionId(beginTs, 1),
tableId),
- contains(
- indexId(catalogManager, pkIndexName(TABLE_NAME)),
- indexId0,
- indexId1,
- indexId2,
- indexId3
- )
- );
-
- verify(spy).activeCatalogVersion(eq(beginTs.longValue()));
-
verify(spy).indexes(eq(catalogManager.activeCatalogVersion(beginTs.longValue())),
eq(tableId));
- });
+ private final CatalogManager catalogManager =
CatalogTestUtils.createTestCatalogManager("test-node", clock);
+
+ @BeforeEach
+ void setUp() {
+ assertThat(catalogManager.start(), willCompleteSuccessfully());
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ closeAll(
+ catalogManager::beforeNodeStop,
+ catalogManager::stop
+ );
}
- private void withCatalogManager(Consumer<CatalogManager> consumer) throws
Exception {
- CatalogManager catalogManager =
CatalogTestUtils.createTestCatalogManager("test-node", clock);
+ @Test
+ void testIndexIdsAtRwTxBeginTs() {
+ createSimpleTable(catalogManager, TABLE_NAME);
- assertThat(catalogManager.start(), willCompleteSuccessfully());
+ String indexName0 = INDEX_NAME + 0;
+ String indexName1 = INDEX_NAME + 1;
+ String indexName2 = INDEX_NAME + 2;
+ String indexName3 = INDEX_NAME + 3;
+ String indexName4 = INDEX_NAME + 4;
- try {
- consumer.accept(catalogManager);
- } finally {
- closeAll(catalogManager::beforeNodeStop, catalogManager::stop);
+ for (String indexName : List.of(indexName0, indexName1, indexName2,
indexName3, indexName4)) {
+ createSimpleHashIndex(catalogManager, TABLE_NAME, indexName);
}
+
+ int indexId0 = indexId(indexName0);
+ int indexId1 = indexId(indexName1);
+ int indexId2 = indexId(indexName2);
+ int indexId3 = indexId(indexName3);
+ int indexId4 = indexId(indexName4);
+
+ for (String indexName : List.of(indexName1, indexName2, indexName3,
indexName4)) {
+ startBuildingIndex(catalogManager, indexId(indexName));
+ }
+
+ for (String indexName : List.of(indexName2, indexName3, indexName4)) {
+ makeIndexAvailable(catalogManager, indexId(indexName));
+ }
+
+ for (String indexName : List.of(indexName3, indexName4)) {
+ dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, indexName);
+ }
+
+ removeIndex(catalogManager, indexId4);
+
+ CatalogManager spy = spy(catalogManager);
+
+ HybridTimestamp beginTs = clock.now();
+
+ int tableId = getTableIdStrict(catalogManager, TABLE_NAME,
clock.nowLong());
+
+ assertThat(
+ indexIdsAtRwTxBeginTs(spy, transactionId(beginTs, 1), tableId),
+ contains(
+ indexId(pkIndexName(TABLE_NAME)),
+ indexId0,
+ indexId1,
+ indexId2,
+ indexId3
+ )
+ );
+
+ verify(spy).activeCatalogVersion(eq(beginTs.longValue()));
+
verify(spy).indexes(eq(catalogManager.activeCatalogVersion(beginTs.longValue())),
eq(tableId));
+ }
+
+ @Test
+ void testDroppedTables() {
+ String tableName0 = TABLE_NAME + 0;
+ String tableName1 = TABLE_NAME + 1;
+ String tableName2 = TABLE_NAME + 2;
+
+ createSimpleTable(catalogManager, tableName0);
+ createSimpleTable(catalogManager, tableName1);
+ createSimpleTable(catalogManager, tableName2);
+
+ int tableId0 = tableId(tableName0);
+ int tableId1 = tableId(tableName1);
+
+ int catalogVersionBeforeRemoveTable1 =
catalogManager.latestCatalogVersion();
+ dropSimpleTable(catalogManager, tableName1);
+
+ int catalogVersionBeforeRemoveTable0 =
catalogManager.latestCatalogVersion();
+ dropSimpleTable(catalogManager, tableName0);
+
+ assertThat(droppedTables(null), empty());
+
assertThat(droppedTables(catalogTime(catalogVersionBeforeRemoveTable1)),
empty());
+
assertThat(droppedTables(catalogTime(catalogVersionBeforeRemoveTable1).addPhysicalTime(1)),
empty());
Review Comment:
What does this statement test?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2495,4 +2497,31 @@ public int tableId() {
return tableId;
}
}
+
+ private void cleanUpResourcesForDroppedTablesOnRecoveryBusy() {
+ // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones
from volt and metastore
Review Comment:
```suggestion
// TODO: IGNITE-20384 Clean up abandoned resources for dropped zones
from vault and metastore
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java:
##########
@@ -81,4 +87,44 @@ public static int
findStartBuildingIndexCatalogVersion(CatalogService catalogSer
BUILDING, indexId, fromCatalogVersionIncluded,
latestCatalogVersion
));
}
+
+ /**
+ * Collects a list of tables that were removed from the catalog and should
have been dropped due to a low watermark (if the catalog
+ * version in which the table was removed is less than or equal to the
active catalog version of low watermark).
Review Comment:
```suggestion
* version in which the table was removed is less than or equal to the
active catalog version of the low watermark).
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java:
##########
@@ -81,4 +87,44 @@ public static int
findStartBuildingIndexCatalogVersion(CatalogService catalogSer
BUILDING, indexId, fromCatalogVersionIncluded,
latestCatalogVersion
));
}
+
+ /**
+ * Collects a list of tables that were removed from the catalog and should
have been dropped due to a low watermark (if the catalog
+ * version in which the table was removed is less than or equal to the
active catalog version of low watermark).
+ *
+ * @param catalogService Catalog service.
+ * @param lowWatermark Low watermark, {@code null} if it has never been
updated.
+ * @return Result is sorted by the catalog version in which the table was
removed from the catalog and by the table ID.
+ */
+ // TODO: IGNITE-21771 Process or check catalog compaction
+ static List<DroppedTableInfo> droppedTables(CatalogService catalogService,
@Nullable HybridTimestamp lowWatermark) {
+ if (lowWatermark == null) {
+ return List.of();
+ }
+
+ int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+ int lwmCatalogVersion =
catalogService.activeCatalogVersion(lowWatermark.longValue());
+
+ Set<Integer> previousCatalogVersionTableIds = Set.of();
Review Comment:
I think we can make this code more simple:
```
Map<Integer, Integer> tableIdToCatalogVersion = new HashMap<>();
for (int catalogVersion = lwmCatalogVersion; catalogVersion >=
earliestCatalogVersion; catalogVersion--) {
for (CatalogTableDescriptor table :
catalogService.tables(catalogVersion)) {
tableIdToCatalogVersion.putIfAbsent(table.id(), catalogVersion);
}
}
return tableIdToCatalogVersion.entrySet().stream()
.map(e -> new DroppedTableInfo(e.getKey(), e.getValue()))
.collect(Collectors.toList());
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java:
##########
@@ -81,4 +87,44 @@ public static int
findStartBuildingIndexCatalogVersion(CatalogService catalogSer
BUILDING, indexId, fromCatalogVersionIncluded,
latestCatalogVersion
));
}
+
+ /**
+ * Collects a list of tables that were removed from the catalog and should
have been dropped due to a low watermark (if the catalog
+ * version in which the table was removed is less than or equal to the
active catalog version of low watermark).
+ *
+ * @param catalogService Catalog service.
+ * @param lowWatermark Low watermark, {@code null} if it has never been
updated.
+ * @return Result is sorted by the catalog version in which the table was
removed from the catalog and by the table ID.
+ */
+ // TODO: IGNITE-21771 Process or check catalog compaction
+ static List<DroppedTableInfo> droppedTables(CatalogService catalogService,
@Nullable HybridTimestamp lowWatermark) {
Review Comment:
How is this going to work with catalog compaction? Why don't we use the same
approach as we use in indexes, i.e. check all existing tables in the storage
and remove those which have been covered by the low watermark?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2495,4 +2497,31 @@ public int tableId() {
return tableId;
}
}
+
+ private void cleanUpResourcesForDroppedTablesOnRecoveryBusy() {
Review Comment:
Do we have an integration test that covers this code? Like: create a table,
stop table manager, update low watermark, start table manager, check that the
tables are cleaned up.
##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java:
##########
@@ -525,13 +525,33 @@ void testAddStore() throws Exception {
);
}
+ @Test
+ void testDestroyGroupIfExists() throws Exception {
+ FilePageStoreManager manager = createManager();
+
+ manager.start();
+
+ int groupId = 1;
+
+ Path dbDir = workDir.resolve("db");
+
+ // Directory does not exist.
+ assertDoesNotThrow(() -> manager.destroyGroupIfExists(groupId));
+ assertThat(collectAllSubFileAndDirs(dbDir), empty());
+
+ createAndAddFilePageStore(manager, new GroupPartitionId(groupId, 0));
+ manager.destroyGroupIfExists(groupId);
+ assertThat(collectAllSubFileAndDirs(dbDir), empty());
Review Comment:
You should probably check that the directory is not empty before you call
this method
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableUtilsTest.java:
##########
@@ -35,95 +37,163 @@
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.util.List;
-import java.util.function.Consumer;
import org.apache.ignite.internal.catalog.CatalogManager;
-import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/** For {@link TableUtils} testing. */
public class TableUtilsTest extends IgniteAbstractTest {
private final HybridClock clock = new HybridClockImpl();
- @Test
- void testIndexIdsAtRwTxBeginTs() throws Exception {
- withCatalogManager(catalogManager -> {
- createSimpleTable(catalogManager, TABLE_NAME);
-
- String indexName0 = INDEX_NAME + 0;
- String indexName1 = INDEX_NAME + 1;
- String indexName2 = INDEX_NAME + 2;
- String indexName3 = INDEX_NAME + 3;
- String indexName4 = INDEX_NAME + 4;
-
- for (String indexName : List.of(indexName0, indexName1,
indexName2, indexName3, indexName4)) {
- createSimpleHashIndex(catalogManager, TABLE_NAME, indexName);
- }
-
- int indexId0 = indexId(catalogManager, indexName0);
- int indexId1 = indexId(catalogManager, indexName1);
- int indexId2 = indexId(catalogManager, indexName2);
- int indexId3 = indexId(catalogManager, indexName3);
- int indexId4 = indexId(catalogManager, indexName4);
-
- for (String indexName : List.of(indexName1, indexName2,
indexName3, indexName4)) {
- startBuildingIndex(catalogManager, indexId(catalogManager,
indexName));
- }
-
- for (String indexName : List.of(indexName2, indexName3,
indexName4)) {
- makeIndexAvailable(catalogManager, indexId(catalogManager,
indexName));
- }
-
- for (String indexName : List.of(indexName3, indexName4)) {
- dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, indexName);
- }
-
- removeIndex(catalogManager, indexId4);
-
- CatalogManager spy = spy(catalogManager);
-
- HybridTimestamp beginTs = clock.now();
-
- int tableId = getTableIdStrict(catalogManager, TABLE_NAME,
clock.nowLong());
-
- assertThat(
- indexIdsAtRwTxBeginTs(spy, transactionId(beginTs, 1),
tableId),
- contains(
- indexId(catalogManager, pkIndexName(TABLE_NAME)),
- indexId0,
- indexId1,
- indexId2,
- indexId3
- )
- );
-
- verify(spy).activeCatalogVersion(eq(beginTs.longValue()));
-
verify(spy).indexes(eq(catalogManager.activeCatalogVersion(beginTs.longValue())),
eq(tableId));
- });
+ private final CatalogManager catalogManager =
CatalogTestUtils.createTestCatalogManager("test-node", clock);
+
+ @BeforeEach
+ void setUp() {
+ assertThat(catalogManager.start(), willCompleteSuccessfully());
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ closeAll(
+ catalogManager::beforeNodeStop,
+ catalogManager::stop
+ );
}
- private void withCatalogManager(Consumer<CatalogManager> consumer) throws
Exception {
- CatalogManager catalogManager =
CatalogTestUtils.createTestCatalogManager("test-node", clock);
+ @Test
+ void testIndexIdsAtRwTxBeginTs() {
+ createSimpleTable(catalogManager, TABLE_NAME);
- assertThat(catalogManager.start(), willCompleteSuccessfully());
+ String indexName0 = INDEX_NAME + 0;
+ String indexName1 = INDEX_NAME + 1;
+ String indexName2 = INDEX_NAME + 2;
+ String indexName3 = INDEX_NAME + 3;
+ String indexName4 = INDEX_NAME + 4;
- try {
- consumer.accept(catalogManager);
- } finally {
- closeAll(catalogManager::beforeNodeStop, catalogManager::stop);
+ for (String indexName : List.of(indexName0, indexName1, indexName2,
indexName3, indexName4)) {
+ createSimpleHashIndex(catalogManager, TABLE_NAME, indexName);
}
+
+ int indexId0 = indexId(indexName0);
+ int indexId1 = indexId(indexName1);
+ int indexId2 = indexId(indexName2);
+ int indexId3 = indexId(indexName3);
+ int indexId4 = indexId(indexName4);
+
+ for (String indexName : List.of(indexName1, indexName2, indexName3,
indexName4)) {
+ startBuildingIndex(catalogManager, indexId(indexName));
+ }
+
+ for (String indexName : List.of(indexName2, indexName3, indexName4)) {
+ makeIndexAvailable(catalogManager, indexId(indexName));
+ }
+
+ for (String indexName : List.of(indexName3, indexName4)) {
+ dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, indexName);
+ }
+
+ removeIndex(catalogManager, indexId4);
+
+ CatalogManager spy = spy(catalogManager);
+
+ HybridTimestamp beginTs = clock.now();
+
+ int tableId = getTableIdStrict(catalogManager, TABLE_NAME,
clock.nowLong());
+
+ assertThat(
+ indexIdsAtRwTxBeginTs(spy, transactionId(beginTs, 1), tableId),
+ contains(
+ indexId(pkIndexName(TABLE_NAME)),
+ indexId0,
+ indexId1,
+ indexId2,
+ indexId3
+ )
+ );
+
+ verify(spy).activeCatalogVersion(eq(beginTs.longValue()));
+
verify(spy).indexes(eq(catalogManager.activeCatalogVersion(beginTs.longValue())),
eq(tableId));
+ }
+
+ @Test
+ void testDroppedTables() {
Review Comment:
You should also try altering a table (so that the catalog contains multiple
table descriptors for the same table ID) and check the result
##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java:
##########
@@ -105,6 +105,16 @@ public static void dropTable(CatalogManager
catalogManager, String schemaName, S
);
}
+ /**
+ * Drops table from {@link #createSimpleTable(CatalogManager, String)} in
the catalog.
+ *
+ * @param catalogManager Catalog manager.
+ * @param tableName Table name.
+ */
+ public static void dropSimpleTable(CatalogManager catalogManager, String
tableName) {
Review Comment:
Why is this method needed? Why is it called `dropSimpleTable` and not
`dropTable`? What's so "simple" about it?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableUtils.java:
##########
@@ -81,4 +87,44 @@ public static int
findStartBuildingIndexCatalogVersion(CatalogService catalogSer
BUILDING, indexId, fromCatalogVersionIncluded,
latestCatalogVersion
));
}
+
+ /**
+ * Collects a list of tables that were removed from the catalog and should
have been dropped due to a low watermark (if the catalog
+ * version in which the table was removed is less than or equal to the
active catalog version of low watermark).
+ *
+ * @param catalogService Catalog service.
+ * @param lowWatermark Low watermark, {@code null} if it has never been
updated.
+ * @return Result is sorted by the catalog version in which the table was
removed from the catalog and by the table ID.
+ */
+ // TODO: IGNITE-21771 Process or check catalog compaction
+ static List<DroppedTableInfo> droppedTables(CatalogService catalogService,
@Nullable HybridTimestamp lowWatermark) {
+ if (lowWatermark == null) {
+ return List.of();
+ }
+
+ int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+ int lwmCatalogVersion =
catalogService.activeCatalogVersion(lowWatermark.longValue());
+
+ Set<Integer> previousCatalogVersionTableIds = Set.of();
+
+ var res = new ArrayList<DroppedTableInfo>();
+
+ for (int catalogVersion = earliestCatalogVersion; catalogVersion <=
lwmCatalogVersion; catalogVersion++) {
+ int finalCatalogVersion = catalogVersion;
+
+ Set<Integer> tableIds =
catalogService.tables(catalogVersion).stream()
+ .map(CatalogObjectDescriptor::id)
+ .collect(toSet());
+
+ difference(previousCatalogVersionTableIds, tableIds).stream()
+ .map(tableId -> new DroppedTableInfo(tableId,
finalCatalogVersion))
+ .forEach(res::add);
+
+ previousCatalogVersionTableIds = tableIds;
+ }
+
+
res.sort(comparingInt(DroppedTableInfo::tableRemovalCatalogVersion).thenComparing(DroppedTableInfo::tableId));
Review Comment:
Why do you need to sort this array?
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java:
##########
@@ -57,4 +57,12 @@ public interface StorageEngine {
*/
// TODO: IGNITE-19717 Get rid of indexDescriptorSupplier
MvTableStorage createMvTable(StorageTableDescriptor tableDescriptor,
StorageIndexDescriptorSupplier indexDescriptorSupplier);
+
+ /**
+ * Destroys the table on node recovery if it exists.
+ *
+ * @param tableId Table ID.
+ * @throws StorageException If an error has occurs while dropping the
table.
+ */
+ void dropMvTableOnRecovery(int tableId);
Review Comment:
I think this method's name is too specific. I would suggest to call it
`dropMvTable` or `destroyMvTable`
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2495,4 +2497,31 @@ public int tableId() {
return tableId;
}
}
+
+ private void cleanUpResourcesForDroppedTablesOnRecoveryBusy() {
+ // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones
from volt and metastore
+ for (DroppedTableInfo droppedTableInfo : droppedTables(catalogService,
lowWatermark.getLowWatermark())) {
+ int catalogVersion = droppedTableInfo.tableRemovalCatalogVersion()
- 1;
+
+ CatalogTableDescriptor tableDescriptor =
catalogService.table(droppedTableInfo.tableId(), catalogVersion);
+
+ assert tableDescriptor != null : "tableId=" +
droppedTableInfo.tableId() + ", catalogVersion=" + catalogVersion;
+
+ CatalogZoneDescriptor zoneDescriptor =
catalogService.zone(tableDescriptor.zoneId(), catalogVersion);
+
+ assert zoneDescriptor != null : "zoneId=" +
tableDescriptor.zoneId() + ", catalogVersion=" + catalogVersion;
+
+ destroyTableStorageOnRecoveryBusy(tableDescriptor, zoneDescriptor);
+ }
+ }
+
+ private void destroyTableStorageOnRecoveryBusy(CatalogTableDescriptor
tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
+ StorageEngine engine =
dataStorageMgr.engine(zoneDescriptor.dataStorage().engine());
+
+ assert engine != null : "tableId=" + tableDescriptor.id() + ",
engineName=" + zoneDescriptor.dataStorage().engine();
+
+ if (!engine.isVolatile()) {
Review Comment:
I don't think we need this check at all, `dropMvTableOnRecovery` is a no-op
for volatile engines
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]