This is an automated email from the ASF dual-hosted git repository. blerer pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new e08053b Fix race-conditions in ConnectionTest e08053b is described below commit e08053b77cac4ec91fd398d7bad65bba1394f45f Author: yifan-c <yc25c...@gmail.com> AuthorDate: Fri Mar 13 11:30:43 2020 -0700 Fix race-conditions in ConnectionTest patch by Yifan Cai; reviewed by Benjamin Lerer for CASSANDRA-15630 --- test/unit/org/apache/cassandra/Util.java | 22 +++++--- .../org/apache/cassandra/net/ConnectionUtils.java | 65 ++++++++++------------ 2 files changed, 44 insertions(+), 43 deletions(-) diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 3dcaff7..c989407 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -41,6 +42,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import afu.org.checkerframework.checker.oigj.qual.O; import org.apache.cassandra.db.compaction.ActiveCompactionsTracker; import org.apache.cassandra.db.compaction.CompactionTasks; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -579,18 +581,24 @@ public class Util } } - public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds) + public static void spinAssertEquals(Object expected, Supplier<Object> actualSupplier, int timeoutInSeconds) { - long start = System.currentTimeMillis(); - Object lastValue = null; - while (System.currentTimeMillis() < start + (1000 * timeoutInSeconds)) + spinAssertEquals(null, expected, actualSupplier, timeoutInSeconds, TimeUnit.SECONDS); + } + + public static <T> void spinAssertEquals(String 
message, T expected, Supplier<? extends T> actualSupplier, long timeout, TimeUnit timeUnit) + { + long startNano = System.nanoTime(); + long expireAtNano = startNano + timeUnit.toNanos(timeout); + T actual = null; + while (System.nanoTime() < expireAtNano) { - lastValue = s.get(); - if (lastValue.equals(expected)) + actual = actualSupplier.get(); + if (actual.equals(expected)) break; Thread.yield(); } - assertEquals(expected, lastValue); + assertEquals(message, expected, actual); } public static void joinThread(Thread thread) throws InterruptedException diff --git a/test/unit/org/apache/cassandra/net/ConnectionUtils.java b/test/unit/org/apache/cassandra/net/ConnectionUtils.java index e391785..5aff390 100644 --- a/test/unit/org/apache/cassandra/net/ConnectionUtils.java +++ b/test/unit/org/apache/cassandra/net/ConnectionUtils.java @@ -18,19 +18,17 @@ package org.apache.cassandra.net; +import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; -import com.google.common.util.concurrent.Uninterruptibles; -import org.junit.Assert; - -import org.apache.cassandra.net.InboundMessageHandlers; -import org.apache.cassandra.net.OutboundConnection; +import static org.apache.cassandra.Util.spinAssertEquals; public class ConnectionUtils { public interface FailCheck { - public void accept(String message, long expected, long actual); + public void accept(String message, Long expected, Supplier<Long> actualSupplier); } public static class OutboundCountChecker @@ -98,44 +96,44 @@ public class ConnectionUtils public void check() { - doCheck(Assert::assertEquals); + doCheck((message, expected, actual) -> spinAssertEquals(message, expected, actual, 5, TimeUnit.SECONDS)); } public void check(FailCheck failCheck) { - doCheck((message, expect, actual) -> { if (expect != actual) failCheck.accept(message, expect, actual); }); + doCheck((message, expect, actual) -> { if (!Objects.equals(expect, actual.get())) failCheck.accept(message, expect, actual); 
}); } private void doCheck(FailCheck testAndFailCheck) { if (checkSubmitted) { - testAndFailCheck.accept("submitted count values don't match", submitted, connection.submittedCount()); + testAndFailCheck.accept("submitted count values don't match", submitted, connection::submittedCount); } if (checkPending) { - testAndFailCheck.accept("pending count values don't match", pending, connection.pendingCount()); - testAndFailCheck.accept("pending bytes values don't match", pendingBytes, connection.pendingBytes()); + testAndFailCheck.accept("pending count values don't match", pending, () -> (long) connection.pendingCount()); + testAndFailCheck.accept("pending bytes values don't match", pendingBytes, connection::pendingBytes); } if (checkSent) { - testAndFailCheck.accept("sent count values don't match", sent, connection.sentCount()); - testAndFailCheck.accept("sent bytes values don't match", sentBytes, connection.sentBytes()); + testAndFailCheck.accept("sent count values don't match", sent, connection::sentCount); + testAndFailCheck.accept("sent bytes values don't match", sentBytes, connection::sentBytes); } if (checkOverload) { - testAndFailCheck.accept("overload count values don't match", overload, connection.overloadedCount()); - testAndFailCheck.accept("overload bytes values don't match", overloadBytes, connection.overloadedBytes()); + testAndFailCheck.accept("overload count values don't match", overload, connection::overloadedCount); + testAndFailCheck.accept("overload bytes values don't match", overloadBytes, connection::overloadedBytes); } if (checkExpired) { - testAndFailCheck.accept("expired count values don't match", expired, connection.expiredCount()); - testAndFailCheck.accept("expired bytes values don't match", expiredBytes, connection.expiredBytes()); + testAndFailCheck.accept("expired count values don't match", expired, connection::expiredCount); + testAndFailCheck.accept("expired bytes values don't match", expiredBytes, connection::expiredBytes); } if 
(checkError) { - testAndFailCheck.accept("error count values don't match", error, connection.errorCount()); - testAndFailCheck.accept("error bytes values don't match", errorBytes, connection.errorBytes()); + testAndFailCheck.accept("error count values don't match", error, connection::errorCount); + testAndFailCheck.accept("error bytes values don't match", errorBytes, connection::errorBytes); } } } @@ -197,45 +195,40 @@ public class ConnectionUtils public void check() { - doCheck(Assert::assertEquals); + doCheck((message, expected, actual) -> spinAssertEquals(message, expected, actual, 5, TimeUnit.SECONDS)); } public void check(FailCheck failCheck) { - doCheck((message, expect, actual) -> { if (expect != actual) failCheck.accept(message, expect, actual); }); + doCheck((message, expect, actual) -> { if (!Objects.equals(expect, actual.get())) failCheck.accept(message, expect, actual); }); } private void doCheck(FailCheck testAndFailCheck) { if (checkReceived) { - testAndFailCheck.accept("received count values don't match", received, connection.receivedCount()); - testAndFailCheck.accept("received bytes values don't match", receivedBytes, connection.receivedBytes()); + testAndFailCheck.accept("received count values don't match", received, connection::receivedCount); + testAndFailCheck.accept("received bytes values don't match", receivedBytes, connection::receivedBytes); } if (checkProcessed) { - testAndFailCheck.accept("processed count values don't match", processed, connection.processedCount()); - testAndFailCheck.accept("processed bytes values don't match", processedBytes, connection.processedBytes()); + testAndFailCheck.accept("processed count values don't match", processed, connection::processedCount); + testAndFailCheck.accept("processed bytes values don't match", processedBytes, connection::processedBytes); } if (checkExpired) { - testAndFailCheck.accept("expired count values don't match", expired, connection.expiredCount()); - testAndFailCheck.accept("expired 
bytes values don't match", expiredBytes, connection.expiredBytes()); + testAndFailCheck.accept("expired count values don't match", expired, connection::expiredCount); + testAndFailCheck.accept("expired bytes values don't match", expiredBytes, connection::expiredBytes); } if (checkError) { - testAndFailCheck.accept("error count values don't match", error, connection.errorCount()); - testAndFailCheck.accept("error bytes values don't match", errorBytes, connection.errorBytes()); + testAndFailCheck.accept("error count values don't match", error, connection::errorCount); + testAndFailCheck.accept("error bytes values don't match", errorBytes, connection::errorBytes); } if (checkScheduled) { - // scheduled cannot relied upon to not race with completion of the task, - // so if it is currently above the value we expect, sleep for a bit - if (scheduled < connection.scheduledCount()) - for (int i = 0; i < 10 && scheduled < connection.scheduledCount() ; ++i) - Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS); - testAndFailCheck.accept("scheduled count values don't match", scheduled, connection.scheduledCount()); - testAndFailCheck.accept("scheduled bytes values don't match", scheduledBytes, connection.scheduledBytes()); + testAndFailCheck.accept("scheduled count values don't match", scheduled, connection::scheduledCount); + testAndFailCheck.accept("scheduled bytes values don't match", scheduledBytes, connection::scheduledBytes); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org