This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-19499 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit f3f59234ce743dce85dcc59ba9f1cf5faec8609a Author: amashenkov <andrey.mashen...@gmail.com> AuthorDate: Thu Aug 24 17:02:43 2023 +0300 Plug SchemaSyncService in. --- .../internal/catalog/CatalogManagerImpl.java | 11 +++----- .../internal/catalog/CatalogManagerSelfTest.java | 4 +-- .../internal/catalog/BaseCatalogManagerTest.java | 2 +- .../ignite/internal/catalog/CatalogTestUtils.java | 4 +-- .../testframework/TestIgnitionManager.java | 29 +++++++++++++++------- .../ignite/internal/index/IndexManagerTest.java | 2 +- .../storage/ItRebalanceDistributedTest.java | 18 +++++++++++--- ...niteDistributionZoneManagerNodeRestartTest.java | 2 +- .../runner/app/ItIgniteNodeRestartTest.java | 17 +++++++++++-- .../org/apache/ignite/internal/app/IgniteImpl.java | 10 +++++++- .../internal/table/distributed/TableManager.java | 7 +++++- .../table/distributed/TableManagerTest.java | 4 ++- 12 files changed, 78 insertions(+), 32 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java index f5d6c82f7a..2902947fb7 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java @@ -112,6 +112,7 @@ import org.apache.ignite.lang.TableNotFoundException; import org.apache.ignite.sql.ColumnType; import org.apache.ignite.sql.SqlException; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * Catalog service implementation. @@ -142,14 +143,8 @@ public class CatalogManagerImpl extends Producer<CatalogEvent, CatalogEventParam /** * Constructor. */ - public CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter) { - this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION); - } - - /** - * Constructor. - */ - CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long delayDurationMs) { + @TestOnly + public CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long delayDurationMs) { this(updateLog, clockWaiter, () -> delayDurationMs); } diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java index 6f1a9408dc..6f892a2e4e 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java @@ -1037,7 +1037,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { doNothing().when(updateLogMock).registerUpdateHandler(updateHandlerCapture.capture()); - CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, clockWaiter); + CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, clockWaiter, 0L); manager.start(); when(updateLogMock.append(any())).thenAnswer(invocation -> { @@ -1108,7 +1108,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { public void catalogServiceManagesUpdateLogLifecycle() throws Exception { UpdateLog updateLogMock = mock(UpdateLog.class); - CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, clockWaiter); + CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, clockWaiter, 0L); manager.start(); diff --git a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java index c138c767e9..f2a3b24748 100644 --- a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java +++ b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java @@ -74,7 +74,7 @@ public abstract class BaseCatalogManagerTest extends BaseIgniteAbstractTest { updateLog = spy(new UpdateLogImpl(metastore)); clockWaiter = spy(new ClockWaiter(NODE_NAME, clock)); - manager = new CatalogManagerImpl(updateLog, clockWaiter); + manager = new CatalogManagerImpl(updateLog, clockWaiter, 0L); vault.start(); metastore.start(); diff --git a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java index 03b15ac741..d313ca9f5d 100644 --- a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java +++ b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java @@ -47,7 +47,7 @@ public class CatalogTestUtils { var clockWaiter = new ClockWaiter(nodeName, clock); - return new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter) { + return new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter, 0L) { @Override public void start() { vault.start(); @@ -91,7 +91,7 @@ public class CatalogTestUtils { public static CatalogManager createTestCatalogManager(String nodeName, HybridClock clock, MetaStorageManager metastore) { var clockWaiter = new ClockWaiter(nodeName, clock); - return new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter) { + return new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter, 0L) { @Override public void start() { clockWaiter.start(); diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java index dc4d66f463..cbab5d1930 100644 --- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java @@ -37,6 +37,9 @@ public class TestIgnitionManager { /** Default name of configuration file. */ public static final String DEFAULT_CONFIG_NAME = "ignite-config.conf"; + private static final int DEFAULT_DELAY_DURATION_MS = 100; + private static final int DEFAULT_METASTORAGE_IDLE_SYNC_TIME_INTERVAL_MS = 10; + /** * Starts an Ignite node with an optional bootstrap configuration from an input stream with HOCON configs. * @@ -100,20 +103,28 @@ public class TestIgnitionManager { .metaStorageNodeNames(params.metaStorageNodeNames()) .cmgNodeNames(params.cmgNodeNames()); + ConfigDocument configDocument; + if (params.clusterConfiguration() == null) { - builder.clusterConfiguration("{ schemaSync.delayDuration: 0 }"); + configDocument = ConfigDocumentFactory.parseString("{}"); } else { - ConfigDocument configDocument = ConfigDocumentFactory.parseString(params.clusterConfiguration()); - - String delayDurationPath = "schemaSync.delayDuration"; + configDocument = ConfigDocumentFactory.parseString(params.clusterConfiguration()); + } - if (!configDocument.hasPath(delayDurationPath)) { - ConfigDocument updatedDocument = configDocument.withValueText(delayDurationPath, "0"); + configDocument = amendSetting(configDocument, "schemaSync.delayDuration", Integer.toString(DEFAULT_DELAY_DURATION_MS)); + configDocument = amendSetting(configDocument, "metaStorage.idleSyncTimeInterval", + Integer.toString(DEFAULT_METASTORAGE_IDLE_SYNC_TIME_INTERVAL_MS)); - builder.clusterConfiguration(updatedDocument.render()); - } - } + builder.clusterConfiguration(configDocument.render()); return builder.build(); } + + private static ConfigDocument amendSetting(ConfigDocument document, String path, String value) { + if (document.hasPath(path)) { + return document; + } else { + return document.withValueText(path, value); + } + } } diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java index 2ee9b80508..f7b01bd046 100644 --- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java @@ -137,7 +137,7 @@ public class IndexManagerTest extends BaseIgniteAbstractTest { clockWaiter = new ClockWaiter(nodeName, clock); - catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metaStorageManager), clockWaiter); + catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metaStorageManager), clockWaiter, 0L); indexManager = new IndexManager( tablesConfig, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java index d04c197ff8..51a7487e70 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java @@ -65,6 +65,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.LongFunction; +import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.stream.IntStream; import org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration; @@ -73,6 +74,7 @@ import org.apache.ignite.internal.baseline.BaselineManager; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.CatalogManagerImpl; import org.apache.ignite.internal.catalog.ClockWaiter; +import org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.catalog.storage.UpdateLogImpl; @@ -148,6 +150,7 @@ import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; +import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -772,10 +775,18 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { clockWaiter = new ClockWaiter(name, hybridClock); + SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry.getConfiguration( + SchemaSynchronizationConfiguration.KEY + ); + + LongSupplier delayDuration = () -> schemaSyncConfig.delayDuration().value(); + catalogManager = new CatalogManagerImpl( new UpdateLogImpl(metaStorageManager), - clockWaiter - ); + clockWaiter, + delayDuration); + + var schemaSyncService = new SchemaSyncServiceImpl(metaStorageManager.clusterTime(), catalogManager, delayDuration); schemaManager = new SchemaManager(registry, tablesCfg, metaStorageManager); @@ -815,7 +826,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { vaultManager, cmgManager, distributionZoneManager, - catalogManager + catalogManager, + schemaSyncService ) { @Override protected TxStateTableStorage createTxStateTableStorage( diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java index 029946c3ba..393fec38ed 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java @@ -219,7 +219,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe var clockWaiter = new ClockWaiter(name, clock); - var catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter); + var catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metastore), clockWaiter, 0L); DistributionZoneManager distributionZoneManager = new DistributionZoneManager( name, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 045684c66a..2371f7c57f 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.google.common.base.Supplier; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -49,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.IntFunction; import java.util.function.LongFunction; +import java.util.function.LongSupplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -60,6 +62,7 @@ import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.baseline.BaselineManager; import org.apache.ignite.internal.catalog.CatalogManagerImpl; import org.apache.ignite.internal.catalog.ClockWaiter; +import org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration; import org.apache.ignite.internal.catalog.storage.UpdateLogImpl; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration; @@ -112,6 +115,7 @@ import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; +import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.testframework.TestIgnitionManager; import org.apache.ignite.internal.tx.impl.HeapLockManager; @@ -342,7 +346,15 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { var clockWaiter = new ClockWaiter(name, clock); - var catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metaStorageMgr), clockWaiter); + SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry.getConfiguration( + SchemaSynchronizationConfiguration.KEY + ); + + LongSupplier delayDuration = () -> schemaSyncConfig.delayDuration().value(); + + var catalogManager = new CatalogManagerImpl(new UpdateLogImpl(metaStorageMgr), clockWaiter, delayDuration); + + var schemaSyncService = new SchemaSyncServiceImpl(metaStorageMgr.clusterTime(), catalogManager, delayDuration); DistributionZoneManager distributionZoneManager = new DistributionZoneManager( name, @@ -380,7 +392,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { vault, null, null, - catalogManager + catalogManager, + schemaSyncService ); var indexManager = new IndexManager(tablesConfig, schemaManager, tableManager, catalogManager, metaStorageMgr, registry); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index fd1a505485..e4a5924a1f 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -128,6 +128,7 @@ import org.apache.ignite.internal.storage.DataStorageModules; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; +import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; @@ -504,6 +505,12 @@ public class IgniteImpl implements Ignite { () -> schemaSyncConfig.delayDuration().value() ); + SchemaSyncServiceImpl schemaSyncService = new SchemaSyncServiceImpl( + metaStorageManager().clusterTime(), + catalogManager, + () -> schemaSyncConfig.delayDuration().value() + ); + distributionZoneManager = new DistributionZoneManager( name, registry, @@ -544,7 +551,8 @@ public class IgniteImpl implements Ignite { vaultMgr, cmgMgr, distributionZoneManager, - catalogManager + catalogManager, + schemaSyncService ); indexManager = new IndexManager(tablesConfig, schemaManager, distributedTblMgr, catalogManager, metaStorageMgr, registry); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 9cd4ab5a62..87ce508876 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -167,6 +167,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.Snaps import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver; import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas; +import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.table.distributed.storage.PartitionStorages; import org.apache.ignite.internal.table.event.TableEvent; @@ -373,6 +374,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp private final ConfiguredTablesCache configuredTablesCache; + private final SchemaSyncService schemaSyncService; + private final CatalogManager catalogManager; /** Versioned value used only at manager startup to correctly fire table creation events. */ @@ -425,7 +428,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp VaultManager vaultManager, ClusterManagementGroupManager cmgMgr, DistributionZoneManager distributionZoneManager, - CatalogManager catalogManager + CatalogManager catalogManager, + SchemaSyncService schemaSyncService ) { this.tablesCfg = tablesCfg; this.zonesConfig = zonesConfig; @@ -449,6 +453,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp this.cmgMgr = cmgMgr; this.distributionZoneManager = distributionZoneManager; this.catalogManager = catalogManager; + this.schemaSyncService = schemaSyncService; clusterNodeResolver = topologyService::getByConsistentId; diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index e97cc73dcc..8474bf299c 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -112,6 +112,7 @@ import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorage import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; +import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; import org.apache.ignite.internal.table.event.TableEvent; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tx.TxManager; @@ -836,7 +837,8 @@ public class TableManagerTest extends IgniteAbstractTest { vaultManager, cmgMgr, distributionZoneManager, - mock(CatalogManager.class) + mock(CatalogManager.class), + mock(SchemaSyncService.class) ) { @Override