This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch catalog-feature
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 9ff6c35a55be8235c7418ab33467ca0630d92bf8
Merge: d569920d10 e6d97f5b25
Author: Kirill Tkalenko <tkalkir...@yandex.ru>
AuthorDate: Tue Aug 29 14:05:47 2023 +0300

    Merge branch 'ai-main' into catalog-feature

 ...l checks [PMD, modernizer, checkstyle].run.xml} |  12 +-
 .run/All checks [check].run.xml                    |   4 +-
 ...check].run.xml => Fast build only java.run.xml} |   8 +-
 .../ignite/internal/catalog/CatalogService.java    |   2 -
 .../internal/cli/CliIntegrationTestBase.java       |   7 +-
 .../client/handler/JdbcQueryEventHandlerImpl.java  |   2 +
 .../requests/sql/ClientSqlExecuteRequest.java      |   4 +-
 .../handler/JdbcQueryEventHandlerImplTest.java     |   8 +-
 .../requests/jdbc/JdbcQueryCursorSelfTest.java     |  15 +-
 .../internal/client/table/ClientDataStreamer.java  |   5 -
 .../client/fakes/FakeIgniteQueryProcessor.java     |   3 +-
 .../ignite/internal/streamer/StreamerBuffer.java   |  38 +--
 .../ignite/internal/streamer/StreamerOptions.java  |   8 -
 .../internal/streamer/StreamerSubscriber.java      |  53 ++--
 .../org/apache/ignite/internal/util/ByteUtils.java |   2 +-
 .../apache/ignite/internal/util/VarIntUtils.java   | 103 +++++++
 .../internal/streamer/StreamerSubscriberTest.java  | 179 ++++++++++++
 .../ignite/internal/util/VarIntUtilsTest.java      |  94 +++++++
 .../testframework/TestIgnitionManager.java         |  37 ++-
 ...butionZoneManagerLogicalTopologyEventsTest.java |  28 ++
 .../DistributionZoneManagerWatchListenerTest.java  | 109 --------
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  |   2 +
 .../java/org/apache/ignite/internal/raft/Loza.java |  13 +-
 .../internal/raft/server/RaftGroupOptions.java     |  23 ++
 .../internal/raft/server/impl/JraftServerImpl.java |  30 +-
 .../internal/raft/util/OptimizedMarshaller.java    |  14 +-
 .../ignite/internal/raft/util/OptimizedStream.java |   2 +-
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |   1 +
 .../ignite/raft/jraft/option/NodeOptions.java      |  14 +
 .../jraft/rpc/impl/ActionRequestProcessor.java     |  12 +-
 .../raft/jraft/rpc/impl/IgniteRpcServer.java       |  11 +-
 .../impl/core/AppendEntriesRequestInterceptor.java |  40 +++
 .../InterceptingAppendEntriesRequestProcessor.java |  55 ++++
 .../core/NullAppendEntriesRequestInterceptor.java} |  41 +--
 .../apache/ignite/raft/jraft/core/TestCluster.java |   7 +
 .../ignite/raft/jraft/rpc/TestIgniteRpcServer.java |   4 +-
 .../replicator/message/ReplicaMessageGroup.java    |   5 +-
 .../internal/ClusterPerTestIntegrationTest.java    |   2 +-
 .../benchmark/AbstractOneNodeBenchmark.java        |   2 +-
 .../storage/ItRebalanceDistributedTest.java        |  10 +-
 ...niteDistributionZoneManagerNodeRestartTest.java |  64 ++++-
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     |  79 +-----
 .../app/ItIgniteInMemoryNodeRestartTest.java       |   2 +
 .../runner/app/ItIgniteNodeRestartTest.java        |  14 +-
 .../schemasync/ItSchemaSyncAndReplicationTest.java | 177 ++++++++++++
 .../internal/sql/api/ItSqlAsynchronousApiTest.java |  48 ++--
 .../internal/sql/api/ItSqlSynchronousApiTest.java  |  22 +-
 .../sql/engine/ClusterPerClassIntegrationTest.java |   7 +-
 .../engine/datatypes/tests/BaseDataTypeTest.java   |   6 +
 .../src/integrationTest/sql/sqlite/join/join1.test |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  16 +-
 .../java/org/apache/ignite/internal/Cluster.java   |  88 +++++-
 .../ignite/internal/ReplicationGroupsUtils.java}   |  41 ++-
 .../sql/engine/util/TestQueryProcessor.java        |  13 +-
 .../ignite/internal/sql/api/IgniteSqlImpl.java     |   9 +-
 .../internal/sql/api/SessionBuilderImpl.java       |   8 +-
 .../ignite/internal/sql/api/SessionImpl.java       |  12 +-
 .../internal/sql/engine/AsyncSqlCursorImpl.java    |  21 +-
 .../ignite/internal/sql/engine/QueryProcessor.java |   6 +-
 .../sql/engine/QueryTransactionWrapper.java        |  59 ++++
 .../internal/sql/engine/SqlQueryProcessor.java     | 303 +++++++++++++++------
 .../internal/sql/engine/exec/ArrayRowHandler.java  |  14 +
 .../internal/sql/engine/exec/ExchangeService.java  |   6 +-
 .../sql/engine/exec/ExchangeServiceImpl.java       |  11 +-
 .../sql/engine/exec/LogicalRelImplementor.java     |   8 +-
 .../internal/sql/engine/exec/RowHandler.java       |  43 ++-
 .../sql/engine/exec/exp/ExpressionFactoryImpl.java |   4 +-
 .../engine/exec/exp/agg/AccumulatorsFactory.java   |   2 +-
 .../ignite/internal/sql/engine/exec/rel/Inbox.java |  16 +-
 .../internal/sql/engine/exec/rel/Outbox.java       |  12 +-
 .../internal/sql/engine/externalize/RelJson.java   |   4 +-
 .../sql/engine/message/QueryBatchMessage.java      |   5 +-
 .../sql/engine/AsyncSqlCursorImplTest.java         |  51 ++--
 .../engine/QueryTransactionWrapperSelfTest.java    | 131 +++++++++
 .../sql/engine/exec/rel/AbstractExecutionTest.java |   7 +
 .../sql/engine/exec/rel/ExchangeExecutionTest.java |   3 +-
 .../sql/engine/exec/rel/ExecutionTest.java         |  27 ++
 .../internal/sql/engine/util/QueryChecker.java     |   9 +-
 modules/table/build.gradle                         |   1 +
 .../distributed/ItTxDistributedTestSingleNode.java |   4 +
 .../ignite/internal/table/AbstractTableView.java   |   2 +-
 .../apache/ignite/internal/table/DataStreamer.java |   5 -
 .../internal/table/distributed/TableManager.java   |  38 ++-
 ...titionCommand.java => CatalogVersionAware.java} |  16 +-
 .../distributed/command/PartitionCommand.java      |   8 +-
 .../replicator/PartitionReplicaListener.java       | 222 +++++++++------
 .../distributed/replicator/action/RequestType.java |  38 +--
 .../schema/CheckCatalogVersionOnAppendEntries.java | 105 +++++++
 .../PartitionCommandsMarshaller.java}              |  23 +-
 .../schema/PartitionCommandsMarshallerImpl.java    |  73 +++++
 .../distributed/schema/SchemaSyncService.java      |   8 -
 .../distributed/schema/SchemaSyncServiceImpl.java  |  11 +-
 .../ThreadLocalPartitionCommandsMarshaller.java    |  53 ++++
 .../table/distributed/TableManagerTest.java        |   3 +
 .../PartitionReplicaListenerIndexLockingTest.java  |   9 +-
 .../replication/PartitionReplicaListenerTest.java  | 182 +++++++++----
 .../PartitionCommandsMarshallerImplTest.java       | 116 ++++++++
 .../schema/SchemaSyncServiceImplTest.java          |  14 +-
 .../replicator/action/RequestTypes.java}           |  95 ++++---
 .../table/impl/DummyInternalTableImpl.java         |   4 +
 100 files changed, 2507 insertions(+), 884 deletions(-)

diff --cc 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index a653bfe821,928a08df45..fedc0bfd0b
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@@ -765,18 -770,23 +767,23 @@@ public class ItRebalanceDistributedTes
                      metaStorageManager,
                      clusterService);
  
 -            clockWaiter = new ClockWaiter("test", hybridClock);
 +            clockWaiter = new ClockWaiter(name, hybridClock);
  
+             LongSupplier delayDurationMsSupplier = () -> 10L;
+ 
              catalogManager = new CatalogManagerImpl(
                      new UpdateLogImpl(metaStorageManager),
-                     clockWaiter
+                     clockWaiter,
+                     delayDurationMsSupplier
              );
  
              schemaManager = new SchemaManager(registry, tablesCfg, 
metaStorageManager);
  
+             var schemaSyncService = new 
SchemaSyncServiceImpl(metaStorageManager.clusterTime(), 
delayDurationMsSupplier);
+ 
              distributionZoneManager = new DistributionZoneManager(
 +                    name,
                      registry,
 -                    zonesCfg,
                      tablesCfg,
                      metaStorageManager,
                      logicalTopologyService,
diff --cc 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 8e35cff82f,c0d289fd47..499079f797
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@@ -17,11 -17,12 +17,12 @@@
  
  package org.apache.ignite.internal.distribution.zones;
  
+ import static java.util.Collections.emptySet;
  import static java.util.concurrent.CompletableFuture.completedFuture;
  import static java.util.stream.Collectors.toSet;
 -import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
 -import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
 -import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
 +import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
 +import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
 +import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
  import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager;
  import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
  import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
diff --cc 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 4bd43933d4,80ec993d48..4ec6ec43a9
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@@ -337,20 -344,31 +339,28 @@@ public class ItIgniteNodeRestartTest ex
  
          SchemaManager schemaManager = new SchemaManager(registry, 
tablesConfig, metaStorageMgr);
  
 -        Consumer<LongFunction<CompletableFuture<?>>> revisionUpdater = 
(LongFunction<CompletableFuture<?>> function) ->
 -                
metaStorageMgr.registerRevisionUpdateListener(function::apply);
 -
 -        DistributionZoneManager distributionZoneManager = new 
DistributionZoneManager(
 -                null,
 -                zonesConfig,
 -                tablesConfig,
 -                metaStorageMgr,
 -                logicalTopologyService,
 -                vault,
 -                name
 -        );
 -
 -        var clockWaiter = new ClockWaiter("test", hybridClock);
 +        var clockWaiter = new ClockWaiter(name, clock);
  
-         var catalogManager = new CatalogManagerImpl(new 
UpdateLogImpl(metaStorageMgr), clockWaiter);
+         LongSupplier delayDurationMsSupplier = () -> 100L;
+ 
+         var catalogManager = new CatalogManagerImpl(
+                 new UpdateLogImpl(metaStorageMgr),
+                 clockWaiter,
+                 delayDurationMsSupplier
+         );
  
 +        DistributionZoneManager distributionZoneManager = new 
DistributionZoneManager(
 +                name,
 +                registry,
 +                tablesConfig,
 +                metaStorageMgr,
 +                logicalTopologyService,
 +                vault,
 +                catalogManager
 +        );
 +
+         var schemaSyncService = new 
SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier);
+ 
          TableManager tableManager = new TableManager(
                  name,
                  registry,
@@@ -389,10 -409,10 +400,9 @@@
                  indexManager,
                  schemaManager,
                  dataStorageManager,
-                 txManager,
 -                distributionZoneManager,
                  () -> 
dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()),
                  replicaSvc,
 -                hybridClock,
 +                clock,
                  catalogManager,
                  metricManager
          );
diff --cc 
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 1cfffb1357,44cf8a6945..108ddcdda1
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@@ -503,19 -522,13 +509,23 @@@ public class IgniteImpl implements Igni
          catalogManager = new CatalogManagerImpl(
                  new UpdateLogImpl(metaStorageMgr),
                  clockWaiter,
-                 () -> schemaSyncConfig.delayDuration().value()
+                 delayDurationMsSupplier
          );
  
+         raftMgr.appendEntriesRequestInterceptor(new 
CheckCatalogVersionOnAppendEntries(catalogManager));
+ 
+         SchemaSyncService schemaSyncService = new 
SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), delayDurationMsSupplier);
+ 
 +        distributionZoneManager = new DistributionZoneManager(
 +                name,
 +                registry,
 +                tablesConfig,
 +                metaStorageMgr,
 +                logicalTopologyService,
 +                vaultMgr,
 +                catalogManager
 +        );
 +
          distributedTblMgr = new TableManager(
                  name,
                  registry,
@@@ -552,7 -567,7 +563,6 @@@
                  indexManager,
                  schemaManager,
                  dataStorageMgr,
-                 txManager,
 -                distributionZoneManager,
                  () -> 
dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()),
                  replicaSvc,
                  clock,
diff --cc 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index bf60f6439a,33445d9aac..ac63f66327
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@@ -67,10 -68,11 +66,11 @@@ import org.apache.ignite.internal.sql.e
  import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
  import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
  import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+ import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
  import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
  import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
 +import org.apache.ignite.internal.sql.engine.schema.CatalogSqlSchemaManager;
  import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 -import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
  import org.apache.ignite.internal.sql.engine.session.Session;
  import org.apache.ignite.internal.sql.engine.session.SessionId;
  import org.apache.ignite.internal.sql.engine.session.SessionInfo;
@@@ -87,8 -89,8 +87,7 @@@ import org.apache.ignite.internal.sql.m
  import org.apache.ignite.internal.storage.DataStorageManager;
  import org.apache.ignite.internal.table.distributed.TableManager;
  import org.apache.ignite.internal.table.event.TableEvent;
 -import org.apache.ignite.internal.table.event.TableEventParameters;
  import org.apache.ignite.internal.tx.InternalTransaction;
- import org.apache.ignite.internal.tx.TxManager;
  import org.apache.ignite.internal.util.IgniteSpinBusyLock;
  import org.apache.ignite.internal.util.IgniteUtils;
  import org.apache.ignite.lang.IgniteInternalException;
@@@ -170,9 -173,9 +171,6 @@@ public class SqlQueryProcessor implemen
  
      private volatile SqlSchemaManager sqlSchemaManager;
  
-     /** Transaction manager. */
-     private final TxManager txManager;
 -    /** Distribution zones manager. */
 -    private final DistributionZoneManager distributionZoneManager;
--
      /** Clock. */
      private final HybridClock clock;
  
@@@ -193,7 -196,7 +191,6 @@@
              IndexManager indexManager,
              SchemaManager schemaManager,
              DataStorageManager dataStorageManager,
-             TxManager txManager,
 -            DistributionZoneManager distributionZoneManager,
              Supplier<Map<String, Map<String, Class<?>>>> 
dataStorageFieldsSupplier,
              ReplicaService replicaService,
              HybridClock clock,
@@@ -205,7 -209,7 +202,6 @@@
          this.indexManager = indexManager;
          this.schemaManager = schemaManager;
          this.dataStorageManager = dataStorageManager;
-         this.txManager = txManager;
 -        this.distributionZoneManager = distributionZoneManager;
          this.dataStorageFieldsSupplier = dataStorageFieldsSupplier;
          this.replicaService = replicaService;
          this.clock = clock;
diff --cc 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index dd7a55a346,de1d6766bc..a0c7b44a88
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@@ -367,11 -381,8 +374,11 @@@ public class TableManager extends Produ
  
      private final ConfiguredTablesCache configuredTablesCache;
  
-     private final CatalogManager catalogManager;
+     private final Marshaller raftCommandsMarshaller;
  
 +    /** Versioned value used only at manager startup to correctly fire table 
creation events. */
 +    private final IncrementalVersionedValue<Void> startVv;
 +
      /**
       * Creates a new table manager.
       *
@@@ -417,9 -429,11 +423,10 @@@
              VaultManager vaultManager,
              ClusterManagementGroupManager cmgMgr,
              DistributionZoneManager distributionZoneManager,
-             CatalogManager catalogManager
+             SchemaSyncService schemaSyncService,
+             CatalogService catalogService
      ) {
          this.tablesCfg = tablesCfg;
 -        this.zonesConfig = zonesConfig;
          this.gcConfig = gcConfig;
          this.clusterService = clusterService;
          this.raftMgr = raftMgr;
@@@ -496,7 -511,7 +504,9 @@@
  
          configuredTablesCache = new ConfiguredTablesCache(tablesCfg, 
getMetadataLocallyOnly);
  
+         raftCommandsMarshaller = new 
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());
++
 +        startVv = new IncrementalVersionedValue<>(registry);
      }
  
      @Override
@@@ -597,9 -610,7 +607,9 @@@
  
          try {
              CatalogTableDescriptor tableDescriptor = 
toTableDescriptor(ctx.newValue());
 -            CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor.zoneId());
 +            // TODO: IGNITE-19499 It is now safe to get the latest version of 
the catalog since we are in the metastore thread, we need to
 +            //  change to the version from the catalog listener
-             CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor.zoneId(), 
catalogManager.latestCatalogVersion());
++            CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor.zoneId(), 
catalogService.latestCatalogVersion());
  
              CompletableFuture<List<Set<Assignment>>> assignmentsFuture;
  
@@@ -682,9 -695,7 +692,9 @@@
  
          try {
              CatalogTableDescriptor tableDescriptor = 
toTableDescriptor(ctx.oldValue());
 -            CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor.zoneId());
 +            // TODO: IGNITE-19499 It is now safe to get the latest version of 
the catalog since we are in the metastore thread, we need to
 +            //  change to the version from the catalog listener
-             CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor.zoneId(), 
catalogManager.latestCatalogVersion());
++            CatalogZoneDescriptor zoneDescriptor = 
getZoneDescriptor(tableDescriptor.zoneId(), 
catalogService.latestCatalogVersion());
  
              dropTableLocally(ctx.storageRevision(), tableDescriptor, 
zoneDescriptor);
          } finally {
@@@ -1304,7 -1321,7 +1318,7 @@@
  
          MvTableStorage tableStorage = engine.createMvTable(
                  new StorageTableDescriptor(tableDescriptor.id(), 
zoneDescriptor.partitions(), dataStorage.dataRegion()),
-                 new StorageIndexDescriptorSupplier(catalogManager)
 -                new StorageIndexDescriptorSupplier(tablesCfg)
++                new StorageIndexDescriptorSupplier(catalogService)
          );
  
          tableStorage.start();
@@@ -1456,7 -1473,7 +1470,7 @@@
                  // TODO: https://issues.apache.org/jira/browse/IGNITE-19425 
we must use distribution zone keys here
                  
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
                  tablePartitionId.partitionId(),
-                 getZoneDescriptor(tableDescriptor.zoneId(), 
catalogManager.latestCatalogVersion()).replicas()
 -                getZoneDescriptor(tableDescriptor.zoneId()).replicas()
++                getZoneDescriptor(tableDescriptor.zoneId(), 
catalogService.latestCatalogVersion()).replicas()
          );
      }
  
@@@ -1509,44 -1526,55 +1523,44 @@@
                          }
  
                          try {
 -                            
distributionZoneManager.zoneIdAsyncInternal(zoneName).handle((zoneId, zoneIdEx) 
-> {
 -                                if (zoneId == null) {
 -                                    tblFut.completeExceptionally(new 
DistributionZoneNotFoundException(zoneName));
 -                                } else if (zoneIdEx != null) {
 -                                    tblFut.completeExceptionally(zoneIdEx);
 -                                } else {
 -                                    if (!busyLock.enterBusy()) {
 -                                        NodeStoppingException 
nodeStoppingException = new NodeStoppingException();
 +                            // TODO: IGNITE-19499 Should listen to event 
CreateTableEventParameters and get the zone ID from it
-                             CatalogZoneDescriptor zoneDescriptor = 
catalogManager.zone(zoneName, clock.nowLong());
++                            CatalogZoneDescriptor zoneDescriptor = 
catalogService.zone(zoneName, clock.nowLong());
  
 -                                        
tblFut.completeExceptionally(nodeStoppingException);
 +                            if (zoneDescriptor == null) {
 +                                tblFut.completeExceptionally(new 
DistributionZoneNotFoundException(zoneName));
  
 -                                        throw new 
IgniteException(nodeStoppingException);
 -                                    }
 +                                return null;
 +                            }
  
 -                                    try {
 -                                        cmgMgr.logicalTopology()
 -                                                .handle((cmgTopology, e) -> {
 -                                                    if (e == null) {
 -                                                        if 
(!busyLock.enterBusy()) {
 -                                                            
NodeStoppingException nodeStoppingException = new NodeStoppingException();
 -
 -                                                            
tblFut.completeExceptionally(nodeStoppingException);
 -
 -                                                            throw new 
IgniteException(nodeStoppingException);
 -                                                        }
 -
 -                                                        try {
 -                                                            
changeTablesConfigurationOnTableCreate(
 -                                                                    name,
 -                                                                    zoneId,
 -                                                                    
tableInitChange,
 -                                                                    tblFut
 -                                                            );
 -                                                        } finally {
 -                                                            
busyLock.leaveBusy();
 -                                                        }
 -                                                    } else {
 -                                                        
tblFut.completeExceptionally(e);
 -                                                    }
 -
 -                                                    return null;
 -                                                });
 -                                    } finally {
 -                                        busyLock.leaveBusy();
 -                                    }
 -                                }
 +                            cmgMgr.logicalTopology()
 +                                    .handle((cmgTopology, e) -> {
 +                                        if (e == null) {
 +                                            if (!busyLock.enterBusy()) {
 +                                                NodeStoppingException 
nodeStoppingException = new NodeStoppingException();
 +
 +                                                
tblFut.completeExceptionally(nodeStoppingException);
 +
 +                                                throw new 
IgniteException(nodeStoppingException);
 +                                            }
 +
 +                                            try {
 +                                                
changeTablesConfigurationOnTableCreate(
 +                                                        name,
 +                                                        zoneDescriptor.id(),
 +                                                        tableInitChange,
 +                                                        tblFut
 +                                                );
 +                                            } finally {
 +                                                busyLock.leaveBusy();
 +                                            }
 +                                        } else {
 +                                            tblFut.completeExceptionally(e);
 +                                        }
  
 -                                return null;
 -                            });
 +                                        return null;
 +                                    });
 +                        } catch (Throwable t) {
 +                            tblFut.completeExceptionally(t);
                          } finally {
                              busyLock.leaveBusy();
                          }
@@@ -2417,9 -2434,6 +2431,9 @@@
  
                      TablePartitionId replicaGrpId = new 
TablePartitionId(tableId, partitionId);
  
 +                    // It's safe to get the latest version of the catalog 
since we're in the metastore thread.
-                     int catalogVersion = 
catalogManager.latestCatalogVersion();
++                    int catalogVersion = 
catalogService.latestCatalogVersion();
 +
                      return tablesById(evt.revision())
                              .thenCompose(tables -> {
                                  if (!busyLock.enterBusy()) {
@@@ -2730,42 -2733,7 +2744,42 @@@
          return tableView == null ? null : toTableDescriptor(tableView);
      }
  
 -    private CatalogZoneDescriptor getZoneDescriptor(int id) {
 -        return toZoneDescriptor(getZoneById(zonesConfig, id).value());
 +    private @Nullable CatalogZoneDescriptor getZoneDescriptor(int zoneId, int 
catalogVersion) {
-         return catalogManager.zone(zoneId, catalogVersion);
++        return catalogService.zone(zoneId, catalogVersion);
 +    }
 +
 +    private static @Nullable TableImpl 
findTableImplByName(Collection<TableImpl> tables, String name) {
 +        return tables.stream().filter(table -> 
table.name().equals(name)).findAny().orElse(null);
 +    }
 +
 +    /**
 +     * Fires table creation events so that indexes can be correctly created 
at IndexManager startup.
 +     *
 +     * <p>NOTE: This is a temporary solution that must be removed, 
reworked, or changed.
 +     */
 +    // TODO: IGNITE-19499 Need to get rid/remake/change
 +    private void fireCreateTablesOnManagerStart() {
 +        CompletableFuture<Long> recoveryFinishFuture = 
metaStorageMgr.recoveryFinishedFuture();
 +
 +        assert recoveryFinishFuture.isDone();
 +
 +        long causalityToken = recoveryFinishFuture.join();
 +
 +        List<CompletableFuture<?>> fireEventFutures = new ArrayList<>();
 +
 +        for (TableView tableView : tablesCfg.tables().value()) {
 +            fireEventFutures.add(fireEvent(TableEvent.CREATE, new 
TableEventParameters(causalityToken, tableView.id())));
 +        }
 +
 +        startVv.update(causalityToken, (unused, throwable) -> 
allOf(fireEventFutures.toArray(CompletableFuture[]::new)))
 +                .whenComplete((unused, throwable) -> {
 +                    if (throwable != null) {
 +                        LOG.error("Error when firing table creation events at 
manager start", throwable);
 +                    } else {
 +                        if (LOG.isDebugEnabled()) {
 +                            LOG.debug("Manager successfully fired table 
creation events at manager start");
 +                        }
 +                    }
 +                });
      }
  }
diff --cc 
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 31193447d9,e41f562587..92f92345af
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@@ -62,8 -64,8 +62,9 @@@ import java.util.function.LongFunction
  import org.apache.ignite.configuration.NamedListView;
  import org.apache.ignite.internal.affinity.AffinityUtils;
  import org.apache.ignite.internal.baseline.BaselineManager;
+ import org.apache.ignite.internal.catalog.CatalogService;
 +import org.apache.ignite.internal.catalog.CatalogManager;
 +import org.apache.ignite.internal.catalog.CatalogTestUtils;
  import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
  import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
  import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@@ -828,7 -833,8 +830,8 @@@ public class TableManagerTest extends I
                  vaultManager,
                  cmgMgr,
                  distributionZoneManager,
+                 mock(SchemaSyncService.class),
 -                mock(CatalogService.class)
 +                catalogManager
          ) {
  
              @Override
diff --cc 
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 8aadeae88c,d0c250122f..d0dd6dcb35
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@@ -17,8 -17,9 +17,9 @@@
  
  package org.apache.ignite.internal.table.distributed.replication;
  
+ import static java.util.concurrent.CompletableFuture.completedFuture;
  import static java.util.stream.Collectors.toList;
 -import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_PARTITION_COUNT;
 +import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
  import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
  import static org.hamcrest.MatcherAssert.assertThat;
  import static org.hamcrest.Matchers.allOf;

Reply via email to