tolbertam commented on code in PR #1957:
URL:
https://github.com/apache/cassandra-java-driver/pull/1957#discussion_r1755762998
##########
core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java:
##########
@@ -236,7 +239,116 @@ public void should_reject_enqueued_when_closing() {
throttler.register(request);
// Then
- assertThatStage(request.started)
+ assertThatStage(request.ended)
.isFailed(error ->
assertThat(error).isInstanceOf(RequestThrottlingException.class));
}
+
+ @Test
+ public void should_run_throttle_callbacks_concurrently()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+
+ // a task is enqueued, which when in onThrottleReady, will stall for 100ms
Review Comment:
Since it no longer sleeps, this comment should change to "will stall until latch
countdown".
##########
core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java:
##########
@@ -236,7 +239,116 @@ public void should_reject_enqueued_when_closing() {
throttler.register(request);
// Then
- assertThatStage(request.started)
+ assertThatStage(request.ended)
.isFailed(error ->
assertThat(error).isInstanceOf(RequestThrottlingException.class));
}
+
+ @Test
+ public void should_run_throttle_callbacks_concurrently()
+ throws ExecutionException, InterruptedException, TimeoutException {
Review Comment:
small nit: `ExecutionException` and `TimeoutException` are no longer thrown
by code in this method, so those can be removed from the signature
##########
core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java:
##########
@@ -236,7 +239,116 @@ public void should_reject_enqueued_when_closing() {
throttler.register(request);
// Then
- assertThatStage(request.started)
+ assertThatStage(request.ended)
.isFailed(error ->
assertThat(error).isInstanceOf(RequestThrottlingException.class));
}
+
+ @Test
+ public void should_run_throttle_callbacks_concurrently()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+
+ // a task is enqueued, which when in onThrottleReady, will stall for 100ms
+ // register() should automatically start onThrottleReady on same thread
+
+ // start a parallel thread
+ CountDownLatch firstRelease = new CountDownLatch(1);
+ MockThrottled first = new MockThrottled(firstRelease);
+ Runnable r =
+ () -> {
+ throttler.register(first);
+ first.ended.toCompletableFuture().thenRun(() ->
throttler.signalSuccess(first));
+ };
+ Thread t = new Thread(r);
+ t.start();
+
+ // wait for the registration threads to reach await state
+ first.started.toCompletableFuture().join();
+ assertThatStage(first.started).isDone();
+ assertThatStage(first.ended).isNotDone();
+
+ // When
+ // we concurrently submit a second shorter task
+ MockThrottled second = new MockThrottled();
+ throttler.register(second);
+
+ // Then
+ // registration will trigger callback, should complete ~immediately
+ assertThatStage(second.ended).isSuccess(wasDelayed ->
assertThat(wasDelayed).isFalse());
+ // first should still be unfinished
+ assertThatStage(first.started).isDone();
+ assertThatStage(first.ended).isNotDone();
+ // now finish, and verify
+ firstRelease.countDown();
+ assertThatStage(first.ended).isSuccess(wasDelayed ->
assertThat(wasDelayed).isFalse());
+
+ t.join();
+ }
+
+ @Test
+ public void should_enqueue_tasks_quickly_when_callbacks_blocked() throws
InterruptedException {
+ // Given
+
+ // Multiple tasks are registered, up to the limit, and proceed into their
+ // callback
+
+ // start five parallel threads
+ Thread[] threads = new Thread[5];
+ CountDownLatch[] latches = new CountDownLatch[5];
+ MockThrottled[] throttled = new MockThrottled[5];
+ for (int i = 0; i < threads.length; i++) {
+ latches[i] = new CountDownLatch(1);
+ final MockThrottled itThrottled = new MockThrottled(latches[i]);
+ throttled[i] = itThrottled;
+ Thread t =
+ new Thread(
+ () -> {
+ throttler.register(itThrottled);
+ itThrottled
+ .ended
+ .toCompletableFuture()
+ .thenRun(() -> throttler.signalSuccess(itThrottled));
+ });
+ threads[i] = t;
+ t.start();
+ }
+
+ // wait for the registration threads to be launched
+ // they are all waiting now
+ for (int i = 0; i < throttled.length; i++) {
+ throttled[i].started.toCompletableFuture().join();
+ assertThatStage(throttled[i].started).isDone();
+ assertThatStage(throttled[i].ended).isNotDone();
+ }
+
+ // When
+ // we concurrently submit another task
+ MockThrottled last = new MockThrottled();
+ throttler.register(last);
+
+ // Then
+ // registration will enqueue the callback, and it should not
+ // take any time to proceed (ie: we should not be blocked)
+ // and there should be an element in the queue
+ assertThatStage(last.started).isNotDone();
+ assertThatStage(last.ended).isNotDone();
+ assertThat(throttler.getQueue()).containsExactly(last);
+
+ // we still have not released, so old throttled threads should be waiting
+ for (int i = 0; i < throttled.length; i++) {
+ assertThatStage(throttled[i].started).isDone();
+ assertThatStage(throttled[i].ended).isNotDone();
+ }
+
+ // now let us release ..
+ for (int i = 0; i < latches.length; i++) {
+ latches[i].countDown();
+ }
+
+ // .. and check everything finished up OK
+ for (int i = 0; i < latches.length; i++) {
+ assertThatStage(throttled[i].started).isDone();
+ assertThatStage(throttled[i].ended).isSuccess();
+ }
Review Comment:
It would also be good to ensure that `last` completes as well:
```
assertThatStage(last.started).isSuccess();
assertThatStage(last.ended).isSuccess();
```
##########
core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java:
##########
@@ -236,7 +239,116 @@ public void should_reject_enqueued_when_closing() {
throttler.register(request);
// Then
- assertThatStage(request.started)
+ assertThatStage(request.ended)
.isFailed(error ->
assertThat(error).isInstanceOf(RequestThrottlingException.class));
}
+
+ @Test
+ public void should_run_throttle_callbacks_concurrently()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+
+ // a task is enqueued, which when in onThrottleReady, will stall for 100ms
+ // register() should automatically start onThrottleReady on same thread
+
+ // start a parallel thread
+ CountDownLatch firstRelease = new CountDownLatch(1);
+ MockThrottled first = new MockThrottled(firstRelease);
+ Runnable r =
+ () -> {
+ throttler.register(first);
+ first.ended.toCompletableFuture().thenRun(() ->
throttler.signalSuccess(first));
+ };
+ Thread t = new Thread(r);
+ t.start();
+
+ // wait for the registration threads to reach await state
+ first.started.toCompletableFuture().join();
+ assertThatStage(first.started).isDone();
Review Comment:
One possible issue with using `join()` is that it will block until the
future completes. I think if everything works as it should, this is fine, but
if for whatever reason a bug is introduced, it may just hang the tests.
I was looking around and saw that `CompletionStageAssert.isSuccess()` will
actually do `get(2, TimeUnit.SECONDS)`, so maybe we can just change these two
lines to:
```suggestion
assertThatStage(first.started).isSuccess();
```
We could then do this anywhere else that `join` is used.
##########
core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java:
##########
@@ -236,7 +239,116 @@ public void should_reject_enqueued_when_closing() {
throttler.register(request);
// Then
- assertThatStage(request.started)
+ assertThatStage(request.ended)
.isFailed(error ->
assertThat(error).isInstanceOf(RequestThrottlingException.class));
}
+
+ @Test
+ public void should_run_throttle_callbacks_concurrently()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+
+ // a task is enqueued, which when in onThrottleReady, will stall for 100ms
+ // register() should automatically start onThrottleReady on same thread
+
+ // start a parallel thread
+ CountDownLatch firstRelease = new CountDownLatch(1);
+ MockThrottled first = new MockThrottled(firstRelease);
+ Runnable r =
+ () -> {
+ throttler.register(first);
+ first.ended.toCompletableFuture().thenRun(() ->
throttler.signalSuccess(first));
+ };
+ Thread t = new Thread(r);
+ t.start();
+
+ // wait for the registration threads to reach await state
+ first.started.toCompletableFuture().join();
+ assertThatStage(first.started).isDone();
+ assertThatStage(first.ended).isNotDone();
+
+ // When
+ // we concurrently submit a second shorter task
+ MockThrottled second = new MockThrottled();
+ throttler.register(second);
+
+ // Then
+ // registration will trigger callback, should complete ~immediately
+ assertThatStage(second.ended).isSuccess(wasDelayed ->
assertThat(wasDelayed).isFalse());
+ // first should still be unfinished
+ assertThatStage(first.started).isDone();
+ assertThatStage(first.ended).isNotDone();
+ // now finish, and verify
+ firstRelease.countDown();
+ assertThatStage(first.ended).isSuccess(wasDelayed ->
assertThat(wasDelayed).isFalse());
+
+ t.join();
+ }
+
+ @Test
+ public void should_enqueue_tasks_quickly_when_callbacks_blocked() throws
InterruptedException {
+ // Given
+
+ // Multiple tasks are registered, up to the limit, and proceed into their
+ // callback
+
+ // start five parallel threads
+ Thread[] threads = new Thread[5];
Review Comment:
`threads` is never used now, so we can probably get rid of this array.
##########
core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java:
##########
@@ -236,7 +239,116 @@ public void should_reject_enqueued_when_closing() {
throttler.register(request);
// Then
- assertThatStage(request.started)
+ assertThatStage(request.ended)
.isFailed(error ->
assertThat(error).isInstanceOf(RequestThrottlingException.class));
}
+
+ @Test
+ public void should_run_throttle_callbacks_concurrently()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+
+ // a task is enqueued, which when in onThrottleReady, will stall for 100ms
+ // register() should automatically start onThrottleReady on same thread
+
+ // start a parallel thread
+ CountDownLatch firstRelease = new CountDownLatch(1);
+ MockThrottled first = new MockThrottled(firstRelease);
+ Runnable r =
+ () -> {
+ throttler.register(first);
+ first.ended.toCompletableFuture().thenRun(() ->
throttler.signalSuccess(first));
+ };
+ Thread t = new Thread(r);
+ t.start();
+
+ // wait for the registration threads to reach await state
+ first.started.toCompletableFuture().join();
+ assertThatStage(first.started).isDone();
+ assertThatStage(first.ended).isNotDone();
+
+ // When
+ // we concurrently submit a second shorter task
+ MockThrottled second = new MockThrottled();
+ throttler.register(second);
+
+ // Then
+ // registration will trigger callback, should complete ~immediately
+ assertThatStage(second.ended).isSuccess(wasDelayed ->
assertThat(wasDelayed).isFalse());
+ // first should still be unfinished
+ assertThatStage(first.started).isDone();
+ assertThatStage(first.ended).isNotDone();
+ // now finish, and verify
+ firstRelease.countDown();
+ assertThatStage(first.ended).isSuccess(wasDelayed ->
assertThat(wasDelayed).isFalse());
+
+ t.join();
+ }
+
+ @Test
+ public void should_enqueue_tasks_quickly_when_callbacks_blocked() throws
InterruptedException {
+ // Given
+
+ // Multiple tasks are registered, up to the limit, and proceed into their
+ // callback
+
+ // start five parallel threads
+ Thread[] threads = new Thread[5];
+ CountDownLatch[] latches = new CountDownLatch[5];
+ MockThrottled[] throttled = new MockThrottled[5];
+ for (int i = 0; i < threads.length; i++) {
+ latches[i] = new CountDownLatch(1);
+ final MockThrottled itThrottled = new MockThrottled(latches[i]);
+ throttled[i] = itThrottled;
+ Thread t =
+ new Thread(
+ () -> {
+ throttler.register(itThrottled);
+ itThrottled
+ .ended
+ .toCompletableFuture()
+ .thenRun(() -> throttler.signalSuccess(itThrottled));
+ });
+ threads[i] = t;
+ t.start();
+ }
+
+ // wait for the registration threads to be launched
+ // they are all waiting now
+ for (int i = 0; i < throttled.length; i++) {
+ throttled[i].started.toCompletableFuture().join();
+ assertThatStage(throttled[i].started).isDone();
+ assertThatStage(throttled[i].ended).isNotDone();
+ }
+
+ // When
+ // we concurrently submit another task
+ MockThrottled last = new MockThrottled();
+ throttler.register(last);
+
+ // Then
+ // registration will enqueue the callback, and it should not
+ // take any time to proceed (ie: we should not be blocked)
+ // and there should be an element in the queue
+ assertThatStage(last.started).isNotDone();
+ assertThatStage(last.ended).isNotDone();
+ assertThat(throttler.getQueue()).containsExactly(last);
+
+ // we still have not released, so old throttled threads should be waiting
+ for (int i = 0; i < throttled.length; i++) {
+ assertThatStage(throttled[i].started).isDone();
+ assertThatStage(throttled[i].ended).isNotDone();
+ }
+
+ // now let us release ..
+ for (int i = 0; i < latches.length; i++) {
+ latches[i].countDown();
+ }
+
+ // .. and check everything finished up OK
+ for (int i = 0; i < latches.length; i++) {
+ assertThatStage(throttled[i].started).isDone();
Review Comment:
It doesn't seem to be a problem, but just to be sure, can we also replace this
with `assertThatStage(throttled[i].started).isSuccess()` so it will block until
it completes? I suspect that the future will already be completed by the time
it gets here, but I wasn't certain, so I figured it would be good to add that
just in case.
##########
core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java:
##########
@@ -236,7 +239,116 @@ public void should_reject_enqueued_when_closing() {
throttler.register(request);
// Then
- assertThatStage(request.started)
+ assertThatStage(request.ended)
.isFailed(error ->
assertThat(error).isInstanceOf(RequestThrottlingException.class));
}
+
+ @Test
+ public void should_run_throttle_callbacks_concurrently()
+ throws ExecutionException, InterruptedException, TimeoutException {
+ // Given
+
+ // a task is enqueued, which when in onThrottleReady, will stall for 100ms
+ // register() should automatically start onThrottleReady on same thread
+
+ // start a parallel thread
+ CountDownLatch firstRelease = new CountDownLatch(1);
+ MockThrottled first = new MockThrottled(firstRelease);
+ Runnable r =
+ () -> {
+ throttler.register(first);
+ first.ended.toCompletableFuture().thenRun(() ->
throttler.signalSuccess(first));
+ };
+ Thread t = new Thread(r);
+ t.start();
+
+ // wait for the registration threads to reach await state
+ first.started.toCompletableFuture().join();
+ assertThatStage(first.started).isDone();
+ assertThatStage(first.ended).isNotDone();
+
+ // When
+ // we concurrently submit a second shorter task
+ MockThrottled second = new MockThrottled();
+ throttler.register(second);
+
+ // Then
+ // registration will trigger callback, should complete ~immediately
+ assertThatStage(second.ended).isSuccess(wasDelayed ->
assertThat(wasDelayed).isFalse());
+ // first should still be unfinished
+ assertThatStage(first.started).isDone();
+ assertThatStage(first.ended).isNotDone();
+ // now finish, and verify
+ firstRelease.countDown();
+ assertThatStage(first.ended).isSuccess(wasDelayed ->
assertThat(wasDelayed).isFalse());
+
+ t.join();
+ }
+
+ @Test
+ public void should_enqueue_tasks_quickly_when_callbacks_blocked() throws
InterruptedException {
Review Comment:
`InterruptedException` is no longer thrown, so it can be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]