driftx commented on code in PR #4624:
URL: https://github.com/apache/cassandra/pull/4624#discussion_r2824390665
##########
test/unit/org/apache/cassandra/hints/HintsServiceTest.java:
##########
@@ -158,73 +159,78 @@ public void testPauseAndResume() throws
InterruptedException, ExecutionException
HintsService.instance.pauseDispatch();
// create spy for hint messages
- MockMessagingSpy spy = sendHintsAndResponses(metadata, 100, -1);
-
- // we should not send any hints while paused
- ListenableFuture<Boolean> noMessagesWhilePaused =
spy.interceptNoMsg(15, TimeUnit.SECONDS);
- Futures.addCallback(noMessagesWhilePaused, new
MoreFutures.SuccessCallback<Boolean>()
+ try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 100, -1))
{
- public void onSuccess(@Nullable Boolean aBoolean)
+
+ // we should not send any hints while paused
+ ListenableFuture<Boolean> noMessagesWhilePaused =
spy.interceptNoMsg(15, TimeUnit.SECONDS);
+ Futures.addCallback(noMessagesWhilePaused, new
MoreFutures.SuccessCallback<Boolean>()
{
- HintsService.instance.resumeDispatch();
- }
- }, MoreExecutors.directExecutor());
-
- Futures.allAsList(
- noMessagesWhilePaused,
- spy.interceptMessageOut(100),
- spy.interceptNoMsg(200, TimeUnit.MILLISECONDS)
- ).get();
+ public void onSuccess(@Nullable Boolean aBoolean)
+ {
+ HintsService.instance.resumeDispatch();
+ }
+ }, MoreExecutors.directExecutor());
+
+ Futures.allAsList(
+ noMessagesWhilePaused,
+ spy.interceptMessageOut(100),
+ spy.interceptNoMsg(200, TimeUnit.MILLISECONDS)
+ ).get();
+ }
}
@Test
public void testPageRetry() throws InterruptedException,
ExecutionException, TimeoutException
{
// create spy for hint messages, but only create responses for 5 hints
- MockMessagingSpy spy = sendHintsAndResponses(metadata, 20, 5);
+ try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 20, 5))
+ {
- Futures.allAsList(
- // the dispatcher will always send all hints within the
current page
- // and only wait for the acks before going to the next page
- spy.interceptMessageOut(20),
- spy.interceptNoMsg(200, TimeUnit.MILLISECONDS),
+ Futures.allAsList(
+ // the dispatcher will always send all hints within the current
page
+ // and only wait for the acks before going to the next page
+ spy.interceptMessageOut(20),
+ spy.interceptNoMsg(200, TimeUnit.MILLISECONDS),
- // next tick will trigger a retry of the same page as we only
replied with 5/20 acks
- spy.interceptMessageOut(20)
- ).get();
+ // next tick will trigger a retry of the same page as we only
replied with 5/20 acks
+ spy.interceptMessageOut(20)
+ ).get();
- // marking the destination node as dead should stop sending hints
- failureDetector.isAlive = false;
- spy.interceptNoMsg(20, TimeUnit.SECONDS).get();
+ // marking the destination node as dead should stop sending hints
+ failureDetector.isAlive = false;
+ spy.interceptNoMsg(20, TimeUnit.SECONDS).get();
+ }
}
@Test
public void testPageSeek() throws InterruptedException, ExecutionException
{
// create spy for hint messages, stop replying after 12k (should be on
3rd page)
- MockMessagingSpy spy = sendHintsAndResponses(metadata, 20000, 12000);
-
- // At this point the dispatcher will constantly retry the page we
stopped acking,
- // thus we receive the same hints from the page multiple times and in
total more than
- // all written hints. Lets just consume them for a while and then
pause the dispatcher.
- spy.interceptMessageOut(22000).get();
- HintsService.instance.pauseDispatch();
- Thread.sleep(1000);
-
- // verify that we have a dispatch offset set for the page we're
currently stuck at
- HintsStore store =
HintsService.instance.getCatalog().get(StorageService.instance.getLocalHostUUID());
- AtomicReference<HintsDescriptor> hintDescriptorRef = new
AtomicReference<>();
- Util.spinUntilTrue(() -> {
- HintsDescriptor descriptor = store.poll();
- if (descriptor != null)
- hintDescriptorRef.set(descriptor);
- return descriptor != null;
- }, 20, SECONDS);
- HintsDescriptor descriptor = hintDescriptorRef.get();
- store.offerFirst(descriptor); // add again for cleanup during
re-instanciation
- InputPosition dispatchOffset = store.getDispatchOffset(descriptor);
- assertTrue(dispatchOffset != null);
- assertTrue(((ChecksummedDataInput.Position)
dispatchOffset).sourcePosition > 0);
+ try(MockMessagingSpy spy = sendHintsAndResponses(metadata, 20000,
12000))
+ {
+ // At this point the dispatcher will constantly retry the page we
stopped acking,
+ // thus we receive the same hints from the page multiple times and
in total more than
+ // all written hints. Lets just consume them for a while and then
pause the dispatcher.
+ spy.interceptMessageOut(22000).get();
+ HintsService.instance.pauseDispatch();
+ Thread.sleep(1000);
+
+ // verify that we have a dispatch offset set for the page we're
currently stuck at
+ HintsStore store =
HintsService.instance.getCatalog().get(StorageService.instance.getLocalHostUUID());
+ AtomicReference<HintsDescriptor> hintDescriptorRef = new
AtomicReference<>();
+ Awaitility.waitAtMost(20, SECONDS).until(() -> {
+ HintsDescriptor descriptor = store.poll();
Review Comment:
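Maybe we want peek() instead of poll() here? poll() takes the descriptor out of the store, which is why the earlier version of this test had to put it back with offerFirst() afterwards.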
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]