This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 56f24f7 Prune expired messages less frequently in internode messaging 56f24f7 is described below commit 56f24f78f62c9945fae40790e3ed09893fa1ed18 Author: Sergio Bossa <sergio.bo...@gmail.com> AuthorDate: Thu Apr 2 19:30:26 2020 +0100 Prune expired messages less frequently in internode messaging Patch by Sergio Bossa; Reviewed by Aleksey Yeschenko for CASSANDRA-15700 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/net/Message.java | 5 + .../apache/cassandra/net/OutboundConnection.java | 6 +- .../apache/cassandra/net/OutboundMessageQueue.java | 88 +++++++++---- .../cassandra/net/OutboundMessageQueueTest.java | 137 ++++++++++++++++++++- 5 files changed, 207 insertions(+), 30 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 0781ca2..1c30b58 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha5 + * Prune expired messages less frequently in internode messaging (CASSANDRA-15700) * Fix Ec2Snitch handling of legacy mode for dc names matching both formats, eg "us-west-2" (CASSANDRA-15878) * Add support for server side DESCRIBE statements (CASSANDRA-14825) * Fail startup if -Xmn is set when the G1 garbage collector is used (CASSANDRA-15839) diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java index 0eb7710..01ba5d4 100644 --- a/src/java/org/apache/cassandra/net/Message.java +++ b/src/java/org/apache/cassandra/net/Message.java @@ -188,6 +188,11 @@ public class Message<T> return outWithParam(nextId(), verb, payload, null, null); } + public static <T> Message<T> out(Verb verb, T payload, long expiresAtNanos) + { + return outWithParam(nextId(), verb, expiresAtNanos, payload, 0, null, null); + } + public static <T> Message<T> outWithFlag(Verb verb, T payload, MessageFlag flag) { assert !verb.isResponse(); diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java 
b/src/java/org/apache/cassandra/net/OutboundConnection.java index d7ebcd8..635f221 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -295,7 +295,7 @@ public class OutboundConnection this.reserveCapacityInBytes = reserveCapacityInBytes; this.callbacks = template.callbacks; this.debug = template.debug; - this.queue = new OutboundMessageQueue(this::onExpired); + this.queue = new OutboundMessageQueue(approxTime, this::onExpired); this.delivery = type == ConnectionType.LARGE_MESSAGES ? new LargeMessageDelivery(template.socketFactory.synchronousWorkExecutor) : new EventLoopDelivery(); @@ -571,8 +571,8 @@ public class OutboundConnection */ void executeAgain() { - // if we are already executing, set EXECUTING_AGAIN and leave scheduling to the currently running one. - // otherwise, set ourselves unconditionally to EXECUTING and schedule ourselves immediately + // if we are already executing, set EXECUTING_AGAIN and leave scheduling to the currently running one. + // otherwise, set ourselves unconditionally to EXECUTING and schedule ourselves immediately if (!isExecuting(getAndUpdate(i -> !isExecuting(i) ? 
EXECUTING : EXECUTING_AGAIN))) executor.execute(this); } diff --git a/src/java/org/apache/cassandra/net/OutboundMessageQueue.java b/src/java/org/apache/cassandra/net/OutboundMessageQueue.java index 48c7666..3d8bac0 100644 --- a/src/java/org/apache/cassandra/net/OutboundMessageQueue.java +++ b/src/java/org/apache/cassandra/net/OutboundMessageQueue.java @@ -21,18 +21,20 @@ import java.util.Collections; import java.util.IdentityHashMap; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Uninterruptibles; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.utils.MonotonicClock; + import static java.lang.Math.min; -import static org.apache.cassandra.utils.MonotonicClock.approxTime; /** * A composite queue holding messages to be delivered by an {@link OutboundConnection}. 
@@ -59,17 +61,22 @@ class OutboundMessageQueue boolean accept(Message<?> message) throws Produces; } + private final MonotonicClock clock; private final MessageConsumer<RuntimeException> onExpired; private final ManyToOneConcurrentLinkedQueue<Message<?>> externalQueue = new ManyToOneConcurrentLinkedQueue<>(); private final PrunableArrayQueue<Message<?>> internalQueue = new PrunableArrayQueue<>(256); private volatile long earliestExpiresAt = Long.MAX_VALUE; + private volatile long nextExpirationDeadline = Long.MAX_VALUE; private static final AtomicLongFieldUpdater<OutboundMessageQueue> earliestExpiresAtUpdater = AtomicLongFieldUpdater.newUpdater(OutboundMessageQueue.class, "earliestExpiresAt"); + private static final AtomicLongFieldUpdater<OutboundMessageQueue> nextExpirationDeadlineUpdater = + AtomicLongFieldUpdater.newUpdater(OutboundMessageQueue.class, "nextExpirationDeadline"); - OutboundMessageQueue(MessageConsumer<RuntimeException> onExpired) + OutboundMessageQueue(MonotonicClock clock, MessageConsumer<RuntimeException> onExpired) { + this.clock = clock; this.onExpired = onExpired; } @@ -80,7 +87,9 @@ class OutboundMessageQueue { maybePruneExpired(); externalQueue.offer(m); - maybeUpdateMinimumExpiryTime(m.expiresAtNanos()); + nextExpirationDeadlineUpdater.accumulateAndGet(this, + maybeUpdateEarliestExpiresAt(clock.now(), m.expiresAtNanos()), + Math::min); } /** @@ -105,7 +114,7 @@ class OutboundMessageQueue */ void runEventually(Consumer<WithLock> runEventually) { - try (WithLock withLock = lockOrCallback(approxTime.now(), () -> runEventually(runEventually))) + try (WithLock withLock = lockOrCallback(clock.now(), () -> runEventually(runEventually))) { if (withLock != null) runEventually.accept(withLock); @@ -136,7 +145,6 @@ class OutboundMessageQueue private WithLock(long nowNanos) { this.nowNanos = nowNanos; - earliestExpiresAt = Long.MAX_VALUE; externalQueue.drain(internalQueue::offer); } @@ -145,7 +153,7 @@ class OutboundMessageQueue Message<?> m; while 
(null != (m = internalQueue.poll())) { - if (shouldSend(m, nowNanos)) + if (shouldSend(m, clock, nowNanos)) break; onExpired.accept(m); @@ -165,7 +173,7 @@ class OutboundMessageQueue Message<?> m; while (null != (m = internalQueue.peek())) { - if (shouldSend(m, nowNanos)) + if (shouldSend(m, clock, nowNanos)) break; internalQueue.poll(); @@ -185,7 +193,9 @@ class OutboundMessageQueue @Override public void close() { - pruneInternalQueueWithLock(nowNanos); + if (clock.isAfter(nowNanos, nextExpirationDeadline)) + pruneInternalQueueWithLock(nowNanos); + unlock(); } } @@ -195,20 +205,47 @@ class OutboundMessageQueue */ boolean maybePruneExpired() { - return maybePruneExpired(approxTime.now()); + return maybePruneExpired(clock.now()); } private boolean maybePruneExpired(long nowNanos) { - if (approxTime.isAfter(nowNanos, earliestExpiresAt)) + if (clock.isAfter(nowNanos, nextExpirationDeadline)) return tryRun(() -> pruneWithLock(nowNanos)); + return false; } - private void maybeUpdateMinimumExpiryTime(long newTime) + /** + * Update {@code earliestExpiresAt} with the given {@code candidateTime} if less than the current value OR + * if the current value is past the current {@code nowNanos} time: this last condition is needed to make sure we keep + * tracking the earliest expiry time even while we prune previous values, so that at the end of the pruning task, + * we can reconcile between the earliest expiry time recorded at pruning and the one recorded at insert time. 
+ */ + private long maybeUpdateEarliestExpiresAt(long nowNanos, long candidateTime) { - if (newTime < earliestExpiresAt) - earliestExpiresAtUpdater.accumulateAndGet(this, newTime, Math::min); + return earliestExpiresAtUpdater.accumulateAndGet(this, candidateTime, (oldTime, newTime) -> { + if (clock.isAfter(nowNanos, oldTime)) + return newTime; + else + return min(oldTime, newTime); + }); + } + + /** + * Update {@code nextExpirationDeadline} with the given {@code candidateDeadline} if less than the current + * deadline, unless the current deadline is passed in relation to {@code nowNanos}: this is needed + * to resolve a race where both {@link #add(org.apache.cassandra.net.Message) } and {@link #pruneInternalQueueWithLock(long) } + * try to update the expiration deadline. + */ + private long maybeUpdateNextExpirationDeadline(long nowNanos, long candidateDeadline) + { + return nextExpirationDeadlineUpdater.accumulateAndGet(this, candidateDeadline, (oldDeadline, newDeadline) -> { + if (clock.isAfter(nowNanos, oldDeadline)) + return newDeadline; + else + return min(oldDeadline, newDeadline); + }); } /* @@ -216,7 +253,6 @@ class OutboundMessageQueue */ private void pruneWithLock(long nowNanos) { - earliestExpiresAt = Long.MAX_VALUE; externalQueue.drain(internalQueue::offer); pruneInternalQueueWithLock(nowNanos); } @@ -232,7 +268,7 @@ class OutboundMessageQueue public boolean shouldPrune(Message<?> message) { - return !shouldSend(message, nowNanos); + return !shouldSend(message, clock, nowNanos); } public void onPruned(Message<?> message) @@ -249,7 +285,13 @@ class OutboundMessageQueue Pruner pruner = new Pruner(); internalQueue.prune(pruner); - maybeUpdateMinimumExpiryTime(pruner.earliestExpiresAt); + maybeUpdateNextExpirationDeadline(nowNanos, maybeUpdateEarliestExpiresAt(nowNanos, pruner.earliestExpiresAt)); + } + + @VisibleForTesting + long nextExpirationIn(long nowNanos, TimeUnit unit) + { + return unit.convert(nextExpirationDeadline - nowNanos, 
TimeUnit.NANOSECONDS); } private static class Locked implements Runnable @@ -439,10 +481,12 @@ class OutboundMessageQueue } Remover remover = new Remover(); - earliestExpiresAt = Long.MAX_VALUE; externalQueue.drain(internalQueue::offer); internalQueue.prune(remover); - maybeUpdateMinimumExpiryTime(remover.earliestExpiresAt); + + long nowNanos = clock.now(); + maybeUpdateNextExpirationDeadline(nowNanos, maybeUpdateEarliestExpiresAt(nowNanos, remover.earliestExpiresAt)); + done.countDown(); } } @@ -456,7 +500,7 @@ class OutboundMessageQueue { if (remove == null) throw new NullPointerException(); - + RemoveRunner runner; while (true) { @@ -477,8 +521,8 @@ class OutboundMessageQueue return runner.removed.contains(remove); } - private static boolean shouldSend(Message<?> m, long nowNanos) + private static boolean shouldSend(Message<?> m, MonotonicClock clock, long nowNanos) { - return !approxTime.isAfter(nowNanos, m.expiresAtNanos()); + return !clock.isAfter(nowNanos, m.expiresAtNanos()); } } diff --git a/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java b/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java index db571ac..860e4f1 100644 --- a/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java +++ b/test/unit/org/apache/cassandra/net/OutboundMessageQueueTest.java @@ -18,7 +18,10 @@ package org.apache.cassandra.net; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.Uninterruptibles; @@ -27,16 +30,14 @@ import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.net.Message; -import org.apache.cassandra.net.OutboundMessageQueue; -import org.apache.cassandra.net.Verb; +import org.apache.cassandra.utils.FreeRunningClock; import static org.apache.cassandra.net.NoPayload.noPayload; +import static 
org.apache.cassandra.utils.MonotonicClock.approxTime; // TODO: incomplete public class OutboundMessageQueueTest { - @BeforeClass public static void init() { @@ -50,7 +51,7 @@ public class OutboundMessageQueueTest final Message<?> m2 = Message.out(Verb._TEST_1, noPayload); final Message<?> m3 = Message.out(Verb._TEST_1, noPayload); - final OutboundMessageQueue queue = new OutboundMessageQueue(message -> true); + final OutboundMessageQueue queue = new OutboundMessageQueue(approxTime, message -> true); queue.add(m1); queue.add(m2); queue.add(m3); @@ -91,4 +92,130 @@ public class OutboundMessageQueueTest } } + @Test + public void testExpirationOnIteration() + { + FreeRunningClock clock = new FreeRunningClock(); + + List<Message> expiredMessages = new LinkedList<>(); + long startTime = clock.now(); + + Message<?> m1 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(7)); + Message<?> m2 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(3)); + Message<?> m3; + Message<?> m4; + + OutboundMessageQueue queue = new OutboundMessageQueue(clock, m -> expiredMessages.add(m)); + queue.add(m1); + queue.add(m2); + + try(OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {})) + { + // Do nothing + } + // Check next expiry time is equal to m2, and we haven't expired anything yet: + Assert.assertEquals(3, queue.nextExpirationIn(startTime, TimeUnit.SECONDS)); + Assert.assertTrue(expiredMessages.isEmpty()); + + // Wait for m2 expiry time: + clock.advance(4, TimeUnit.SECONDS); + + try(OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {})) + { + // Add a new message while we're iterating the queue: this will expire later than any existing message. 
+ m3 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(60)); + queue.add(m3); + } + // After expiration runs following the WithLock#close(), check the expiration time is updated to m1 (not m3): + Assert.assertEquals(7, queue.nextExpirationIn(startTime, TimeUnit.SECONDS)); + // Also, m2 was expired and collected: + Assert.assertEquals(m2, expiredMessages.remove(0)); + + // Wait for m1 expiry time: + clock.advance(4, TimeUnit.SECONDS); + + try(OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {})) + { + // Add a new message while we're iterating the queue: this will expire sooner than the already existing message. + m4 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(10)); + queue.add(m4); + } + // Check m1 was expired and collected: + Assert.assertEquals(m1, expiredMessages.remove(0)); + // Check next expiry time is m4 (not m3): + Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS)); + + // Consume all messages before expiration: + try(OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {})) + { + Assert.assertEquals(m3, l.poll()); + Assert.assertEquals(m4, l.poll()); + } + // Check next expiry time is still m4 as the deadline hasn't passed yet: + Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS)); + + // Go past the deadline: + clock.advance(4, TimeUnit.SECONDS); + + try(OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {})) + { + // Do nothing, just trigger expiration on close + } + // Check nothing is expired: + Assert.assertTrue(expiredMessages.isEmpty()); + // Check next expiry time is now Long.MAX_VALUE as nothing was in the queue: + Assert.assertEquals(Long.MAX_VALUE, queue.nextExpirationIn(0, TimeUnit.NANOSECONDS)); + } + + @Test + public void testExpirationOnAdd() + { + FreeRunningClock clock = new FreeRunningClock(); + + List<Message> expiredMessages = new LinkedList<>(); + long 
startTime = clock.now(); + + OutboundMessageQueue queue = new OutboundMessageQueue(clock, m -> expiredMessages.add(m)); + + Message<?> m1 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(7)); + Message<?> m2 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(3)); + queue.add(m1); + queue.add(m2); + + // Check next expiry time is equal to m2, and we haven't expired anything yet: + Assert.assertEquals(3, queue.nextExpirationIn(startTime, TimeUnit.SECONDS)); + Assert.assertTrue(expiredMessages.isEmpty()); + + // Go past m1 expiry time: + clock.advance(8, TimeUnit.SECONDS); + + // Add a new message and verify both m1 and m2 have been expired: + Message<?> m3 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(10)); + queue.add(m3); + Assert.assertEquals(m2, expiredMessages.remove(0)); + Assert.assertEquals(m1, expiredMessages.remove(0)); + + // New expiration deadline is m3: + Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS)); + + // Go past m3 expiry time: + clock.advance(4, TimeUnit.SECONDS); + + try(OutboundMessageQueue.WithLock l = queue.lockOrCallback(clock.now(), () -> {})) + { + // Add a new message and verify nothing is expired because the lock is held by this iteration: + Message<?> m4 = Message.out(Verb._TEST_1, noPayload, startTime + TimeUnit.SECONDS.toNanos(15)); + queue.add(m4); + Assert.assertTrue(expiredMessages.isEmpty()); + + // Also the deadline didn't change, even though we're past the m3 expiry time: this way we're sure the + // pruner will run promptly even if falling behind during iteration. 
+ Assert.assertEquals(10, queue.nextExpirationIn(startTime, TimeUnit.SECONDS)); + } + + // Check post iteration m3 has expired: + Assert.assertEquals(m3, expiredMessages.remove(0)); + // And deadline is now m4: + Assert.assertEquals(15, queue.nextExpirationIn(startTime, TimeUnit.SECONDS)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org