This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new abeaa3e Fix ConnectionTest.testAcquireReleaseOutbound abeaa3e is described below commit abeaa3ea5ef99691cc1b29787cfcd573a90e34fb Author: yifan-c <yc25c...@gmail.com> AuthorDate: Tue Jan 28 11:12:30 2020 -0800 Fix ConnectionTest.testAcquireReleaseOutbound patch by Yifan Cai; reviewed by Benedict for CASSANDRA-15308 --- CHANGES.txt | 1 + .../apache/cassandra/net/OutboundConnection.java | 10 +- .../org/apache/cassandra/net/ConnectionTest.java | 101 ++++++++++++++++----- 3 files changed, 86 insertions(+), 26 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 5858c19..0bc3317 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 4.0-alpha4 * Improve the algorithmic token allocation in case racks = RF (CASSANDRA-15600) + * Fix ConnectionTest.testAcquireReleaseOutbound (CASSANDRA-15308) * Include finalized pending sstables in preview repair (CASSANDRA-15553) * Reverted to the original behavior of CLUSTERING ORDER on CREATE TABLE (CASSANDRA-15271) * Correct inaccurate logging message (CASSANDRA-15549) diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index 63b909c..9661e8e 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.stream.Stream; import javax.annotation.Nullable; @@ -1722,8 +1723,15 @@ public class OutboundConnection releaseCapacity(1, amount); } + @VisibleForTesting + void unsafeReleaseCapacity(long count, long amount) + { + releaseCapacity(count, amount); + } + + @VisibleForTesting Limit unsafeGetEndpointReserveLimits() { return reserveCapacityInBytes.endpoint; 
} -} \ No newline at end of file +} diff --git a/test/unit/org/apache/cassandra/net/ConnectionTest.java b/test/unit/org/apache/cassandra/net/ConnectionTest.java index 7b69cb9..d4ec84c 100644 --- a/test/unit/org/apache/cassandra/net/ConnectionTest.java +++ b/test/unit/org/apache/cassandra/net/ConnectionTest.java @@ -25,11 +25,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -766,40 +768,89 @@ public class ConnectionTest @Test public void testAcquireReleaseOutbound() throws Throwable { + // In each test round, K capacity is reserved upfront. + // Two groups of threads release and acquire K capacity in total, respectively, + // i.e. if only the release threads run, at the end, the reserved capacity is 0 (K - K). + // During the test, we expect N (N <= maxFailures) acquire attempts (for M capacity) to fail. + // The reserved capacity (pendingBytes) at the end of the round should equal K - N * M, + // which is what the assertion checks. 
test((inbound, outbound, endpoint) -> { - ExecutorService executor = Executors.newFixedThreadPool(100); - int acquireStep = 123; - Assert.assertTrue(outbound.unsafeAcquireCapacity(100 * 10000, 100 * 10000 * acquireStep)); + // max capacity equals the permit-free sendQueueCapacity + the minimum of endpoint and global reserve + double maxSendQueueCapacity = outbound.settings().applicationSendQueueCapacityInBytes + + Double.min(outbound.settings().applicationSendQueueReserveEndpointCapacityInBytes, + outbound.settings().applicationSendQueueReserveGlobalCapacityInBytes.limit()); + int concurrency = 100; + int attempts = 10000; + int acquireCount = concurrency * attempts; + long acquireStep = Math.round(maxSendQueueCapacity * 1.2 / acquireCount / 2); // It is guaranteed to acquire (~20%) more + // The total over-acquired amount divided by the amount acquired in each step. Take the ceiling so as not to miss the acquire that just exceeds the limit. + long maxFailures = (long) Math.ceil((acquireCount * acquireStep * 2 - maxSendQueueCapacity) / acquireStep); // The result must be in the range of long AtomicLong acquisitionFailures = new AtomicLong(); - for (int i = 0; i < 100; i++) - { - executor.submit(() -> { - for (int j = 0; j < 10000; j++) - { - if (!outbound.unsafeAcquireCapacity(acquireStep)) - acquisitionFailures.incrementAndGet(); - } + Runnable acquirer = () -> { + for (int j = 0; j < attempts; j++) + { + if (!outbound.unsafeAcquireCapacity(acquireStep)) + acquisitionFailures.incrementAndGet(); + } + }; + Runnable releaser = () -> { + for (int j = 0; j < attempts; j++) + outbound.unsafeReleaseCapacity(acquireStep); + }; - }); - } + // Start N acquirers and releasers to contend for capacity + List<Runnable> submitOrder = new ArrayList<>(concurrency * 2); + for (int i = 0 ; i < concurrency ; ++i) + submitOrder.add(acquirer); + for (int i = 0 ; i < concurrency ; ++i) + submitOrder.add(releaser); + // randomize their start order + randomize(submitOrder); - for (int i = 0; i < 100; 
i++) + try { - executor.submit(() -> { - for (int j = 0; j < 10000; j++) - outbound.unsafeReleaseCapacity(acquireStep); - }); + // Reserve enough capacity upfront to ensure the releaser threads cannot release all reserved capacity. + // i.e. the pendingBytes is always positive during the test. + Assert.assertTrue("Unable to reserve enough capacity", + outbound.unsafeAcquireCapacity(acquireCount, acquireCount * acquireStep)); + ExecutorService executor = Executors.newFixedThreadPool(concurrency); + + submitOrder.forEach(executor::submit); + + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + + Assert.assertEquals(acquireCount * acquireStep - (acquisitionFailures.get() * acquireStep), outbound.pendingBytes()); + Assert.assertEquals(acquireCount - acquisitionFailures.get(), outbound.pendingCount()); + Assert.assertTrue(String.format("acquisitionFailures should be capped by maxFailure. acquisitionFailures: %d, acquisitionFailures: %d", + maxFailures, acquisitionFailures.get()), + acquisitionFailures.get() <= maxFailures); + } + finally + { // release the acquired capacity from this round + outbound.unsafeReleaseCapacity(outbound.pendingCount(), outbound.pendingBytes()); } - - executor.shutdown(); - executor.awaitTermination(10, TimeUnit.SECONDS); - - // We can release more than we acquire, which certainly should not happen in - // real life, but since it's a test just for acquisition and release, it is fine - Assert.assertEquals(100 * 10000 * acquireStep - (acquisitionFailures.get() * acquireStep), outbound.pendingBytes()); }); } + private static <V> void randomize(List<V> list) + { + long seed = ThreadLocalRandom.current().nextLong(); + logger.info("Seed used for randomize: " + seed); + Random random = new Random(seed); + switch (random.nextInt(3)) + { + case 0: + Collections.shuffle(list, random); + break; + case 1: + Collections.reverse(list); + break; + case 2: + // leave as is + } + } + private void connect(OutboundConnection outbound) 
throws Throwable { CountDownLatch latch = new CountDownLatch(1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org