smjn commented on code in PR #18824:
URL: https://github.com/apache/kafka/pull/18824#discussion_r1946411223
##########
server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java:
##########
@@ -324,6 +345,152 @@ protected AbstractRequest.Builder<? extends
AbstractRequest> requestBuilder() {
}
}
+ @Test
+ public void testFindCoordinatorNullResponse() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 10;
+
+ Node suppliedNode = new Node(0, HOST, PORT);
+
+ String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId,
topicId, partition);
+
+ client.prepareResponseFrom(body -> body instanceof
FindCoordinatorRequest
+ && ((FindCoordinatorRequest) body).data().keyType() ==
FindCoordinatorRequest.CoordinatorType.SHARE.id()
+ && ((FindCoordinatorRequest)
body).data().coordinatorKeys().get(0).equals(coordinatorKey),
+ null,
+ suppliedNode
+ );
+
+ ShareCoordinatorMetadataCacheHelper cacheHelper =
getDefaultCacheHelper(suppliedNode);
+
+ PersisterStateManager stateManager =
PersisterStateManagerBuilder.builder()
+ .withKafkaClient(client)
+ .withTimer(mockTimer)
+ .withCacheHelper(cacheHelper)
+ .build();
+
+ stateManager.start();
+
+ CompletableFuture<TestStateHandler.TestHandlerResponse> future = new
CompletableFuture<>();
+
+ TestStateHandler handler = spy(new TestStateHandler(
+ stateManager,
+ groupId,
+ topicId,
+ partition,
+ future,
+ REQUEST_BACKOFF_MS,
+ REQUEST_BACKOFF_MAX_MS,
+ MAX_RPC_RETRY_ATTEMPTS
+ ) {
+ @Override
+ protected AbstractRequest.Builder<? extends AbstractRequest>
requestBuilder() {
+ return null;
+ }
+ });
+
+ stateManager.enqueue(handler);
+
+ TestStateHandler.TestHandlerResponse result = null;
+ try {
+ result = handler.result().get();
+ } catch (Exception e) {
+ fail("Failed to get result from future", e);
+ }
+
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(),
result.data().results().get(0).partitions().get(0).errorCode());
+ verify(handler, times(1)).findShareCoordinatorBuilder();
+
+ try {
+ // Stopping the state manager
+ stateManager.stop();
+ } catch (Exception e) {
+ fail("Failed to stop state manager", e);
+ }
+ }
+
+ @Test
+ public void testFindCoordinatorDisconnect() {
+ MockClient client = new MockClient(MOCK_TIME);
+
+ String groupId = "group1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 10;
+
+ Node suppliedNode = new Node(0, HOST, PORT);
+
+ String coordinatorKey = SharePartitionKey.asCoordinatorKey(groupId,
topicId, partition);
+
+ client.prepareResponseFrom(body -> body instanceof
FindCoordinatorRequest
Review Comment:
setUnreachable disconnects in the body and the response goes empty.
Actually the `prepareResponseFrom` is not supposed to be there. Will rectify.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]