This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26035
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit aeac92411701f371cd4d942bffb07a4939cfe439
Author:     Kirill Tkalenko <[email protected]>
AuthorDate: Wed Jul 30 08:51:53 2025 +0300

    IGNITE-26035 wip
---
 .../internal/lowwatermark/LowWatermarkImpl.java    |  91 +++++-----------
 modules/raft/build.gradle                          |   3 +
 .../raftsnapshot/ItParallelRaftSnapshotsTest.java  | 121 ++++++++++++++++-----
 3 files changed, 122 insertions(+), 93 deletions(-)

diff --git a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
index 4119871b2d9..de8c264d6f9 100644
--- a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
+++ b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.lowwatermark;
 
-import static org.apache.ignite.configuration.notifications.ConfigurationListener.fromConsumer;
 import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
@@ -37,9 +36,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.event.AbstractEventProducer;
@@ -51,7 +48,6 @@ import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.lowwatermark.ScheduledUpdateLowWatermarkTask.State;
 import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
 import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEventParameters;
@@ -128,11 +124,6 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
 
     private final Map<UUID, LowWatermarkLock> locks = new ConcurrentHashMap<>();
 
-    /** Guarded by {@link #scheduleUpdateLowWatermarkTaskLock}. */
-    private @Nullable ScheduledUpdateLowWatermarkTask lastScheduledUpdateLowWatermarkTask;
-
-    private final Lock scheduleUpdateLowWatermarkTaskLock = new ReentrantLock();
-
     /**
      * Constructor.
      *
@@ -168,7 +159,12 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
 
             messagingService.addMessageHandler(LowWatermarkMessageGroup.class, this::onReceiveNetworkMessage);
 
-            lowWatermarkConfig.updateIntervalMillis().listen(fromConsumer(ctx -> scheduleUpdates()));
+            // TODO: IGNITE-26035 Remove.
+            lowWatermarkConfig.updateIntervalMillis().listen(ctx -> {
+                scheduleUpdates();
+
+                return nullCompletedFuture();
+            });
 
             return nullCompletedFuture();
         });
@@ -223,57 +219,24 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
     }
 
     private void scheduleUpdateLowWatermarkBusy() {
-        scheduleUpdateLowWatermarkTaskLock.lock();
-
-        try {
-            ScheduledUpdateLowWatermarkTask lastTask = lastScheduledUpdateLowWatermarkTask;
-            ScheduledUpdateLowWatermarkTask newTask = new ScheduledUpdateLowWatermarkTask(this, State.NEW);
-
-            State lastTaskState = lastTask == null ? State.COMPLETED : lastTask.state();
+        // TODO: IGNITE-26035 Remove the synchronization and related code.
+        synchronized (lastScheduledTaskFuture) {
+            ScheduledFuture<?> previousScheduledFuture = this.lastScheduledTaskFuture.get();
 
-            switch (lastTaskState) {
-                case NEW:
-                    if (lastTask.tryCancel()) {
-                        lastScheduledUpdateLowWatermarkTask = newTask;
-
-                        scheduleUpdateLowWatermarkTaskBusy(newTask);
-                    }
-
-                    break;
-                case IN_PROGRESS:
-                    // In this case we don't need to schedule a new task because the current task that is in progress will schedule a new
-                    // task when it finishes.
-                    break;
-                case COMPLETED:
-                    lastScheduledUpdateLowWatermarkTask = newTask;
-
-                    scheduleUpdateLowWatermarkTaskBusy(newTask);
-
-                    break;
-                default:
-                    throw new AssertionError("Unknown state: " + lastTaskState);
+            if (previousScheduledFuture != null && !previousScheduledFuture.isDone()) {
+                previousScheduledFuture.cancel(true);
             }
-        } finally {
-            scheduleUpdateLowWatermarkTaskLock.unlock();
-        }
-    }
-
-    private void scheduleUpdateLowWatermarkTaskBusy(ScheduledUpdateLowWatermarkTask task) {
-        ScheduledFuture<?> previousScheduledFuture = this.lastScheduledTaskFuture.get();
-
-        if (previousScheduledFuture != null && !previousScheduledFuture.isDone()) {
-            previousScheduledFuture.cancel(true);
-        }
 
-        ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
-                task,
-                lowWatermarkConfig.updateIntervalMillis().value(),
-                TimeUnit.MILLISECONDS
-        );
+            ScheduledFuture<?> newScheduledFuture = scheduledThreadPool.schedule(
+                    () -> updateLowWatermark(createNewLowWatermarkCandidate()),
+                    lowWatermarkConfig.updateIntervalMillis().value(),
+                    TimeUnit.MILLISECONDS
+            );
 
-        boolean casResult = lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, newScheduledFuture);
+            boolean casResult = lastScheduledTaskFuture.compareAndSet(previousScheduledFuture, newScheduledFuture);
 
-        assert casResult : "only one scheduled task is expected";
+            assert casResult : "only one scheduled task is expected";
+        }
     }
 
     HybridTimestamp createNewLowWatermarkCandidate() {
@@ -319,11 +282,7 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
 
     @Override
     public void updateLowWatermark(HybridTimestamp newLowWatermark) {
-        updateLowWatermarkAsync(newLowWatermark);
-    }
-
-    CompletableFuture<Void> updateLowWatermarkAsync(HybridTimestamp newLowWatermark) {
-        return inBusyLockAsync(busyLock, () -> {
+        inBusyLock(busyLock, () -> {
             LowWatermarkCandidate newLowWatermarkCandidate = new LowWatermarkCandidate(newLowWatermark, new CompletableFuture<>());
             LowWatermarkCandidate oldLowWatermarkCandidate;
 
@@ -332,12 +291,12 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
 
                 // If another candidate contains a higher low watermark, then there is no need to update.
                 if (oldLowWatermarkCandidate.lowWatermark().compareTo(newLowWatermark) >= 0) {
-                    return nullCompletedFuture();
+                    return;
                 }
             } while (!lowWatermarkCandidate.compareAndSet(oldLowWatermarkCandidate, newLowWatermarkCandidate));
 
             // We will start the update as soon as the previous one finishes.
-            return oldLowWatermarkCandidate.updateFuture()
+            oldLowWatermarkCandidate.updateFuture()
                     .thenComposeAsync(unused -> updateAndNotify(newLowWatermark), scheduledThreadPool)
                     .whenComplete((unused, throwable) -> {
                         if (throwable != null) {
@@ -392,12 +351,16 @@ public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, L
                 .whenCompleteAsync((unused, throwable) -> {
                     if (throwable != null) {
                         if (!(hasCause(throwable, NodeStoppingException.class))) {
-                            LOG.error("Failed to update low watermark: {}", throwable, newLowWatermark);
+                            LOG.error("Failed to update low watermark, will schedule again: {}", throwable, newLowWatermark);
 
                             failureManager.process(new FailureContext(CRITICAL_ERROR, throwable));
+
+                            inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy);
                         }
                     } else {
                         LOG.info("Successful low watermark update: {}", newLowWatermark);
+
+                        inBusyLock(busyLock, this::scheduleUpdateLowWatermarkBusy);
                     }
                 }, scheduledThreadPool);
     }
diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle
index 66346fcfe1c..fae0fc6fe22 100644
--- a/modules/raft/build.gradle
+++ b/modules/raft/build.gradle
@@ -106,6 +106,9 @@ dependencies {
     integrationTestImplementation project(':ignite-partition-replicator')
     integrationTestImplementation project(':ignite-replicator')
     integrationTestImplementation project(':ignite-sql-engine')
+    integrationTestImplementation project(':ignite-low-watermark')
+    integrationTestImplementation project(':ignite-schema')
+    integrationTestImplementation project(':ignite-configuration-root')
     integrationTestImplementation libs.awaitility
     integrationTestImplementation libs.dropwizard.metrics
     integrationTestImplementation libs.disruptor
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
index aac8fa586b6..65c4ab30987 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItParallelRaftSnapshotsTest.java
@@ -22,6 +22,8 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.event.EventListener.fromConsumer;
+import static org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.awaitility.Awaitility.await;
@@ -29,23 +31,31 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.IntPredicate;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.Member;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.schema.configuration.GcExtensionConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
-    private static final String TEST_ZONE_NAME = "TEST_ZONE";
+    private static final String ZONE_NAME = "TEST_ZONE";
 
-    private static final String TEST_TABLE_NAME = "TEST_TABLE";
+    private static final String TABLE_NAME = "TEST_TABLE";
 
     @Override
     protected int initialNodes() {
@@ -58,44 +68,40 @@ class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
      */
     @Test
     void testSnapshotStreamingToMultipleNodes() {
-        String zoneSql = String.format(
-                "CREATE ZONE %s WITH STORAGE_PROFILES='%s', PARTITIONS=1, REPLICAS=5",
-                TEST_ZONE_NAME, DEFAULT_STORAGE_PROFILE
-        );
+        createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, initialNodes());
 
-        executeSql(zoneSql);
-
-        String tableSql = String.format(
-                "CREATE TABLE %s (key INT PRIMARY KEY, val VARCHAR(20)) ZONE %s ",
-                TEST_TABLE_NAME, TEST_ZONE_NAME
-        );
-
-        executeSql(tableSql);
-
-        ReplicationGroupId groupId = cluster.solePartitionId(TEST_ZONE_NAME, TEST_TABLE_NAME);
+        ReplicationGroupId groupId = cluster.solePartitionId(ZONE_NAME, TABLE_NAME);
 
         int primaryReplicaIndex = primaryReplicaIndex(groupId);
 
         // Stop two nodes that are neither primary replicas for the test table nor for the metastorage.
-        List<Integer> nodesToKill = cluster.aliveNodesWithIndices().stream()
-                .map(IgniteBiTuple::get1)
-                .filter(index -> index != primaryReplicaIndex && index != 0)
-                .limit(2)
-                .collect(toList());
+        int[] nodesToKill = nodeToKillIndexes(nodeIndex -> nodeIndex != primaryReplicaIndex && nodeIndex != 0, 2);
 
-        nodesToKill.parallelStream().forEach(cluster::stopNode);
+        Arrays.stream(nodesToKill).parallel().forEach(cluster::stopNode);
+
+        int tableSize = 100;
 
         // After the nodes have been stopped, insert some data and truncate the Raft log on primary replica.
-        executeSql(primaryReplicaIndex, String.format("INSERT INTO %s VALUES (1, 'one')", TEST_TABLE_NAME));
+        insertIntoTable(primaryReplicaIndex, TABLE_NAME, tableSize);
 
-        validateTableData(primaryReplicaIndex);
+        validateTableData(primaryReplicaIndex, tableSize);
 
         truncateLog(primaryReplicaIndex, groupId);
 
         // Start the nodes in parallel, Raft leader is expected to start streaming snapshots on them.
-        nodesToKill.parallelStream().forEach(cluster::startNode);
+        Arrays.stream(nodesToKill).parallel().forEach(cluster::startNode);
 
-        nodesToKill.forEach(this::validateTableData);
+        Arrays.stream(nodesToKill).forEach(nodeIndex -> validateTableData(nodeIndex, tableSize));
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26034")
+    void testInstallRaftSnapshotAfterUpdateLowWatermark() {
+        updateLowWatermarkUpdateInterval(1_000);
+
+        assertThat(awaitUpdateLowWatermarkAsync(cluster.nodes()), willCompleteSuccessfully());
+
+        testSnapshotStreamingToMultipleNodes();
     }
 
     private int primaryReplicaIndex(ReplicationGroupId groupId) {
@@ -124,8 +130,8 @@ class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
         assertThat(truncateFuture, willCompleteSuccessfully());
     }
 
-    private void validateTableData(int nodeIndex) {
-        Table table = cluster.node(nodeIndex).tables().table(TEST_TABLE_NAME);
+    private void validateTableData(int nodeIndex, long expTableSize) {
+        Table table = cluster.node(nodeIndex).tables().table(TABLE_NAME);
 
         InternalTable internalTable = unwrapTableImpl(table).internalTable();
 
@@ -133,6 +139,63 @@ class ItParallelRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
 
         assertThat(mvPartition, is(notNullValue()));
 
-        await().until(mvPartition::estimatedSize, is(1L));
+        await().until(mvPartition::estimatedSize, is(expTableSize));
+    }
+
+    private void createZoneAndTable(String zoneName, String tableName, int partitions, int replicas) {
+        executeSql(String.format(
+                "CREATE ZONE %s WITH STORAGE_PROFILES='%s', PARTITIONS=%s, REPLICAS=%s",
+                zoneName, DEFAULT_STORAGE_PROFILE, partitions, replicas
+        ));
+
+        executeSql(String.format(
+                "CREATE TABLE %s (key INT PRIMARY KEY, val VARCHAR(256)) ZONE %s ",
+                tableName, zoneName
+        ));
+    }
+
+    private int[] nodeToKillIndexes(IntPredicate indexFilter, int limit) {
+        return cluster.aliveNodesWithIndices().stream()
+                .mapToInt(IgniteBiTuple::get1)
+                .filter(Objects::nonNull)
+                .filter(indexFilter)
+                .limit(limit)
+                .toArray();
+    }
+
+    private void insertIntoTable(int nodeIndex, String tableName, int count) {
+        String insertDml = String.format("INSERT INTO %s (key, val) VALUES (?, ?)", tableName);
+
+        for (int i = 0; i < count; i++) {
+            executeSql(nodeIndex, insertDml, i, "n_" + count);
+        }
+    }
+
+    private void updateLowWatermarkUpdateInterval(long intervalMillis) {
+        IgniteImpl node = unwrapIgniteImpl(cluster.aliveNode());
+
+        CompletableFuture<Void> updateConfigFuture = node.clusterConfiguration().getConfiguration(GcExtensionConfiguration.KEY)
+                .gc()
+                .lowWatermark()
+                .updateIntervalMillis()
+                .update(intervalMillis);
+
+        assertThat(updateConfigFuture, willCompleteSuccessfully());
+    }
+
+    private static CompletableFuture<Void> awaitUpdateLowWatermarkAsync(List<Ignite> nodes) {
+        List<CompletableFuture<Void>> futures = nodes.stream()
+                .map(TestWrappers::unwrapIgniteImpl)
+                .map(IgniteImpl::lowWatermark)
+                .map(lowWatermark -> {
+                    var future = new CompletableFuture<Void>();
+
+                    lowWatermark.listen(LOW_WATERMARK_CHANGED, fromConsumer(p -> future.complete(null)));
+
+                    return future;
+                })
+                .collect(toList());
+
+        return CompletableFutures.allOf(futures);
     }
 }
