This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 48ca4d4632 IGNITE-19963 Move completion of ClockWaiter futures to a special thread pool (#2310)
48ca4d4632 is described below

commit 48ca4d4632d8dc2d55fbde4bbfa7a6c52a841b30
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Thu Jul 13 14:00:32 2023 +0400

    IGNITE-19963 Move completion of ClockWaiter futures to a special thread pool (#2310)
---
 .../ignite/internal/catalog/ClockWaiter.java       | 95 ++++++++++++++--------
 .../ignite/internal/catalog/ClockWaiterTest.java   |  3 +-
 .../ignite/internal/hlc/ClockUpdateListener.java   | 12 ++-
 .../ignite/internal/hlc/HybridClockImpl.java       | 24 ++++--
 .../apache/ignite/internal/HybridClockTest.java    |  8 +-
 .../apache/ignite/internal/TestHybridClock.java    |  2 -
 6 files changed, 96 insertions(+), 48 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
index 8c5c65df22..7141237fbd 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
@@ -17,14 +17,18 @@
 
 package org.apache.ignite.internal.catalog;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.hlc.ClockUpdateListener;
@@ -35,6 +39,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.TrackerClosedException;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -59,18 +64,39 @@ public class ClockWaiter implements IgniteComponent {
 
     private final ClockUpdateListener updateListener = this::onUpdate;
 
+    private final Runnable triggerClockUpdate = this::triggerTrackerUpdate;
+
+    /** Executor for short-lived scheduled tasks that make sure awaiting futures get completed in a timely manner. */
     private volatile ScheduledExecutorService scheduler;
 
+    /** Executor that runs completion of futures returned to the user, so it may end up executing arbitrarily heavy operations. */
+    private final ExecutorService futureExecutor;
+
+    /**
+     * Creates a new {@link ClockWaiter}.
+     *
+     * @param nodeName Name of the current Ignite node.
+     * @param clock Clock to look at.
+     */
     public ClockWaiter(String nodeName, HybridClock clock) {
         this.nodeName = nodeName;
         this.clock = clock;
+
+        futureExecutor = new ThreadPoolExecutor(
+                0,
+                4,
+                1,
+                TimeUnit.MINUTES,
+                new LinkedBlockingQueue<>(),
+                new NamedThreadFactory(nodeName + 
"-clock-waiter-future-executor", LOG)
+        );
     }
 
     @Override
     public void start() {
         clock.addUpdateListener(updateListener);
 
-        scheduler = Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+        scheduler = Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory(nodeName + "-clock-waiter-scheduler", LOG));
     }
 
     @Override
@@ -91,6 +117,9 @@ public class ClockWaiter implements IgniteComponent {
         // user-facing futures we return from the tracker), but we don't need 
them for anything else,
         // so it's simpler to just use shutdownNow().
         scheduler.shutdownNow();
+
+        IgniteUtils.shutdownAndAwaitTermination(futureExecutor, 10, 
TimeUnit.SECONDS);
+
         scheduler.awaitTermination(10, TimeUnit.SECONDS);
     }
 
@@ -109,6 +138,9 @@ public class ClockWaiter implements IgniteComponent {
     /**
      * Wait for the clock to reach the given timestamp.
      *
+     * <p>If completion of the returned future triggers some I/O operations or causes the code to block, it is highly
+     * recommended to execute those completion stages on a dedicated thread pool to avoid starving the waiter's pool.
+     *
      * @param targetTimestamp Timestamp to wait for.
      * @return A future that completes when the timestamp is reached by the 
clock's time.
      */
@@ -125,47 +157,44 @@ public class ClockWaiter implements IgniteComponent {
     }
 
     private CompletableFuture<Void> doWaitFor(HybridTimestamp targetTimestamp) 
{
+        HybridTimestamp now = clock.now();
+
+        if (targetTimestamp.compareTo(now) <= 0) {
+            return completedFuture(null);
+        }
+
         CompletableFuture<Void> future = 
nowTracker.waitFor(targetTimestamp.longValue());
 
-        ScheduledFuture<?> scheduledFuture;
+        // Adding 1 to account for a possible non-zero logical part of the targetTimestamp.
+        long millisToWait = targetTimestamp.getPhysical() - now.getPhysical() 
+ 1;
 
-        if (!future.isDone()) {
-            // This triggers a clock update.
-            HybridTimestamp now = clock.now();
+        ScheduledFuture<?> scheduledFuture = 
scheduler.schedule(triggerClockUpdate, millisToWait, TimeUnit.MILLISECONDS);
 
-            if (targetTimestamp.compareTo(now) <= 0) {
-                assert future.isDone();
+        // The future might be completed in an arbitrary thread, so let's move its completion execution to a special thread pool
+        // because the user's code following the future completion might run arbitrarily heavy operations and we don't want
+        // to put them on an innocent thread invoking update() on the clock.
+        return future
+                .handleAsync((res, ex) -> {
+                    scheduledFuture.cancel(true);
 
-                scheduledFuture = null;
-            } else {
-                // Adding 1 to account for a possible non-null logical part of 
the targetTimestamp.
-                long millisToWait = targetTimestamp.getPhysical() - 
now.getPhysical() + 1;
+                    if (ex != null) {
+                        translateTrackerClosedException(ex);
+                    }
 
-                scheduledFuture = scheduler.schedule(this::triggerClockUpdate, 
millisToWait, TimeUnit.MILLISECONDS);
-            }
+                    return res;
+                }, futureExecutor);
+    }
+
+    private static void translateTrackerClosedException(Throwable ex) {
+        if (ex instanceof TrackerClosedException) {
+            throw new CancellationException();
         } else {
-            scheduledFuture = null;
+            throw new CompletionException(ex);
         }
-
-        return future.handle((res, ex) -> {
-            if (scheduledFuture != null) {
-                scheduledFuture.cancel(true);
-            }
-
-            if (ex != null) {
-                // Let's replace a TrackerClosedException with a 
CancellationException as the latter makes more sense for the clients.
-                if (ex instanceof TrackerClosedException) {
-                    throw new CancellationException();
-                } else {
-                    throw new CompletionException(ex);
-                }
-            }
-
-            return res;
-        });
     }
 
-    private void triggerClockUpdate() {
-        clock.now();
+    private void triggerTrackerUpdate() {
+        onUpdate(clock.nowLong());
     }
+
 }
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
index e970e4027c..e175cca7e5 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.catalog;
 
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -68,7 +69,7 @@ class ClockWaiterTest {
 
         clock.update(oneYearAhead);
 
-        assertThat(future.isDone(), is(true));
+        assertThat(future, willCompleteSuccessfully());
     }
 
     private HybridTimestamp getOneYearAhead() {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
index e550a9b486..e8d128c8a6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
@@ -18,13 +18,19 @@
 package org.apache.ignite.internal.hlc;
 
 /**
- * Used to track updates of a {@link HybridClock}: it gets notified each time 
the clock 'ticks', including
- * adjustments caused by external events.
+ * Used to track updates of a {@link HybridClock}: it gets notified each time {@link HybridClock#update(HybridTimestamp)}
+ * is invoked.
  */
 @FunctionalInterface
 public interface ClockUpdateListener {
     /**
-     * Called when the clock's current time advances.
+     * Called when the clock's current time advances due to a call to {@link 
HybridClock#update(HybridTimestamp)}.
+     *
+     * <p>This does NOT get called when the clock current time gets advanced 
by a call to
+     * {@link HybridClock#now()}/{@link HybridClock#nowLong()}.
+     *
+     * <p>This method must NOT do any I/O operations or block. If such 
operations are needed, it should schedule them
+     * on a thread pool.
      *
      * @param newTs New timestamp on the clock (represented as a long value, 
see {@link HybridTimestamp#longValue()}.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
index 4209d31674..843dcb7e18 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
@@ -26,12 +26,16 @@ import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.tostring.S;
 
 /**
  * A Hybrid Logical Clock implementation.
  */
 public class HybridClockImpl implements HybridClock {
+    private final IgniteLogger log = Loggers.forClass(HybridClockImpl.class);
+
     /**
      * Var handle for {@link #latestTime}.
      */
@@ -71,15 +75,23 @@ public class HybridClockImpl implements HybridClock {
             long newLatestTime = max(oldLatestTime + 1, now);
 
             if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) 
{
-                notifyUpdateListeners(newLatestTime);
-
                 return newLatestTime;
             }
         }
     }
 
     private void notifyUpdateListeners(long newTs) {
-        updateListeners.forEach(listener -> listener.onUpdate(newTs));
+        for (ClockUpdateListener listener : updateListeners) {
+            try {
+                listener.onUpdate(newTs);
+            } catch (Throwable e) {
+                log.error("ClockUpdateListener#onUpdate() failed for {} at 
{}", e, listener, newTs);
+
+                if (e instanceof Error) {
+                    throw e;
+                }
+            }
+        }
     }
 
     @Override
@@ -88,10 +100,12 @@ public class HybridClockImpl implements HybridClock {
     }
 
     /**
-     * Creates a timestamp for a received event.
+     * Updates the clock in accordance with an external event timestamp. If 
the supplied timestamp is ahead of the
+     * current clock timestamp, the clock gets adjusted to make sure it never 
returns any timestamp before (or equal to)
+     * the supplied external timestamp.
      *
      * @param requestTime Timestamp from request.
-     * @return The hybrid timestamp.
+     * @return The resulting timestamp (guaranteed to exceed both previous 
clock 'currentTs' and the supplied external ts).
      */
     @Override
     public HybridTimestamp update(HybridTimestamp requestTime) {
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
index 41fd6174ac..7aafc60950 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
@@ -120,25 +120,25 @@ class HybridClockTest {
     }
 
     @Test
-    void updateListenerGetsNotifiedOnUpdateCausedByNowCall() {
+    void updateListenerIsNotNotifiedOnNowCall() {
         HybridClock clock = new HybridClockImpl();
 
         clock.addUpdateListener(updateListener);
 
         HybridTimestamp ts = clock.now();
 
-        verify(updateListener).onUpdate(ts.longValue());
+        verify(updateListener, never()).onUpdate(ts.longValue());
     }
 
     @Test
-    void updateListenerGetsNotifiedOnUpdateCausedByNowLongCall() {
+    void updateListenerIsNotNotifiedOnNowLongCall() {
         HybridClock clock = new HybridClockImpl();
 
         clock.addUpdateListener(updateListener);
 
         long ts = clock.nowLong();
 
-        verify(updateListener).onUpdate(ts);
+        verify(updateListener, never()).onUpdate(ts);
     }
 
     @Test
diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
index aa2d5cec37..3711106746 100644
--- 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
@@ -76,8 +76,6 @@ public class TestHybridClock implements HybridClock {
             long newLatestTime = max(oldLatestTime + 1, now);
 
             if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) 
{
-                notifyUpdateListeners(newLatestTime);
-
                 return newLatestTime;
             }
         }

Reply via email to