This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d6d7b428781 IGNITE-28076 Use watch executor in SchemaSafeTimeTrackerImpl (#7729)
d6d7b428781 is described below

commit d6d7b4287811d28c1a4d81829efda7446cb6e99e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Mar 9 16:37:22 2026 +0400

    IGNITE-28076 Use watch executor in SchemaSafeTimeTrackerImpl (#7729)
---
 .../rebalance/ItRebalanceDistributedTest.java      |  2 +-
 .../metastorage/impl/MetaStorageManagerImpl.java   |  7 ++++-
 .../metastorage/impl/WatchProcessorAccess.java     | 30 ++++++++++++++++++++++
 .../server/AbstractKeyValueStorage.java            |  6 +++++
 .../metastorage/server/KeyValueStorage.java        |  6 +++++
 .../metastorage/server/WatchProcessor.java         |  5 ++++
 .../partition/replicator/fixtures/Node.java        |  2 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |  2 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  2 +-
 .../internal/schema/SchemaSafeTimeTrackerImpl.java | 12 ++++++---
 .../schema/SchemaSafeTimeTrackerImplTest.java      |  9 ++++++-
 11 files changed, 73 insertions(+), 10 deletions(-)

diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index a2f4a1d4537..fc0e4728fa8 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1505,7 +1505,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
 
             schemaManager = new SchemaManager(registry, catalogManager);
 
-            schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime());
+            schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime(), metaStorageManager.watchExecutor());
             metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
 
             schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 20cc515ff44..eee80e1e468 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -128,7 +128,7 @@ import org.jetbrains.annotations.TestOnly;
  *     <li>Providing corresponding Meta storage service proxy interface</li>
  * </ul>
  */
-public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGroupMaintenance {
+public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGroupMaintenance, WatchProcessorAccess {
     private static final IgniteLogger LOG = Loggers.forClass(MetaStorageManagerImpl.class);
 
     private final ClusterService clusterService;
@@ -1409,4 +1409,9 @@ public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGr
     public void markAsStopping() {
         metaStorageSvcFut.thenAccept(MetaStorageServiceImpl::markAsStopping);
     }
+
+    @Override
+    public Executor watchExecutor() {
+        return storage.watchExecutor();
+    }
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/WatchProcessorAccess.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/WatchProcessorAccess.java
new file mode 100644
index 00000000000..8ad66e34dc6
--- /dev/null
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/WatchProcessorAccess.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Provides access to watch processing required by internal components.
+ */
+public interface WatchProcessorAccess {
+    /**
+     * Returns executor for watch processing.
+     */
+    Executor watchExecutor();
+}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
index 292de6fe285..a41a8a092fb 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.failure.FailureProcessor;
@@ -396,4 +397,9 @@ public abstract class AbstractKeyValueStorage implements KeyValueStorage {
             notifyWatchProcessorEventsBeforeStartingWatches = null;
         }
     }
+
+    @Override
+    public Executor watchExecutor() {
+        return watchProcessor.watchExecutor();
+    }
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index ef9b9887d89..2afb4861cf4 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -21,6 +21,7 @@ import java.nio.file.Path;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.CommandId;
@@ -558,4 +559,9 @@ public interface KeyValueStorage extends ManuallyCloseable {
      * @return Future that's completed when flushing of the data is completed.
      */
     CompletableFuture<Void> flush();
+
+    /**
+     * Returns executor used to execute watches.
+     */
+    Executor watchExecutor();
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 4e2691ed0d4..a11b5131b24 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -39,6 +39,7 @@ import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -513,4 +514,8 @@ public class WatchProcessor implements ManuallyCloseable {
                 IDEMPOTENT_COMMAND_PREFIX_BYTES, 0, prefixLength
         );
     }
+
+    Executor watchExecutor() {
+        return watchExecutor;
+    }
 }
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 9339fd161a3..65b9de944a1 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -639,7 +639,7 @@ public class Node {
 
         volatileLogStorageManagerCreator = new VolatileLogStorageManagerCreator(name, workDir.resolve("volatile-log-spillout-" + name));
 
-        schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime());
+        schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime(), metaStorageManager.watchExecutor());
         metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
 
         LongSupplier delayDurationMsSupplier = () -> DELAY_DURATION_MS;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index d73fc0a2448..15bf03e44b7 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -602,7 +602,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
                 zoneId -> completedFuture(Set.of())
         );
 
-        var schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
+        var schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime(), metaStorageMgr.watchExecutor());
         metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
 
         LongSupplier delayDurationMsSupplier = () -> TestIgnitionManager.DEFAULT_DELAY_DURATION_MS;
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 8cef74418ee..7ba2b4d4cd3 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -935,7 +935,7 @@ public class IgniteImpl implements Ignite {
                 volatileLogStorageManagerCreator
         );
 
-        schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
+        schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime(), metaStorageMgr.watchExecutor());
         metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
 
         SchemaSyncService schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java
index f05e79c0ada..9fb4beeada2 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java
@@ -22,6 +22,7 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -39,6 +40,8 @@ import org.jetbrains.annotations.TestOnly;
 public class SchemaSafeTimeTrackerImpl implements SchemaSafeTimeTracker, IgniteComponent, NotificationEnqueuedListener {
     private final ClusterTime clusterTime;
 
+    private final Executor watchExecutor;
+
     private final PendingComparableValuesTracker<HybridTimestamp, Void> schemaSafeTime =
             new PendingComparableValuesTracker<>(HybridTimestamp.MIN_VALUE);
 
@@ -46,8 +49,9 @@ public class SchemaSafeTimeTrackerImpl implements SchemaSafeTimeTracker, IgniteC
 
     private final Object futureMutex = new Object();
 
-    public SchemaSafeTimeTrackerImpl(ClusterTime clusterTime) {
+    public SchemaSafeTimeTrackerImpl(ClusterTime clusterTime, Executor watchExecutor) {
         this.clusterTime = clusterTime;
+        this.watchExecutor = watchExecutor;
     }
 
     @Override
@@ -74,16 +78,16 @@ public class SchemaSafeTimeTrackerImpl implements SchemaSafeTimeTracker, IgniteC
                 // The update touches the Catalog (i.e. schemas), so we must chain with the core notification future
                 // as Catalog listeners will be included in it (because we need to wait for those listeners to finish execution
                 // before updating the schema safe time).
-                newSchemaSafeTimeUpdateFuture = schemaSafeTimeUpdateFuture.thenCompose(unused -> newNotificationFuture);
+                newSchemaSafeTimeUpdateFuture = schemaSafeTimeUpdateFuture.thenComposeAsync(unused -> newNotificationFuture, watchExecutor);
             } else {
                 // The update does not concern the Catalog (schemas), so we can update schema safe time as soon as previous updates to it
                 // get applied.
                 newSchemaSafeTimeUpdateFuture = schemaSafeTimeUpdateFuture;
             }
 
-            newSchemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture.thenRun(() -> {
+            newSchemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture.thenRunAsync(() -> {
                 schemaSafeTime.update(timestamp, null);
-            });
+            }, watchExecutor);
 
             schemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture;
         }
diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java
index 5942e6ead89..fb3c411674a 100644
--- a/modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java
+++ b/modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java
@@ -25,11 +25,14 @@ import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.when;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -37,15 +40,19 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 class SchemaSafeTimeTrackerImplTest extends BaseIgniteAbstractTest {
     @Mock
     private ClusterTime clusterTime;
 
+    @InjectExecutorService(threadCount = 1)
+    private ExecutorService executor;
+
     private SchemaSafeTimeTrackerImpl tracker;
 
     @BeforeEach
     void createTracker() {
-        tracker = new SchemaSafeTimeTrackerImpl(clusterTime);
+        tracker = new SchemaSafeTimeTrackerImpl(clusterTime, executor);
     }
 
     @Test

Reply via email to