This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch catalog-feature in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 9ff6c35a55be8235c7418ab33467ca0630d92bf8 Merge: d569920d10 e6d97f5b25 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Tue Aug 29 14:05:47 2023 +0300 Merge branch 'ai-main' into catalog-feature ...l checks [PMD, modernizer, checkstyle].run.xml} | 12 +- .run/All checks [check].run.xml | 4 +- ...check].run.xml => Fast build only java.run.xml} | 8 +- .../ignite/internal/catalog/CatalogService.java | 2 - .../internal/cli/CliIntegrationTestBase.java | 7 +- .../client/handler/JdbcQueryEventHandlerImpl.java | 2 + .../requests/sql/ClientSqlExecuteRequest.java | 4 +- .../handler/JdbcQueryEventHandlerImplTest.java | 8 +- .../requests/jdbc/JdbcQueryCursorSelfTest.java | 15 +- .../internal/client/table/ClientDataStreamer.java | 5 - .../client/fakes/FakeIgniteQueryProcessor.java | 3 +- .../ignite/internal/streamer/StreamerBuffer.java | 38 +-- .../ignite/internal/streamer/StreamerOptions.java | 8 - .../internal/streamer/StreamerSubscriber.java | 53 ++-- .../org/apache/ignite/internal/util/ByteUtils.java | 2 +- .../apache/ignite/internal/util/VarIntUtils.java | 103 +++++++ .../internal/streamer/StreamerSubscriberTest.java | 179 ++++++++++++ .../ignite/internal/util/VarIntUtilsTest.java | 94 +++++++ .../testframework/TestIgnitionManager.java | 37 ++- ...butionZoneManagerLogicalTopologyEventsTest.java | 28 ++ .../DistributionZoneManagerWatchListenerTest.java | 109 -------- .../apache/ignite/raft/jraft/core/ItNodeTest.java | 2 + .../java/org/apache/ignite/internal/raft/Loza.java | 13 +- .../internal/raft/server/RaftGroupOptions.java | 23 ++ .../internal/raft/server/impl/JraftServerImpl.java | 30 +- .../internal/raft/util/OptimizedMarshaller.java | 14 +- .../ignite/internal/raft/util/OptimizedStream.java | 2 +- .../apache/ignite/raft/jraft/core/NodeImpl.java | 1 + .../ignite/raft/jraft/option/NodeOptions.java | 14 + .../jraft/rpc/impl/ActionRequestProcessor.java | 12 +- .../raft/jraft/rpc/impl/IgniteRpcServer.java | 11 +- .../impl/core/AppendEntriesRequestInterceptor.java | 40 +++ .../InterceptingAppendEntriesRequestProcessor.java | 55 ++++ .../core/NullAppendEntriesRequestInterceptor.java} | 41 +-- .../apache/ignite/raft/jraft/core/TestCluster.java | 7 + .../ignite/raft/jraft/rpc/TestIgniteRpcServer.java | 4 +- .../replicator/message/ReplicaMessageGroup.java | 5 +- .../internal/ClusterPerTestIntegrationTest.java | 2 +- .../benchmark/AbstractOneNodeBenchmark.java | 2 +- .../storage/ItRebalanceDistributedTest.java | 10 +- ...niteDistributionZoneManagerNodeRestartTest.java | 64 ++++- .../raftsnapshot/ItTableRaftSnapshotsTest.java | 79 +----- .../app/ItIgniteInMemoryNodeRestartTest.java | 2 + .../runner/app/ItIgniteNodeRestartTest.java | 14 +- .../schemasync/ItSchemaSyncAndReplicationTest.java | 177 ++++++++++++ .../internal/sql/api/ItSqlAsynchronousApiTest.java | 48 ++-- .../internal/sql/api/ItSqlSynchronousApiTest.java | 22 +- .../sql/engine/ClusterPerClassIntegrationTest.java | 7 +- .../engine/datatypes/tests/BaseDataTypeTest.java | 6 + .../src/integrationTest/sql/sqlite/join/join1.test | 4 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 16 +- .../java/org/apache/ignite/internal/Cluster.java | 88 +++++- .../ignite/internal/ReplicationGroupsUtils.java} | 41 ++- .../sql/engine/util/TestQueryProcessor.java | 13 +- .../ignite/internal/sql/api/IgniteSqlImpl.java | 9 +- .../internal/sql/api/SessionBuilderImpl.java | 8 +- .../ignite/internal/sql/api/SessionImpl.java | 12 +- .../internal/sql/engine/AsyncSqlCursorImpl.java | 21 +- .../ignite/internal/sql/engine/QueryProcessor.java | 6 +- .../sql/engine/QueryTransactionWrapper.java | 59 ++++ .../internal/sql/engine/SqlQueryProcessor.java | 303 +++++++++++++++------ .../internal/sql/engine/exec/ArrayRowHandler.java | 14 + .../internal/sql/engine/exec/ExchangeService.java | 6 +- .../sql/engine/exec/ExchangeServiceImpl.java | 11 +- .../sql/engine/exec/LogicalRelImplementor.java | 8 +- .../internal/sql/engine/exec/RowHandler.java | 43 ++- .../sql/engine/exec/exp/ExpressionFactoryImpl.java | 4 +- .../engine/exec/exp/agg/AccumulatorsFactory.java | 2 +- .../ignite/internal/sql/engine/exec/rel/Inbox.java | 16 +- .../internal/sql/engine/exec/rel/Outbox.java | 12 +- .../internal/sql/engine/externalize/RelJson.java | 4 +- .../sql/engine/message/QueryBatchMessage.java | 5 +- .../sql/engine/AsyncSqlCursorImplTest.java | 51 ++-- .../engine/QueryTransactionWrapperSelfTest.java | 131 +++++++++ .../sql/engine/exec/rel/AbstractExecutionTest.java | 7 + .../sql/engine/exec/rel/ExchangeExecutionTest.java | 3 +- .../sql/engine/exec/rel/ExecutionTest.java | 27 ++ .../internal/sql/engine/util/QueryChecker.java | 9 +- modules/table/build.gradle | 1 + .../distributed/ItTxDistributedTestSingleNode.java | 4 + .../ignite/internal/table/AbstractTableView.java | 2 +- .../apache/ignite/internal/table/DataStreamer.java | 5 - .../internal/table/distributed/TableManager.java | 38 ++- ...titionCommand.java => CatalogVersionAware.java} | 16 +- .../distributed/command/PartitionCommand.java | 8 +- .../replicator/PartitionReplicaListener.java | 222 +++++++++------ .../distributed/replicator/action/RequestType.java | 38 +-- .../schema/CheckCatalogVersionOnAppendEntries.java | 105 +++++++ .../PartitionCommandsMarshaller.java} | 23 +- .../schema/PartitionCommandsMarshallerImpl.java | 73 +++++ .../distributed/schema/SchemaSyncService.java | 8 - .../distributed/schema/SchemaSyncServiceImpl.java | 11 +- .../ThreadLocalPartitionCommandsMarshaller.java | 53 ++++ .../table/distributed/TableManagerTest.java | 3 + .../PartitionReplicaListenerIndexLockingTest.java | 9 +- .../replication/PartitionReplicaListenerTest.java | 182 +++++++++---- .../PartitionCommandsMarshallerImplTest.java | 116 ++++++++ .../schema/SchemaSyncServiceImplTest.java | 14 +- .../replicator/action/RequestTypes.java} | 95 ++++--- .../table/impl/DummyInternalTableImpl.java | 4 + 100 files changed, 2507 insertions(+), 884 deletions(-) diff --cc modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java index a653bfe821,928a08df45..fedc0bfd0b --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java @@@ -765,18 -770,23 +767,23 @@@ public class ItRebalanceDistributedTes metaStorageManager, clusterService); - clockWaiter = new ClockWaiter("test", hybridClock); + clockWaiter = new ClockWaiter(name, hybridClock); + LongSupplier delayDurationMsSupplier = () -> 10L; + catalogManager = new CatalogManagerImpl( new UpdateLogImpl(metaStorageManager), - clockWaiter + clockWaiter, + delayDurationMsSupplier ); schemaManager = new SchemaManager(registry, tablesCfg, metaStorageManager); + var schemaSyncService = new SchemaSyncServiceImpl(metaStorageManager.clusterTime(), delayDurationMsSupplier); + distributionZoneManager = new DistributionZoneManager( + name, registry, - zonesCfg, tablesCfg, metaStorageManager, logicalTopologyService, diff --cc modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java index 8e35cff82f,c0d289fd47..499079f797 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java @@@ -17,11 -17,12 +17,12 @@@ package org.apache.ignite.internal.distribution.zones; + import static java.util.Collections.emptySet; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; -import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME; -import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE; -import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE; +import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME; +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE; +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE; import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager; import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey; diff --cc modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 4bd43933d4,80ec993d48..4ec6ec43a9 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@@ -337,20 -344,31 +339,28 @@@ public class ItIgniteNodeRestartTest ex SchemaManager schemaManager = new SchemaManager(registry, tablesConfig, metaStorageMgr); - Consumer<LongFunction<CompletableFuture<?>>> revisionUpdater = (LongFunction<CompletableFuture<?>> function) -> - metaStorageMgr.registerRevisionUpdateListener(function::apply); - - DistributionZoneManager distributionZoneManager = new DistributionZoneManager( - null, - zonesConfig, - tablesConfig, - metaStorageMgr, - logicalTopologyService, - vault, - name - ); - - var clockWaiter = new ClockWaiter("test", hybridClock); + var clockWaiter = new ClockWaiter(name, clock); - var catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metaStorageMgr), clockWaiter); + LongSupplier delayDurationMsSupplier = () -> 100L; + + var catalogManager = new CatalogManagerImpl( + new UpdateLogImpl(metaStorageMgr), + clockWaiter, + delayDurationMsSupplier + ); + DistributionZoneManager distributionZoneManager = new DistributionZoneManager( + name, + registry, + tablesConfig, + metaStorageMgr, + logicalTopologyService, + vault, + catalogManager + ); + + var schemaSyncService = new SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier); + TableManager tableManager = new TableManager( name, registry, @@@ -389,10 -409,10 +400,9 @@@ indexManager, schemaManager, dataStorageManager, - txManager, - distributionZoneManager, () -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()), replicaSvc, - hybridClock, + clock, catalogManager, metricManager ); diff --cc modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 1cfffb1357,44cf8a6945..108ddcdda1 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@@ -503,19 -522,13 +509,23 @@@ public class IgniteImpl implements Igni catalogManager = new CatalogManagerImpl( new UpdateLogImpl(metaStorageMgr), clockWaiter, - () -> schemaSyncConfig.delayDuration().value() + delayDurationMsSupplier ); + raftMgr.appendEntriesRequestInterceptor(new CheckCatalogVersionOnAppendEntries(catalogManager)); + + SchemaSyncService schemaSyncService = new SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier); + + distributionZoneManager = new DistributionZoneManager( + name, + registry, + tablesConfig, + metaStorageMgr, + logicalTopologyService, + vaultMgr, + catalogManager + ); + distributedTblMgr = new TableManager( name, registry, @@@ -552,7 -567,7 +563,6 @@@ indexManager, schemaManager, dataStorageMgr, - txManager, - distributionZoneManager, () -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()), replicaSvc, clock, diff --cc modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index bf60f6439a,33445d9aac..ac63f66327 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@@ -67,10 -68,11 +66,11 @@@ import org.apache.ignite.internal.sql.e import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl; import org.apache.ignite.internal.sql.engine.prepare.PrepareService; import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl; + import org.apache.ignite.internal.sql.engine.prepare.QueryPlan; import org.apache.ignite.internal.sql.engine.property.PropertiesHelper; import org.apache.ignite.internal.sql.engine.property.PropertiesHolder; +import org.apache.ignite.internal.sql.engine.schema.CatalogSqlSchemaManager; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; -import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl; import org.apache.ignite.internal.sql.engine.session.Session; import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.sql.engine.session.SessionInfo; @@@ -87,8 -89,8 +87,7 @@@ import org.apache.ignite.internal.sql.m import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.event.TableEvent; -import org.apache.ignite.internal.table.event.TableEventParameters; import org.apache.ignite.internal.tx.InternalTransaction; - import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.lang.IgniteInternalException; @@@ -170,9 -173,9 +171,6 @@@ public class SqlQueryProcessor implemen private volatile SqlSchemaManager sqlSchemaManager; - /** Transaction manager. */ - private final TxManager txManager; - /** Distribution zones manager. */ - private final DistributionZoneManager distributionZoneManager; -- /** Clock. */ private final HybridClock clock; @@@ -193,7 -196,7 +191,6 @@@ IndexManager indexManager, SchemaManager schemaManager, DataStorageManager dataStorageManager, - TxManager txManager, - DistributionZoneManager distributionZoneManager, Supplier<Map<String, Map<String, Class<?>>>> dataStorageFieldsSupplier, ReplicaService replicaService, HybridClock clock, @@@ -205,7 -209,7 +202,6 @@@ this.indexManager = indexManager; this.schemaManager = schemaManager; this.dataStorageManager = dataStorageManager; - this.txManager = txManager; - this.distributionZoneManager = distributionZoneManager; this.dataStorageFieldsSupplier = dataStorageFieldsSupplier; this.replicaService = replicaService; this.clock = clock; diff --cc modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index dd7a55a346,de1d6766bc..a0c7b44a88 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@@ -367,11 -381,8 +374,11 @@@ public class TableManager extends Produ private final ConfiguredTablesCache configuredTablesCache; - private final CatalogManager catalogManager; + private final Marshaller raftCommandsMarshaller; + /** Versioned value used only at manager startup to correctly fire table creation events. */ + private final IncrementalVersionedValue<Void> startVv; + /** * Creates a new table manager. * @@@ -417,9 -429,11 +423,10 @@@ VaultManager vaultManager, ClusterManagementGroupManager cmgMgr, DistributionZoneManager distributionZoneManager, - CatalogManager catalogManager + SchemaSyncService schemaSyncService, + CatalogService catalogService ) { this.tablesCfg = tablesCfg; - this.zonesConfig = zonesConfig; this.gcConfig = gcConfig; this.clusterService = clusterService; this.raftMgr = raftMgr; @@@ -496,7 -511,7 +504,9 @@@ configuredTablesCache = new ConfiguredTablesCache(tablesCfg, getMetadataLocallyOnly); + raftCommandsMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()); ++ + startVv = new IncrementalVersionedValue<>(registry); } @Override @@@ -597,9 -610,7 +607,9 @@@ try { CatalogTableDescriptor tableDescriptor = toTableDescriptor(ctx.newValue()); - CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor.zoneId()); + // TODO: IGNITE-19499 It is now safe to get the latest version of the catalog since we are in the metasore thread, we need to + // change to the version from the catalog listener - CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor.zoneId(), catalogManager.latestCatalogVersion()); ++ CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor.zoneId(), catalogService.latestCatalogVersion()); CompletableFuture<List<Set<Assignment>>> assignmentsFuture; @@@ -682,9 -695,7 +692,9 @@@ try { CatalogTableDescriptor tableDescriptor = toTableDescriptor(ctx.oldValue()); - CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor.zoneId()); + // TODO: IGNITE-19499 It is now safe to get the latest version of the catalog since we are in the metasore thread, we need to + // change to the version from the catalog listener - CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor.zoneId(), catalogManager.latestCatalogVersion()); ++ CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor.zoneId(), catalogService.latestCatalogVersion()); dropTableLocally(ctx.storageRevision(), tableDescriptor, zoneDescriptor); } finally { @@@ -1304,7 -1321,7 +1318,7 @@@ MvTableStorage tableStorage = engine.createMvTable( new StorageTableDescriptor(tableDescriptor.id(), zoneDescriptor.partitions(), dataStorage.dataRegion()), - new StorageIndexDescriptorSupplier(catalogManager) - new StorageIndexDescriptorSupplier(tablesCfg) ++ new StorageIndexDescriptorSupplier(catalogService) ); tableStorage.start(); @@@ -1456,7 -1473,7 +1470,7 @@@ // TODO: https://issues.apache.org/jira/browse/IGNITE-19425 we must use distribution zone keys here baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()), tablePartitionId.partitionId(), - getZoneDescriptor(tableDescriptor.zoneId(), catalogManager.latestCatalogVersion()).replicas() - getZoneDescriptor(tableDescriptor.zoneId()).replicas() ++ getZoneDescriptor(tableDescriptor.zoneId(), catalogService.latestCatalogVersion()).replicas() ); } @@@ -1509,44 -1526,55 +1523,44 @@@ } try { - distributionZoneManager.zoneIdAsyncInternal(zoneName).handle((zoneId, zoneIdEx) -> { - if (zoneId == null) { - tblFut.completeExceptionally(new DistributionZoneNotFoundException(zoneName)); - } else if (zoneIdEx != null) { - tblFut.completeExceptionally(zoneIdEx); - } else { - if (!busyLock.enterBusy()) { - NodeStoppingException nodeStoppingException = new NodeStoppingException(); + // TODO: IGNITE-19499 Should listen to event CreateTableEventParameters and get the zone ID from it - CatalogZoneDescriptor zoneDescriptor = catalogManager.zone(zoneName, clock.nowLong()); ++ CatalogZoneDescriptor zoneDescriptor = catalogService.zone(zoneName, clock.nowLong()); - tblFut.completeExceptionally(nodeStoppingException); + if (zoneDescriptor == null) { + tblFut.completeExceptionally(new DistributionZoneNotFoundException(zoneName)); - throw new IgniteException(nodeStoppingException); - } + return null; + } - try { - cmgMgr.logicalTopology() - .handle((cmgTopology, e) -> { - if (e == null) { - if (!busyLock.enterBusy()) { - NodeStoppingException nodeStoppingException = new NodeStoppingException(); - - tblFut.completeExceptionally(nodeStoppingException); - - throw new IgniteException(nodeStoppingException); - } - - try { - changeTablesConfigurationOnTableCreate( - name, - zoneId, - tableInitChange, - tblFut - ); - } finally { - busyLock.leaveBusy(); - } - } else { - tblFut.completeExceptionally(e); - } - - return null; - }); - } finally { - busyLock.leaveBusy(); - } - } + cmgMgr.logicalTopology() + .handle((cmgTopology, e) -> { + if (e == null) { + if (!busyLock.enterBusy()) { + NodeStoppingException nodeStoppingException = new NodeStoppingException(); + + tblFut.completeExceptionally(nodeStoppingException); + + throw new IgniteException(nodeStoppingException); + } + + try { + changeTablesConfigurationOnTableCreate( + name, + zoneDescriptor.id(), + tableInitChange, + tblFut + ); + } finally { + busyLock.leaveBusy(); + } + } else { + tblFut.completeExceptionally(e); + } - return null; - }); + return null; + }); + } catch (Throwable t) { + tblFut.completeExceptionally(t); } finally { busyLock.leaveBusy(); } @@@ -2417,9 -2434,6 +2431,9 @@@ TablePartitionId replicaGrpId = new TablePartitionId(tableId, partitionId); + // It's safe to get the latest version of the catalog since we're in the metastore thread. - int catalogVersion = catalogManager.latestCatalogVersion(); ++ int catalogVersion = catalogService.latestCatalogVersion(); + return tablesById(evt.revision()) .thenCompose(tables -> { if (!busyLock.enterBusy()) { @@@ -2730,42 -2733,7 +2744,42 @@@ return tableView == null ? null : toTableDescriptor(tableView); } - private CatalogZoneDescriptor getZoneDescriptor(int id) { - return toZoneDescriptor(getZoneById(zonesConfig, id).value()); + private @Nullable CatalogZoneDescriptor getZoneDescriptor(int zoneId, int catalogVersion) { - return catalogManager.zone(zoneId, catalogVersion); ++ return catalogService.zone(zoneId, catalogVersion); + } + + private static @Nullable TableImpl findTableImplByName(Collection<TableImpl> tables, String name) { + return tables.stream().filter(table -> table.name().equals(name)).findAny().orElse(null); + } + + /** + * Fires table creation events so that indexes can be correctly created at IndexManager startup. + * + * <p>NOTE: This is a temporary solution that must be get rid/remake/change. + */ + // TODO: IGNITE-19499 Need to get rid/remake/change + private void fireCreateTablesOnManagerStart() { + CompletableFuture<Long> recoveryFinishFuture = metaStorageMgr.recoveryFinishedFuture(); + + assert recoveryFinishFuture.isDone(); + + long causalityToken = recoveryFinishFuture.join(); + + List<CompletableFuture<?>> fireEventFutures = new ArrayList<>(); + + for (TableView tableView : tablesCfg.tables().value()) { + fireEventFutures.add(fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableView.id()))); + } + + startVv.update(causalityToken, (unused, throwable) -> allOf(fireEventFutures.toArray(CompletableFuture[]::new))) + .whenComplete((unused, throwable) -> { + if (throwable != null) { + LOG.error("Error when firing table creation events at manager start", throwable); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Manager successfully fired table creation events at manager start"); + } + } + }); } } diff --cc modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 31193447d9,e41f562587..92f92345af --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@@ -62,8 -64,8 +62,9 @@@ import java.util.function.LongFunction import org.apache.ignite.configuration.NamedListView; import org.apache.ignite.internal.affinity.AffinityUtils; import org.apache.ignite.internal.baseline.BaselineManager; + import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.CatalogTestUtils; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; @@@ -828,7 -833,8 +830,8 @@@ public class TableManagerTest extends I vaultManager, cmgMgr, distributionZoneManager, + mock(SchemaSyncService.class), - mock(CatalogService.class) + catalogManager ) { @Override diff --cc modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java index 8aadeae88c,d0c250122f..d0dd6dcb35 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java @@@ -17,8 -17,9 +17,9 @@@ package org.apache.ignite.internal.table.distributed.replication; + import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_PARTITION_COUNT; +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf;