yashmayya commented on code in PR #14102:
URL: https://github.com/apache/kafka/pull/14102#discussion_r1288239604
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3724,62 +2818,54 @@ public void
testExternalZombieFencingRequestDelayedCompletion() throws Exception
pendingFencing,
tasksPerConnector
);
- tasksPerConnector.keySet().forEach(c ->
expectConfigRefreshAndSnapshot(configState));
+ expectConfigRefreshAndSnapshot(configState);
+
+
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+ doNothing().when(member).poll(anyLong());
// The callbacks that the herder has accrued for outstanding fencing
futures, which will be completed after
// a successful round of fencing and a task record write to the config
topic
- Map<String, Capture<KafkaFuture.BiConsumer<Void, Throwable>>>
herderFencingCallbacks = new HashMap<>();
+ Map<String, ArgumentCaptor<KafkaFuture.BiConsumer<Void, Throwable>>>
herderFencingCallbacks = new HashMap<>();
// The callbacks that the herder has installed for after a successful
round of zombie fencing, but before writing
// a task record to the config topic
- Map<String, Capture<KafkaFuture.BaseFunction<Void, Void>>>
workerFencingFollowups = new HashMap<>();
+ Map<String, ArgumentCaptor<KafkaFuture.BaseFunction<Void, Void>>>
workerFencingFollowups = new HashMap<>();
Map<String, CountDownLatch> callbacksInstalled = new HashMap<>();
tasksPerConnector.forEach((connector, numStackedRequests) -> {
// The future returned by Worker::fenceZombies
- KafkaFuture<Void> workerFencingFuture =
EasyMock.mock(KafkaFuture.class);
- // The future tracked by the herder (which tracks the fencing
performed by the worker and the possible followup write to the config topic)
- KafkaFuture<Void> herderFencingFuture =
EasyMock.mock(KafkaFuture.class);
+ KafkaFuture<Void> workerFencingFuture = mock(KafkaFuture.class);
+ // The future tracked by the herder (which tracks the fencing
performed by the worker and the possible followup write to the config topic)
+ KafkaFuture<Void> herderFencingFuture = mock(KafkaFuture.class);
- Capture<KafkaFuture.BiConsumer<Void, Throwable>>
herderFencingCallback = EasyMock.newCapture(CaptureType.ALL);
+ ArgumentCaptor<KafkaFuture.BiConsumer<Void, Throwable>>
herderFencingCallback = ArgumentCaptor.forClass(KafkaFuture.BiConsumer.class);
herderFencingCallbacks.put(connector, herderFencingCallback);
// Don't immediately invoke callbacks that the herder sets up for
when the worker fencing and writes to the config topic have completed
// Instead, wait for them to be installed, then invoke them
explicitly after the fact on a thread separate from the herder's tick thread
-
EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback)))
- .andReturn(null)
- .times(numStackedRequests + 1);
+
when(herderFencingFuture.whenComplete(herderFencingCallback.capture())).thenReturn(null);
- Capture<KafkaFuture.BaseFunction<Void, Void>> fencingFollowup =
EasyMock.newCapture();
+ ArgumentCaptor<KafkaFuture.BaseFunction<Void, Void>>
fencingFollowup = ArgumentCaptor.forClass(KafkaFuture.BaseFunction.class);
CountDownLatch callbackInstalled = new CountDownLatch(1);
workerFencingFollowups.put(connector, fencingFollowup);
callbacksInstalled.put(connector, callbackInstalled);
-
EasyMock.expect(workerFencingFuture.thenApply(EasyMock.capture(fencingFollowup))).andAnswer(()
-> {
+
when(workerFencingFuture.thenApply(fencingFollowup.capture())).thenAnswer(invocation
-> {
callbackInstalled.countDown();
return herderFencingFuture;
});
// We should only perform a single physical zombie fencing; all
the subsequent requests should be stacked onto the first one
- EasyMock.expect(worker.fenceZombies(
- EasyMock.eq(connector),
EasyMock.eq(taskCountRecords.get(connector)), EasyMock.anyObject())
- ).andReturn(workerFencingFuture);
-
- for (int i = 0; i < numStackedRequests; i++) {
- expectConfigRefreshAndSnapshot(configState);
- }
-
- PowerMock.replay(workerFencingFuture, herderFencingFuture);
+ when(worker.fenceZombies(eq(connector),
eq(taskCountRecords.get(connector)), any()))
+ .thenReturn(workerFencingFuture)
+ .thenAnswer(invocation -> {
+ fail("Expected only a single zombie fencing per
connector");
Review Comment:
Oh, that's a good point, it would not be bubbled up correctly in fact.
Edit: I guess it'd actually depend on where the extra calls are being made
from but either way the explicit verification is better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]