This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 0a59af8a10 IGNITE-20668 Increase wait after a DDL to account for idle safe-time propagation period (#2703) 0a59af8a10 is described below commit 0a59af8a100e1e8e05e541c701aaf8da3ab818f2 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Wed Oct 18 13:17:24 2023 +0400 IGNITE-20668 Increase wait after a DDL to account for idle safe-time propagation period (#2703) --- .../internal/catalog/CatalogManagerImpl.java | 24 ++++-- .../internal/catalog/CatalogManagerSelfTest.java | 4 +- .../java/org/apache/ignite/internal/Hacks.java | 27 +++++++ .../testframework/BaseIgniteAbstractTest.java | 2 + .../ignite/internal/replicator/ReplicaManager.java | 56 +++++++++++++- .../apache/ignite/internal/jdbc/ItJdbcTest.java | 2 +- .../internal/readonly/ItReadOnlyTxInPastTest.java | 86 ++++++++++++++++++++++ .../rebalance/ItRebalanceDistributedTest.java | 11 ++- .../rebalance/ItRebalanceRecoveryTest.java | 3 + .../runner/app/ItIgniteNodeRestartTest.java | 11 ++- .../ignite/internal/sql/api/ItSqlApiBaseTest.java | 9 ++- .../org/apache/ignite/internal/app/IgniteImpl.java | 16 +++- .../java/org/apache/ignite/internal/Cluster.java | 10 +++ .../LowWatermarkConfigurationSchema.java | 4 +- .../ignite/internal/tx/impl/TxManagerImpl.java | 49 ++++++++++-- .../apache/ignite/internal/tx/TxManagerTest.java | 21 +++--- 16 files changed, 291 insertions(+), 44 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java index aab4f0f777..9a6ead5252 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java @@ -90,6 +90,8 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata /** Safe time to wait before new 
Catalog version activation. */ private static final int DEFAULT_DELAY_DURATION = 0; + private static final int DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD = 0; + /** Initial update token for a catalog descriptor, this token is valid only before the first call of * {@link UpdateEntry#applyUpdate(Catalog, long)}. * @@ -115,27 +117,35 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata private final LongSupplier delayDurationMsSupplier; + private final LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier; + /** * Constructor. */ public CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter) { - this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION); + this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION, DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD); } /** * Constructor. */ - CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long delayDurationMs) { - this(updateLog, clockWaiter, () -> delayDurationMs); + CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long delayDurationMs, long partitionIdleSafeTimePropagationPeriod) { + this(updateLog, clockWaiter, () -> delayDurationMs, () -> partitionIdleSafeTimePropagationPeriod); } /** * Constructor. 
*/ - public CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, LongSupplier delayDurationMsSupplier) { + public CatalogManagerImpl( + UpdateLog updateLog, + ClockWaiter clockWaiter, + LongSupplier delayDurationMsSupplier, + LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier + ) { this.updateLog = updateLog; this.clockWaiter = clockWaiter; this.delayDurationMsSupplier = delayDurationMsSupplier; + this.partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier; } @Override @@ -415,8 +425,12 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata HybridTimestamp clusterWideEnsuredActivationTs = activationTs.addPhysicalTime( HybridTimestamp.maxClockSkew() ); + // TODO: this addition has to be removed when IGNITE-20378 is implemented. + HybridTimestamp tsSafeForRoReadingInPastOptimization = clusterWideEnsuredActivationTs.addPhysicalTime( + partitionIdleSafeTimePropagationPeriodMsSupplier.getAsLong() + ); - return clockWaiter.waitFor(clusterWideEnsuredActivationTs); + return clockWaiter.waitFor(tsSafeForRoReadingInPastOptimization); }); } diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java index 01f78d5125..fbaae7dbfb 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java @@ -1046,7 +1046,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { public void catalogActivationTime() throws Exception { long delayDuration = TimeUnit.DAYS.toMillis(365); - CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, clockWaiter, delayDuration); + CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, clockWaiter, delayDuration, 0); manager.start(); @@ -1444,7 
+1444,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { HybridTimestamp startTs = clock.now(); - CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, clockWaiter, delayDuration); + CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, clockWaiter, delayDuration, 0); manager.start(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/Hacks.java b/modules/core/src/main/java/org/apache/ignite/internal/Hacks.java new file mode 100644 index 0000000000..b9cbdc1498 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/Hacks.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +/** + * Contains hacks needed for the whole codebase. Should be removed as quickly as possible. + */ +public class Hacks { + // TODO: Remove after IGNITE-20499 is fixed. + /** Name of the property overriding idle safe time propagation period (in milliseconds). 
*/ + public static final String IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY = "IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS"; +} diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java index 8d3338cc9a..c811c4d11d 100644 --- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.testframework; +import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; import static org.apache.ignite.internal.lang.IgniteSystemProperties.IGNITE_SENSITIVE_DATA_LOGGING; import static org.apache.ignite.internal.lang.IgniteSystemProperties.getString; import static org.apache.ignite.internal.util.IgniteUtils.monotonicMs; @@ -38,6 +39,7 @@ import org.mockito.Mockito; * Ignite base test class. */ @ExtendWith(SystemPropertiesExtension.class) +@WithSystemProperty(key = IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "200") public abstract class BaseIgniteAbstractTest { /** Logger. 
*/ protected final IgniteLogger log = Loggers.forClass(getClass()); diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index 55b03f36e7..eb525e7f1a 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.replicator; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toSet; +import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; import java.io.IOException; @@ -36,6 +37,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -80,8 +82,8 @@ import org.jetbrains.annotations.TestOnly; * This class allow to start/stop/get a replica. */ public class ReplicaManager implements IgniteComponent { - /** Idle safe time propagation period. */ - public static final int IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS = 1000; + /** Default Idle safe time propagation period for tests. */ + public static final int DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS = 1000; /** The logger. */ private static final IgniteLogger LOG = Loggers.forClass(ReplicaManager.class); @@ -115,6 +117,8 @@ public class ReplicaManager implements IgniteComponent { /** Placement driver. 
*/ private final PlacementDriver placementDriver; + private final LongSupplier idleSafeTimePropagationPeriodMsSupplier; + /** Replicas. */ private final ConcurrentHashMap<ReplicationGroupId, CompletableFuture<Replica>> replicas = new ConcurrentHashMap<>(); @@ -141,7 +145,9 @@ public class ReplicaManager implements IgniteComponent { * @param cmgMgr Cluster group manager. * @param clock A hybrid logical clock. * @param messageGroupsToHandle Message handlers. + * @param placementDriver A placement driver. */ + @TestOnly public ReplicaManager( String nodeName, ClusterService clusterNetSvc, @@ -149,6 +155,37 @@ public class ReplicaManager implements IgniteComponent { HybridClock clock, Set<Class<?>> messageGroupsToHandle, PlacementDriver placementDriver + ) { + this( + nodeName, + clusterNetSvc, + cmgMgr, + clock, + messageGroupsToHandle, + placementDriver, + () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + ); + } + + /** + * Constructor for a replica service. + * + * @param nodeName Node name. + * @param clusterNetSvc Cluster network service. + * @param cmgMgr Cluster group manager. + * @param clock A hybrid logical clock. + * @param messageGroupsToHandle Message handlers. + * @param placementDriver A placement driver. + * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe time propagation period in ms. 
+ */ + public ReplicaManager( + String nodeName, + ClusterService clusterNetSvc, + ClusterManagementGroupManager cmgMgr, + HybridClock clock, + Set<Class<?>> messageGroupsToHandle, + PlacementDriver placementDriver, + LongSupplier idleSafeTimePropagationPeriodMsSupplier ) { this.clusterNetSvc = clusterNetSvc; this.cmgMgr = cmgMgr; @@ -157,6 +194,7 @@ public class ReplicaManager implements IgniteComponent { this.handler = this::onReplicaMessageReceived; this.placementDriverMessageHandler = this::onPlacementDriverMessageReceived; this.placementDriver = placementDriver; + this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier; scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool( 1, @@ -506,7 +544,7 @@ public class ReplicaManager implements IgniteComponent { scheduledIdleSafeTimeSyncExecutor.scheduleAtFixedRate( this::idleSafeTimeSync, 0, - IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS, + idleSafeTimePropagationPeriodMsSupplier.getAsLong(), TimeUnit.MILLISECONDS ); @@ -675,4 +713,16 @@ public class ReplicaManager implements IgniteComponent { public Set<ReplicationGroupId> startedGroups() { return replicas.keySet(); } + + /** + * TODO: to be removed after IGNITE-20499 is fixed. This was introduced in a rush because of a burning release, should be fixed asap. 
+ */ + public static long idleSafeTimePropagationPeriodMs() { + return Long.parseLong( + System.getProperty( + IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, + Integer.toString(DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS) + ) + ); + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java index 4bc248f033..e4118682a3 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/jdbc/ItJdbcTest.java @@ -69,7 +69,7 @@ class ItJdbcTest extends IgniteIntegrationTest { @BeforeAll void setUp(TestInfo testInfo, @WorkDirectory Path workDir) { cluster = new Cluster(testInfo, workDir); - cluster.startAndInit(1, new int[]{ 0 }, builder -> builder.clusterConfiguration( + cluster.startAndInit(1, builder -> builder.clusterConfiguration( "{\n" + " \"security\": {\n" + " \"enabled\": true,\n" diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java new file mode 100644 index 0000000000..4b1d651127 --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.readonly; + +import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; +import static org.apache.ignite.internal.SessionUtils.executeUpdate; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.testframework.WithSystemProperty; +import org.apache.ignite.sql.ResultSet; +import org.apache.ignite.sql.SqlRow; +import org.apache.ignite.tx.TransactionOptions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests about read-only transactions in the past. + */ +@SuppressWarnings("resource") +// Setting this to 1 second so that an RO tx has a potential to look before a table was created. 
+@WithSystemProperty(key = IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "1000") +class ItReadOnlyTxInPastTest extends ClusterPerTestIntegrationTest { + private static final String TABLE_NAME = "test"; + + @Override + protected int initialNodes() { + return 0; + } + + @BeforeEach + void prepareCluster() { + cluster.startAndInit(1); + + cluster.doInSession(0, session -> { + executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session); + }); + } + + /** + * Make sure that an explicit RO transaction does not look too far in the past (where the corresponding + * table did not yet exist) even when the 'look in the past' optimization is enabled. + */ + @Test + void explicitReadOnlyTxDoesNotLookBeforeTableCreation() { + IgniteImpl node = cluster.node(0); + + long count = node.transactions().runInTransaction(tx -> { + return cluster.doInSession(0, session -> { + try (ResultSet<SqlRow> resultSet = session.execute(tx, "SELECT COUNT(*) FROM " + TABLE_NAME)) { + return resultSet.next().longValue(0); + } + }); + }, new TransactionOptions().readOnly(true)); + + assertThat(count, is(0L)); + } + + /** + * Make sure that an implicit RO transaction does not look too far in the past (where the corresponding + * table did not yet exist) even when the 'look in the past' optimization is enabled. 
+ */ + @Test + void implicitReadOnlyTxDoesNotLookBeforeTableCreation() { + long count = cluster.query(0, "SELECT COUNT(*) FROM " + TABLE_NAME, rs -> rs.next().longValue(0)); + + assertThat(count, is(0L)); + } +} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 488a218233..c21f4d7bd2 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -821,13 +821,16 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { new NodeAttributesCollector(nodeAttributes), new TestConfigurationValidator()); + LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = () -> 10L; + replicaManager = spy(new ReplicaManager( name, clusterService, cmgManager, hybridClock, Set.of(TableMessageGroup.class, TxMessageGroup.class), - placementDriver + placementDriver, + partitionIdleSafeTimePropagationPeriodMsSupplier )); ReplicaService replicaSvc = new ReplicaService( @@ -841,7 +844,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { hybridClock, new TransactionIdGenerator(addr.port()), () -> clusterService.topologyService().localMember().id(), - placementDriver + placementDriver, + partitionIdleSafeTimePropagationPeriodMsSupplier ); String nodeName = clusterService.nodeName(); @@ -915,7 +919,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { catalogManager = new CatalogManagerImpl( new UpdateLogImpl(metaStorageManager), clockWaiter, - delayDurationMsSupplier + delayDurationMsSupplier, + partitionIdleSafeTimePropagationPeriodMsSupplier ); schemaManager = new CatalogSchemaManager(registry, catalogManager, metaStorageManager); diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java index f1af298c4b..20b290dc6a 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.rebalance; +import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -26,11 +27,13 @@ import org.apache.ignite.internal.ClusterPerTestIntegrationTest; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.test.WatchListenerInhibitor; +import org.apache.ignite.internal.testframework.WithSystemProperty; import org.junit.jupiter.api.Test; /** * Tests for recovery of the rebalance procedure. 
*/ +@WithSystemProperty(key = IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "1000") public class ItRebalanceRecoveryTest extends ClusterPerTestIntegrationTest { @Override protected int initialNodes() { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 946067cb40..4b4bf37a36 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -264,13 +264,16 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { new NodeAttributesCollector(nodeAttributes), new TestConfigurationValidator()); + LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = () -> 10L; + ReplicaManager replicaMgr = new ReplicaManager( name, clusterSvc, cmgManager, hybridClock, Set.of(TableMessageGroup.class, TxMessageGroup.class), - placementDriver + placementDriver, + partitionIdleSafeTimePropagationPeriodMsSupplier ); var replicaService = new ReplicaService(clusterSvc.messagingService(), hybridClock); @@ -285,7 +288,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { hybridClock, new TransactionIdGenerator(idx), () -> clusterSvc.topologyService().localMember().id(), - placementDriver + placementDriver, + partitionIdleSafeTimePropagationPeriodMsSupplier ); var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); @@ -351,7 +355,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { var catalogManager = new CatalogManagerImpl( new UpdateLogImpl(metaStorageMgr), clockWaiter, - delayDurationMsSupplier + delayDurationMsSupplier, + partitionIdleSafeTimePropagationPeriodMsSupplier ); CatalogSchemaManager schemaManager = new 
CatalogSchemaManager(registry, catalogManager, metaStorageMgr); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java index 2fabd673ee..c6e9852eeb 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java @@ -68,6 +68,7 @@ import org.apache.ignite.table.Table; import org.apache.ignite.tx.Transaction; import org.apache.ignite.tx.TransactionOptions; import org.hamcrest.Matcher; +import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -368,7 +369,7 @@ public abstract class ItSqlApiBaseTest extends ClusterPerClassIntegrationTest { Session ses = sql.createSession(); for (int i = 0; i < ROW_COUNT; ++i) { - sql("INSERT INTO TEST VALUES (?, ?)", i, i); + executeForRead(ses, "INSERT INTO TEST VALUES (?, ?)", i, i); } List<Boolean> booleanList = List.of(Boolean.TRUE, Boolean.FALSE); @@ -834,11 +835,11 @@ public abstract class ItSqlApiBaseTest extends ClusterPerClassIntegrationTest { assertEquals(0, txManager().pending(), "Expected no pending transactions"); } - protected ResultSet<SqlRow> executeForRead(Session ses, String query) { - return executeForRead(ses, null, query); + protected ResultSet<SqlRow> executeForRead(Session ses, String query, Object... args) { + return executeForRead(ses, null, query, args); } - protected abstract ResultSet<SqlRow> executeForRead(Session ses, Transaction tx, String query, Object... args); + protected abstract ResultSet<SqlRow> executeForRead(Session ses, @Nullable Transaction tx, String query, Object... 
args); protected <T extends IgniteException> T checkError(Class<T> expCls, Integer code, String msg, Session ses, String sql, Object... args) { diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 5cf0ace69a..bcfc1cf764 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -467,13 +467,16 @@ public class IgniteImpl implements Ignite { clock ); + LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier(); + replicaMgr = new ReplicaManager( name, clusterSvc, cmgMgr, clock, Set.of(TableMessageGroup.class, TxMessageGroup.class), - placementDriverMgr.placementDriver() + placementDriverMgr.placementDriver(), + partitionIdleSafeTimePropagationPeriodMsSupplier ); metricManager.configure(clusterConfigRegistry.getConfiguration(MetricConfiguration.KEY)); @@ -511,7 +514,8 @@ public class IgniteImpl implements Ignite { CatalogManagerImpl catalogManager = new CatalogManagerImpl( new UpdateLogImpl(metaStorageMgr), clockWaiter, - delayDurationMsSupplier + delayDurationMsSupplier, + partitionIdleSafeTimePropagationPeriodMsSupplier ); systemViewManager = new SystemViewManagerImpl(name, catalogManager); @@ -544,7 +548,8 @@ public class IgniteImpl implements Ignite { clock, new TransactionIdGenerator(() -> clusterSvc.nodeName().hashCode()), () -> clusterSvc.topologyService().localMember().id(), - placementDriverMgr.placementDriver() + placementDriverMgr.placementDriver(), + partitionIdleSafeTimePropagationPeriodMsSupplier ); distributedTblMgr = new TableManager( @@ -652,6 +657,11 @@ public class IgniteImpl implements Ignite { restComponent = createRestComponent(name); } + private static LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier() { + // TODO: Replace with an immutable 
dynamic property set on cluster init after IGNITE-20499 is fixed. + return ReplicaManager::idleSafeTimePropagationPeriodMs; + } + private AuthenticationManager createAuthenticationManager() { SecurityConfiguration securityConfiguration = clusterCfgMgr.configurationRegistry() .getConfiguration(SecurityConfiguration.KEY); diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java index 632c1388d6..bd98e922c7 100644 --- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java @@ -153,6 +153,16 @@ public class Cluster { startAndInit(nodeCount, cmgNodes, builder -> {}); } + /** + * Starts the cluster with the given number of nodes and initializes it. + * + * @param nodeCount Number of nodes in the cluster. + * @param initParametersConfigurator Configure {@link InitParameters} before initializing the cluster. + */ + public void startAndInit(int nodeCount, Consumer<InitParametersBuilder> initParametersConfigurator) { + startAndInit(nodeCount, new int[]{0}, initParametersConfigurator); + } + /** * Starts the cluster with the given number of nodes and initializes it. 
* diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/LowWatermarkConfigurationSchema.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/LowWatermarkConfigurationSchema.java index 51d9e5ed8b..170bbde606 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/LowWatermarkConfigurationSchema.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/LowWatermarkConfigurationSchema.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.schema.configuration; import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW; -import static org.apache.ignite.internal.replicator.ReplicaManager.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; +import static org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; import java.util.concurrent.TimeUnit; import org.apache.ignite.configuration.annotation.Config; @@ -39,7 +39,7 @@ public class LowWatermarkConfigurationSchema { * {@code now() - dataAvailabilityTime()}. */ // TODO https://issues.apache.org/jira/browse/IGNITE-18977 Make these values configurable and create dynamic validator after that. 
- @Range(min = IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW) + @Range(min = DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW) @Value(hasDefault = true) public long dataAvailabilityTime = TimeUnit.MINUTES.toMillis(45); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index b3b6fc475f..cf5c315a35 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.runAsync; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; +import static org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; import static org.apache.ignite.internal.tx.TxState.ABORTED; import static org.apache.ignite.internal.tx.TxState.COMMITED; import static org.apache.ignite.internal.tx.TxState.PENDING; @@ -47,6 +48,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; +import java.util.function.LongSupplier; import java.util.function.Supplier; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -130,6 +132,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { private final PlacementDriver placementDriver; + private final LongSupplier idleSafeTimePropagationPeriodMsSupplier; + /** * The constructor. 
* @@ -145,6 +149,35 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { TransactionIdGenerator transactionIdGenerator, Supplier<String> localNodeIdSupplier, PlacementDriver placementDriver + ) { + this( + replicaService, + lockManager, + clock, + transactionIdGenerator, + localNodeIdSupplier, + placementDriver, + () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + ); + } + + /** + * The constructor. + * + * @param replicaService Replica service. + * @param lockManager Lock manager. + * @param clock A hybrid logical clock. + * @param transactionIdGenerator Used to generate transaction IDs. + * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe time propagation period in ms. + */ + public TxManagerImpl( + ReplicaService replicaService, + LockManager lockManager, + HybridClock clock, + TransactionIdGenerator transactionIdGenerator, + Supplier<String> localNodeIdSupplier, + PlacementDriver placementDriver, + LongSupplier idleSafeTimePropagationPeriodMsSupplier ) { this.replicaService = replicaService; this.lockManager = lockManager; @@ -152,6 +185,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { this.transactionIdGenerator = transactionIdGenerator; this.localNodeId = new Lazy<>(localNodeIdSupplier); this.placementDriver = placementDriver; + this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier; int cpus = Runtime.getRuntime().availableProcessors(); @@ -216,14 +250,13 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler { * @return Current read timestamp. 
*/ private HybridTimestamp currentReadTimestamp() { - return clock.now(); - - // TODO: IGNITE-20378 Fix it - // return new HybridTimestamp(now.getPhysical() - // - ReplicaManager.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS - // - HybridTimestamp.CLOCK_SKEW, - // 0 - // ); + HybridTimestamp now = clock.now(); + + return new HybridTimestamp(now.getPhysical() + - idleSafeTimePropagationPeriodMsSupplier.getAsLong() + - HybridTimestamp.CLOCK_SKEW, + 0 + ); } @Override diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java index 86f29a7aa1..359e9da571 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java @@ -19,9 +19,10 @@ package org.apache.ignite.internal.tx; import static java.lang.Math.abs; +import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY; import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; -import static org.apache.ignite.internal.replicator.ReplicaManager.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; +import static org.apache.ignite.internal.replicator.ReplicaManager.idleSafeTimePropagationPeriodMs; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR; @@ -51,6 +52,7 @@ import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.testframework.IgniteAbstractTest; 
+import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.tx.impl.PrimaryReplicaExpiredException; import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; @@ -64,7 +66,6 @@ import org.apache.ignite.tx.TransactionException; import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -270,11 +271,11 @@ public class TxManagerTest extends IgniteAbstractTest { } @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-20378") + @WithSystemProperty(key = IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "1000") public void testObservableTimestamp() { long compareThreshold = 50; // Check that idle safe time propagation period is significantly greater than compareThreshold. 
- assertTrue(IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW > compareThreshold * 5); + assertTrue(idleSafeTimePropagationPeriodMs() + CLOCK_SKEW > compareThreshold * 5); HybridTimestamp now = clock.now(); @@ -291,7 +292,7 @@ public class TxManagerTest extends IgniteAbstractTest { tx.commit(); HybridTimestamp timestampInPast = new HybridTimestamp( - now.getPhysical() - IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS * 2, + now.getPhysical() - idleSafeTimePropagationPeriodMs() * 2, now.getLogical() ); @@ -301,7 +302,7 @@ public class TxManagerTest extends IgniteAbstractTest { tx = txManager.begin(hybridTimestampTracker, true); - long readTime = now.getPhysical() - IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS - CLOCK_SKEW; + long readTime = now.getPhysical() - idleSafeTimePropagationPeriodMs() - CLOCK_SKEW; assertThat(abs(readTime - tx.readTimestamp().getPhysical()), Matchers.lessThan(compareThreshold)); @@ -309,11 +310,11 @@ public class TxManagerTest extends IgniteAbstractTest { } @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-20378") + @WithSystemProperty(key = IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "1000") public void testObservableTimestampLocally() { long compareThreshold = 50; // Check that idle safe time propagation period is significantly greater than compareThreshold. 
- assertTrue(IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW > compareThreshold * 5); + assertTrue(idleSafeTimePropagationPeriodMs() + CLOCK_SKEW > compareThreshold * 5); HybridTimestamp now = clock.now(); @@ -324,7 +325,7 @@ public class TxManagerTest extends IgniteAbstractTest { assertTrue(firstReadTs.compareTo(now) < 0); assertTrue(now.getPhysical() - firstReadTs.getPhysical() < compareThreshold - + IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW); + + idleSafeTimePropagationPeriodMs() + CLOCK_SKEW); tx.commit(); tx = txManager.begin(hybridTimestampTracker, true); @@ -332,7 +333,7 @@ public class TxManagerTest extends IgniteAbstractTest { assertTrue(firstReadTs.compareTo(tx.readTimestamp()) <= 0); assertTrue(abs(now.getPhysical() - tx.readTimestamp().getPhysical()) < compareThreshold - + IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + CLOCK_SKEW); + + idleSafeTimePropagationPeriodMs() + CLOCK_SKEW); tx.commit(); }