This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 48ca4d4632 IGNITE-19963 Move completion of ClockWaiter futures to a special thread pool (#2310) 48ca4d4632 is described below commit 48ca4d4632d8dc2d55fbde4bbfa7a6c52a841b30 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Thu Jul 13 14:00:32 2023 +0400 IGNITE-19963 Move completion of ClockWaiter futures to a special thread pool (#2310) --- .../ignite/internal/catalog/ClockWaiter.java | 95 ++++++++++++++-------- .../ignite/internal/catalog/ClockWaiterTest.java | 3 +- .../ignite/internal/hlc/ClockUpdateListener.java | 12 ++- .../ignite/internal/hlc/HybridClockImpl.java | 24 ++++-- .../apache/ignite/internal/HybridClockTest.java | 8 +- .../apache/ignite/internal/TestHybridClock.java | 2 - 6 files changed, 96 insertions(+), 48 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java index 8c5c65df22..7141237fbd 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java @@ -17,14 +17,18 @@ package org.apache.ignite.internal.catalog; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.hlc.ClockUpdateListener; @@ -35,6 +39,7 @@ import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.TrackerClosedException; import org.apache.ignite.lang.NodeStoppingException; @@ -59,18 +64,39 @@ public class ClockWaiter implements IgniteComponent { private final ClockUpdateListener updateListener = this::onUpdate; + private final Runnable triggerClockUpdate = this::triggerTrackerUpdate; + + /** Executor on which short-lived tasks are scheduled that are needed to timely complete awaiting futures. */ private volatile ScheduledExecutorService scheduler; + /** Executor that executes completion of futures returned to the user, so it might take arbitrarily heavy operations. */ + private final ExecutorService futureExecutor; + + /** + * Creates a new {@link ClockWaiter}. + * + * @param nodeName Name of the current Ignite node. + * @param clock Clock to look at. + */ public ClockWaiter(String nodeName, HybridClock clock) { this.nodeName = nodeName; this.clock = clock; + + futureExecutor = new ThreadPoolExecutor( + 0, + 4, + 1, + TimeUnit.MINUTES, + new LinkedBlockingQueue<>(), + new NamedThreadFactory(nodeName + "-clock-waiter-future-executor", LOG) + ); } @Override public void start() { clock.addUpdateListener(updateListener); - scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter", LOG)); + scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(nodeName + "-clock-waiter-scheduler", LOG)); } @Override @@ -91,6 +117,9 @@ public class ClockWaiter implements IgniteComponent { // user-facing futures we return from the tracker), but we don't need them for anything else, // so it's simpler to just use shutdownNow(). scheduler.shutdownNow(); + + IgniteUtils.shutdownAndAwaitTermination(futureExecutor, 10, TimeUnit.SECONDS); + scheduler.awaitTermination(10, TimeUnit.SECONDS); } @@ -109,6 +138,9 @@ public class ClockWaiter implements IgniteComponent { /** * Wait for the clock to reach the given timestamp. * + * <p>If completion of the returned future triggers some I/O operations or causes the code to block, it is highly + * recommended to execute those completion stages on a specific thread pool to avoid the waiter's pool starvation. + * * @param targetTimestamp Timestamp to wait for. * @return A future that completes when the timestamp is reached by the clock's time. */ @@ -125,47 +157,44 @@ public class ClockWaiter implements IgniteComponent { } private CompletableFuture<Void> doWaitFor(HybridTimestamp targetTimestamp) { + HybridTimestamp now = clock.now(); + + if (targetTimestamp.compareTo(now) <= 0) { + return completedFuture(null); + } + CompletableFuture<Void> future = nowTracker.waitFor(targetTimestamp.longValue()); - ScheduledFuture<?> scheduledFuture; + // Adding 1 to account for a possible non-null logical part of the targetTimestamp. + long millisToWait = targetTimestamp.getPhysical() - now.getPhysical() + 1; - if (!future.isDone()) { - // This triggers a clock update. - HybridTimestamp now = clock.now(); + ScheduledFuture<?> scheduledFuture = scheduler.schedule(triggerClockUpdate, millisToWait, TimeUnit.MILLISECONDS); - if (targetTimestamp.compareTo(now) <= 0) { - assert future.isDone(); + // The future might be completed in a random thread, so let's move its completion execution to a special thread pool + // because the user's code following the future completion might run arbitrarily heavy operations and we don't want + // to put them on an innocent thread invoking now()/update() on the clock. + return future + .handleAsync((res, ex) -> { + scheduledFuture.cancel(true); - scheduledFuture = null; - } else { - // Adding 1 to account for a possible non-null logical part of the targetTimestamp. - long millisToWait = targetTimestamp.getPhysical() - now.getPhysical() + 1; + if (ex != null) { + translateTrackerClosedException(ex); + } - scheduledFuture = scheduler.schedule(this::triggerClockUpdate, millisToWait, TimeUnit.MILLISECONDS); - } + return res; + }, futureExecutor); + } + + private static void translateTrackerClosedException(Throwable ex) { + if (ex instanceof TrackerClosedException) { + throw new CancellationException(); } else { - scheduledFuture = null; + throw new CompletionException(ex); } - - return future.handle((res, ex) -> { - if (scheduledFuture != null) { - scheduledFuture.cancel(true); - } - - if (ex != null) { - // Let's replace a TrackerClosedException with a CancellationException as the latter makes more sense for the clients. - if (ex instanceof TrackerClosedException) { - throw new CancellationException(); - } else { - throw new CompletionException(ex); - } - } - - return res; - }); } - private void triggerClockUpdate() { - clock.now(); + private void triggerTrackerUpdate() { + onUpdate(clock.nowLong()); } + } diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java index e970e4027c..e175cca7e5 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.catalog; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -68,7 +69,7 @@ class ClockWaiterTest { clock.update(oneYearAhead); - assertThat(future.isDone(), is(true)); + assertThat(future, willCompleteSuccessfully()); } private HybridTimestamp getOneYearAhead() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java index e550a9b486..e8d128c8a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java @@ -18,13 +18,19 @@ package org.apache.ignite.internal.hlc; /** - * Used to track updates of a {@link HybridClock}: it gets notified each time the clock 'ticks', including - * adjustments caused by external events. + * Used to track updates of a {@link HybridClock}: it gets notified each time {@link HybridClock#update(HybridTimestamp)}, + * is invoked. */ @FunctionalInterface public interface ClockUpdateListener { /** - * Called when the clock's current time advances. + * Called when the clock's current time advances due to a call to {@link HybridClock#update(HybridTimestamp)}. + * + * <p>This does NOT get called when the clock current time gets advanced by a call to + * {@link HybridClock#now()}/{@link HybridClock#nowLong()}. + * + * <p>This method must NOT do any I/O operations or block. If such operations are needed, it should schedule them + * on a thread pool. * * @param newTs New timestamp on the clock (represented as a long value, see {@link HybridTimestamp#longValue()}. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java index 4209d31674..843dcb7e18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java @@ -26,12 +26,16 @@ import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.tostring.S; /** * A Hybrid Logical Clock implementation. */ public class HybridClockImpl implements HybridClock { + private final IgniteLogger log = Loggers.forClass(HybridClockImpl.class); + /** * Var handle for {@link #latestTime}. */ @@ -71,15 +75,23 @@ public class HybridClockImpl implements HybridClock { long newLatestTime = max(oldLatestTime + 1, now); if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) { - notifyUpdateListeners(newLatestTime); - return newLatestTime; } } } private void notifyUpdateListeners(long newTs) { - updateListeners.forEach(listener -> listener.onUpdate(newTs)); + for (ClockUpdateListener listener : updateListeners) { + try { + listener.onUpdate(newTs); + } catch (Throwable e) { + log.error("ClockUpdateListener#onUpdate() failed for {} at {}", e, listener, newTs); + + if (e instanceof Error) { + throw e; + } + } + } } @Override @@ -88,10 +100,12 @@ public class HybridClockImpl implements HybridClock { } /** - * Creates a timestamp for a received event. + * Updates the clock in accordance with an external event timestamp. If the supplied timestamp is ahead of the + * current clock timestamp, the clock gets adjusted to make sure it never returns any timestamp before (or equal to) + * the supplied external timestamp. * * @param requestTime Timestamp from request. - * @return The hybrid timestamp. + * @return The resulting timestamp (guaranteed to exceed both previous clock 'currentTs' and the supplied external ts). */ @Override public HybridTimestamp update(HybridTimestamp requestTime) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java index 41fd6174ac..7aafc60950 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java @@ -120,25 +120,25 @@ class HybridClockTest { } @Test - void updateListenerGetsNotifiedOnUpdateCausedByNowCall() { + void updateListenerIsNotNotifiedOnNowCall() { HybridClock clock = new HybridClockImpl(); clock.addUpdateListener(updateListener); HybridTimestamp ts = clock.now(); - verify(updateListener).onUpdate(ts.longValue()); + verify(updateListener, never()).onUpdate(ts.longValue()); } @Test - void updateListenerGetsNotifiedOnUpdateCausedByNowLongCall() { + void updateListenerIsNotNotifiedOnNowLongCall() { HybridClock clock = new HybridClockImpl(); clock.addUpdateListener(updateListener); long ts = clock.nowLong(); - verify(updateListener).onUpdate(ts); + verify(updateListener, never()).onUpdate(ts); } @Test diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java index aa2d5cec37..3711106746 100644 --- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java @@ -76,8 +76,6 @@ public class TestHybridClock implements HybridClock { long newLatestTime = max(oldLatestTime + 1, now); if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) { - notifyUpdateListeners(newLatestTime); - return newLatestTime; } }