tolbertam commented on code in PR #1957:
URL:
https://github.com/apache/cassandra-java-driver/pull/1957#discussion_r1758097884
##########
core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java:
##########
@@ -236,7 +237,125 @@ public void should_reject_enqueued_when_closing() {
throttler.register(request);
// Then
- assertThatStage(request.started)
+ assertThatStage(request.ended)
.isFailed(error ->
assertThat(error).isInstanceOf(RequestThrottlingException.class));
}
+
+ @Test
+ public void should_run_throttle_callbacks_concurrently() throws
InterruptedException {
+ // Given
+
+ // a task is enqueued, which when in onThrottleReady, will stall latch
countDown()ed
+ // register() should automatically start onThrottleReady on same thread
+
+ // start a parallel thread
+ CountDownLatch firstRelease = new CountDownLatch(1);
+ MockThrottled first = new MockThrottled(firstRelease);
+ Runnable r =
+ () -> {
+ throttler.register(first);
+ first.ended.toCompletableFuture().thenRun(() ->
throttler.signalSuccess(first));
+ };
+ Thread t = new Thread(r);
+ t.start();
+
+ // wait for the registration threads to reach await state
+ assertThatStage(first.started).isSuccess();
+ assertThatStage(first.ended).isNotDone();
+
+ // When
+ // we concurrently submit a second shorter task
+ MockThrottled second = new MockThrottled();
+ // (on a second thread, so that we can join and force a timeout in case
+ // registration is delayed)
+ Thread t2 = new Thread(() -> throttler.register(second));
+ t2.start();
+ t2.join(1_000);
Review Comment:
From what I can tell `join(x)` doesn't throw if the thread doesn't complete,
i suppose we could add `assertThat(t2.isAlive()).isFalse();` where
`join(1_000)` is used to ensure the thread completes.
```suggestion
t2.join(1_000);
assertThat(t2.isAlive()).isFalse();
```
##########
core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java:
##########
@@ -236,7 +237,125 @@ public void should_reject_enqueued_when_closing() {
throttler.register(request);
// Then
- assertThatStage(request.started)
+ assertThatStage(request.ended)
.isFailed(error ->
assertThat(error).isInstanceOf(RequestThrottlingException.class));
}
+
+ @Test
+ public void should_run_throttle_callbacks_concurrently() throws
InterruptedException {
+ // Given
+
+ // a task is enqueued, which when in onThrottleReady, will stall latch
countDown()ed
+ // register() should automatically start onThrottleReady on same thread
+
+ // start a parallel thread
+ CountDownLatch firstRelease = new CountDownLatch(1);
+ MockThrottled first = new MockThrottled(firstRelease);
+ Runnable r =
+ () -> {
+ throttler.register(first);
+ first.ended.toCompletableFuture().thenRun(() ->
throttler.signalSuccess(first));
+ };
+ Thread t = new Thread(r);
+ t.start();
+
+ // wait for the registration threads to reach await state
+ assertThatStage(first.started).isSuccess();
+ assertThatStage(first.ended).isNotDone();
+
+ // When
+ // we concurrently submit a second shorter task
+ MockThrottled second = new MockThrottled();
+ // (on a second thread, so that we can join and force a timeout in case
+ // registration is delayed)
+ Thread t2 = new Thread(() -> throttler.register(second));
+ t2.start();
+ t2.join(1_000);
+
+ // Then
+ // registration will trigger callback, should complete ~immediately
+ assertThatStage(second.ended).isSuccess(wasDelayed ->
assertThat(wasDelayed).isFalse());
+ // first should still be unfinished
+ assertThatStage(first.started).isDone();
+ assertThatStage(first.ended).isNotDone();
+ // now finish, and verify
+ firstRelease.countDown();
+ assertThatStage(first.ended).isSuccess(wasDelayed ->
assertThat(wasDelayed).isFalse());
+
+ t.join(1_000);
Review Comment:
```suggestion
t.join(1_000);
assertThat(t.isAlive()).isFalse();
```
##########
core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java:
##########
@@ -236,7 +237,125 @@ public void should_reject_enqueued_when_closing() {
throttler.register(request);
// Then
- assertThatStage(request.started)
+ assertThatStage(request.ended)
.isFailed(error ->
assertThat(error).isInstanceOf(RequestThrottlingException.class));
}
+
+ @Test
+ public void should_run_throttle_callbacks_concurrently() throws
InterruptedException {
+ // Given
+
+ // a task is enqueued, which when in onThrottleReady, will stall latch
countDown()ed
+ // register() should automatically start onThrottleReady on same thread
+
+ // start a parallel thread
+ CountDownLatch firstRelease = new CountDownLatch(1);
+ MockThrottled first = new MockThrottled(firstRelease);
+ Runnable r =
+ () -> {
+ throttler.register(first);
+ first.ended.toCompletableFuture().thenRun(() ->
throttler.signalSuccess(first));
+ };
+ Thread t = new Thread(r);
+ t.start();
+
+ // wait for the registration threads to reach await state
+ assertThatStage(first.started).isSuccess();
+ assertThatStage(first.ended).isNotDone();
+
+ // When
+ // we concurrently submit a second shorter task
+ MockThrottled second = new MockThrottled();
+ // (on a second thread, so that we can join and force a timeout in case
+ // registration is delayed)
+ Thread t2 = new Thread(() -> throttler.register(second));
+ t2.start();
+ t2.join(1_000);
+
+ // Then
+ // registration will trigger callback, should complete ~immediately
+ assertThatStage(second.ended).isSuccess(wasDelayed ->
assertThat(wasDelayed).isFalse());
+ // first should still be unfinished
+ assertThatStage(first.started).isDone();
+ assertThatStage(first.ended).isNotDone();
+ // now finish, and verify
+ firstRelease.countDown();
+ assertThatStage(first.ended).isSuccess(wasDelayed ->
assertThat(wasDelayed).isFalse());
+
+ t.join(1_000);
+ }
+
+ @Test
+ public void should_enqueue_tasks_quickly_when_callbacks_blocked() throws
InterruptedException {
+ // Given
+
+ // Multiple tasks are registered, up to the limit, and proceed into their
+ // callback
+
+ // start five parallel threads
+ final int THREADS = 5;
+ Thread[] threads = new Thread[THREADS];
+ CountDownLatch[] latches = new CountDownLatch[THREADS];
+ MockThrottled[] throttled = new MockThrottled[THREADS];
+ for (int i = 0; i < threads.length; i++) {
+ latches[i] = new CountDownLatch(1);
+ final MockThrottled itThrottled = new MockThrottled(latches[i]);
+ throttled[i] = itThrottled;
+ threads[i] =
+ new Thread(
+ () -> {
+ throttler.register(itThrottled);
+ itThrottled
+ .ended
+ .toCompletableFuture()
+ .thenRun(() -> throttler.signalSuccess(itThrottled));
+ });
+ threads[i].start();
+ }
+
+ // wait for the registration threads to be launched
+ // they are all waiting now
+ for (int i = 0; i < throttled.length; i++) {
+ assertThatStage(throttled[i].started).isSuccess();
+ assertThatStage(throttled[i].ended).isNotDone();
+ }
+
+ // When
+ // we concurrently submit another task
+ MockThrottled last = new MockThrottled();
+ throttler.register(last);
+
+ // Then
+ // registration will enqueue the callback, and it should not
+ // take any time to proceed (ie: we should not be blocked)
+ // and there should be an element in the queue
+ assertThatStage(last.started).isNotDone();
+ assertThatStage(last.ended).isNotDone();
+ assertThat(throttler.getQueue()).containsExactly(last);
+
+ // we still have not released, so old throttled threads should be waiting
+ for (int i = 0; i < throttled.length; i++) {
+ assertThatStage(throttled[i].started).isDone();
+ assertThatStage(throttled[i].ended).isNotDone();
+ }
+
+ // now let us release ..
+ for (int i = 0; i < latches.length; i++) {
+ latches[i].countDown();
+ }
+
+ // .. and check everything finished up OK
+ for (int i = 0; i < latches.length; i++) {
+ assertThatStage(throttled[i].started).isSuccess();
+ assertThatStage(throttled[i].ended).isSuccess();
+ }
+
+ // for good measure, we will also wait for the enqueued to complete
+ assertThatStage(last.started).isSuccess();
+ assertThatStage(last.ended).isSuccess();
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].join(1_000);
Review Comment:
```suggestion
threads[i].join(1_000);
assertThat(threads[i].isAlive()).isFalse();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]