This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6f5be293745 KAFKA-18332 fix ClassDataAbstractionCoupling problem in KafkaRaftClientTest(1/2) (#18926)
6f5be293745 is described below

commit 6f5be293745882350c01de10cc4cce3ad8dcf27b
Author: leaf-soba <[email protected]>
AuthorDate: Thu Apr 17 22:41:37 2025 +0800

    KAFKA-18332 fix ClassDataAbstractionCoupling problem in KafkaRaftClientTest(1/2) (#18926)
    
    - extract a unit test named `KafkaRaftClientClusterAuthTest` to reduce
    the number of imported classes
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/raft/KafkaRaftClientClusterAuthTest.java | 148 ++++++++++++++++
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 194 ++++-----------------
 2 files changed, 185 insertions(+), 157 deletions(-)

diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
new file mode 100644
index 00000000000..62c8e117698
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.randomReplicaId;
+import static 
org.apache.kafka.raft.RaftClientTestContext.Builder.DEFAULT_ELECTION_TIMEOUT_MS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class KafkaRaftClientClusterAuthTest {
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    void testClusterAuthorizationFailedInFetch(boolean withKip853Rpc) throws 
Exception {
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        int epoch = 5;
+        Set<Integer> voters = Set.of(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+                .withKip853Rpc(withKip853Rpc)
+                .withElectedLeader(epoch, otherNodeId)
+                .build();
+
+        context.assertElectedLeader(epoch, otherNodeId);
+
+        context.pollUntilRequest();
+
+        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 
0, 0);
+        FetchResponseData response = new FetchResponseData()
+                .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+        context.deliverResponse(
+                request.correlationId(),
+                request.destination(),
+                response
+        );
+        assertThrows(ClusterAuthorizationException.class, 
context.client::poll);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    void testClusterAuthorizationFailedInBeginQuorumEpoch(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        int epoch = 5;
+        Set<Integer> voters = Set.of(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+                .updateRandom(r -> r.mockNextInt(DEFAULT_ELECTION_TIMEOUT_MS, 
0))
+                .withUnknownLeader(epoch - 1)
+                .withKip853Rpc(withKip853Rpc)
+                .build();
+
+        context.time.sleep(context.electionTimeoutMs());
+        context.expectAndGrantPreVotes(epoch - 1);
+        context.expectAndGrantVotes(epoch);
+
+        context.pollUntilRequest();
+        List<RaftRequest.Outbound> requests = 
context.collectBeginEpochRequests(epoch);
+        assertEquals(1, requests.size());
+        RaftRequest.Outbound request = requests.get(0);
+        assertEquals(otherNodeId, request.destination().id());
+        BeginQuorumEpochResponseData response = new 
BeginQuorumEpochResponseData()
+                .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+
+        context.deliverResponse(request.correlationId(), 
request.destination(), response);
+        assertThrows(ClusterAuthorizationException.class, 
context.client::poll);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    void testClusterAuthorizationFailedInVote(boolean withKip853Rpc) throws 
Exception {
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        int epoch = 5;
+        Set<Integer> voters = Set.of(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+                .withUnknownLeader(epoch - 1)
+                .withKip853Rpc(withKip853Rpc)
+                .build();
+
+        // Become a candidate
+        context.unattachedToCandidate();
+        context.pollUntilRequest();
+        context.assertVotedCandidate(epoch, localId);
+
+        RaftRequest.Outbound request = context.assertSentVoteRequest(epoch, 0, 
0L, 1);
+        VoteResponseData response = new VoteResponseData()
+                .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+
+        context.deliverResponse(request.correlationId(), 
request.destination(), response);
+        assertThrows(ClusterAuthorizationException.class, 
context.client::poll);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    void testClusterAuthorizationFailedInEndQuorumEpoch(boolean withKip853Rpc) 
throws Exception {
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        Set<Integer> voters = Set.of(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+                .withUnknownLeader(1)
+                .withKip853Rpc(withKip853Rpc)
+                .build();
+
+        context.unattachedToLeader();
+        int epoch = context.currentEpoch();
+
+        context.client.shutdown(5000);
+        context.pollUntilRequest();
+
+        RaftRequest.Outbound request = 
context.assertSentEndQuorumEpochRequest(epoch, otherNodeId);
+        EndQuorumEpochResponseData response = new EndQuorumEpochResponseData()
+                .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+
+        context.deliverResponse(request.correlationId(), 
request.destination(), response);
+        assertThrows(ClusterAuthorizationException.class, 
context.client::poll);
+    }
+}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index afa5cce0521..a1e92f6afcf 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -17,10 +17,8 @@
 package org.apache.kafka.raft;
 
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
 import org.apache.kafka.common.message.DescribeQuorumResponseData;
 import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
 import org.apache.kafka.common.message.EndQuorumEpochResponseData;
@@ -63,7 +61,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.kafka.raft.RaftClientTestContext.Builder.DEFAULT_ELECTION_TIMEOUT_MS;
@@ -77,8 +74,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
-public class KafkaRaftClientTest {
+@SuppressWarnings("ClassFanOutComplexity")
+class KafkaRaftClientTest {
     @Test
     public void testNodeDirectoryId() {
         int localId = randomReplicaId();
@@ -917,10 +914,10 @@ public class KafkaRaftClientTest {
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
 
-        Record record = batch.iterator().next();
-        assertEquals(electionTimestamp, record.timestamp());
+        Record expectedRecord = batch.iterator().next();
+        assertEquals(electionTimestamp, expectedRecord.timestamp());
         RaftClientTestContext.verifyLeaderChangeMessage(localId, 
List.of(localId, otherNodeId),
-            List.of(otherNodeId, localId), record.key(), record.value());
+                List.of(otherNodeId, localId), expectedRecord.key(), 
expectedRecord.value());
     }
 
     @ParameterizedTest
@@ -966,10 +963,10 @@ public class KafkaRaftClientTest {
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
 
-        Record record = batch.iterator().next();
-        assertEquals(electionTimestamp, record.timestamp());
+        Record expectedRecord = batch.iterator().next();
+        assertEquals(electionTimestamp, expectedRecord.timestamp());
         RaftClientTestContext.verifyLeaderChangeMessage(localId, 
List.of(localId, firstNodeId, secondNodeId),
-            List.of(voterId, localId), record.key(), record.value());
+                List.of(voterId, localId), expectedRecord.key(), 
expectedRecord.value());
     }
 
     @ParameterizedTest
@@ -2098,7 +2095,7 @@ public class KafkaRaftClientTest {
         List<InetSocketAddress> bootstrapServers = voters
             .stream()
             .map(RaftClientTestContext::mockAddress)
-            .collect(Collectors.toList());
+            .toList();
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withBootstrapServers(Optional.of(bootstrapServers))
@@ -2145,7 +2142,7 @@ public class KafkaRaftClientTest {
         List<InetSocketAddress> bootstrapServers = voters
             .stream()
             .map(RaftClientTestContext::mockAddress)
-            .collect(Collectors.toList());
+            .toList();
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withBootstrapServers(Optional.of(bootstrapServers))
@@ -2188,7 +2185,7 @@ public class KafkaRaftClientTest {
         List<InetSocketAddress> bootstrapServers = voters
             .stream()
             .map(RaftClientTestContext::mockAddress)
-            .collect(Collectors.toList());
+            .toList();
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withBootstrapServers(Optional.of(bootstrapServers))
@@ -2262,7 +2259,7 @@ public class KafkaRaftClientTest {
         List<InetSocketAddress> bootstrapServers = voters
             .stream()
             .map(RaftClientTestContext::mockAddress)
-            .collect(Collectors.toList());
+            .toList();
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withBootstrapServers(Optional.of(bootstrapServers))
@@ -2471,7 +2468,7 @@ public class KafkaRaftClientTest {
         // invalid voter id is rejected
         context.deliverRequest(
             context.voteRequest(
-                context.clusterId.toString(),
+                context.clusterId,
                 epoch + 1,
                 otherNodeKey,
                 ReplicaKey.of(10, Uuid.randomUuid()),
@@ -2486,7 +2483,7 @@ public class KafkaRaftClientTest {
         // invalid voter directory id is rejected
         context.deliverRequest(
             context.voteRequest(
-                context.clusterId.toString(),
+                context.clusterId,
                 epoch + 2,
                 otherNodeKey,
                 ReplicaKey.of(0, Uuid.randomUuid()),
@@ -2516,7 +2513,7 @@ public class KafkaRaftClientTest {
         // Leader voter3 sends a begin quorum epoch request with incorrect 
voter id
         context.deliverRequest(
             context.beginEpochRequest(
-                context.clusterId.toString(),
+                context.clusterId,
                 epoch,
                 voter3,
                 ReplicaKey.of(10, Uuid.randomUuid())
@@ -2529,7 +2526,7 @@ public class KafkaRaftClientTest {
         // Leader voter3 sends a begin quorum epoch request with incorrect 
voter directory id
         context.deliverRequest(
             context.beginEpochRequest(
-                context.clusterId.toString(),
+                context.clusterId,
                 epoch,
                 voter3,
                 ReplicaKey.of(localId, Uuid.randomUuid())
@@ -2542,7 +2539,7 @@ public class KafkaRaftClientTest {
         // Leader voter3 sends a begin quorum epoch request with incorrect 
voter directory id
         context.deliverRequest(
             context.beginEpochRequest(
-                context.clusterId.toString(),
+                context.clusterId,
                 epoch,
                 voter3,
                 context.localReplicaKey()
@@ -2568,7 +2565,7 @@ public class KafkaRaftClientTest {
         int epoch = context.currentEpoch();
 
         // valid cluster id is accepted
-        
context.deliverRequest(context.beginEpochRequest(context.clusterId.toString(), 
epoch, localId));
+        context.deliverRequest(context.beginEpochRequest(context.clusterId, 
epoch, localId));
         context.pollUntilResponse();
         context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch, 
OptionalInt.of(localId));
 
@@ -2931,7 +2928,7 @@ public class KafkaRaftClientTest {
         List<InetSocketAddress> bootstrapServers = voters
             .stream()
             .map(RaftClientTestContext::mockAddress)
-            .collect(Collectors.toList());
+            .toList();
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withBootstrapServers(Optional.of(bootstrapServers))
@@ -2970,7 +2967,7 @@ public class KafkaRaftClientTest {
         List<InetSocketAddress> bootstrapServers = voters
             .stream()
             .map(RaftClientTestContext::mockAddress)
-            .collect(Collectors.toList());
+            .toList();
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withBootstrapServers(Optional.of(bootstrapServers))
@@ -3005,7 +3002,7 @@ public class KafkaRaftClientTest {
         List<InetSocketAddress> bootstrapServers = voters
             .stream()
             .map(RaftClientTestContext::mockAddress)
-            .collect(Collectors.toList());
+            .toList();
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withBootstrapServers(Optional.of(bootstrapServers))
@@ -3054,7 +3051,7 @@ public class KafkaRaftClientTest {
         List<InetSocketAddress> bootstrapServers = voters
             .stream()
             .map(RaftClientTestContext::mockAddress)
-            .collect(Collectors.toList());
+            .toList();
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withBootstrapServers(Optional.of(bootstrapServers))
@@ -3965,10 +3962,10 @@ public class KafkaRaftClientTest {
         List<Record> readRecords = Utils.toList(leaderChangeBatch.iterator());
         assertEquals(1, readRecords.size());
 
-        Record record = readRecords.get(0);
-        assertEquals(now, record.timestamp());
+        Record expectedRecord = readRecords.get(0);
+        assertEquals(now, expectedRecord.timestamp());
         RaftClientTestContext.verifyLeaderChangeMessage(localId, 
List.of(localId),
-            List.of(localId), record.key(), record.value());
+                List.of(localId), expectedRecord.key(), 
expectedRecord.value());
 
         MutableRecordBatch batch = batches.get(1);
         assertEquals(1, batch.partitionLeaderEpoch());
@@ -4027,24 +4024,17 @@ public class KafkaRaftClientTest {
             .build();
         context.pollUntil(() -> context.log.endOffset().offset() == 1L);
 
-        assertNotNull(getMetric(context.metrics, "current-state"));
-        assertNotNull(getMetric(context.metrics, "current-leader"));
-        assertNotNull(getMetric(context.metrics, "current-vote"));
-        assertNotNull(getMetric(context.metrics, "current-epoch"));
-        assertNotNull(getMetric(context.metrics, "high-watermark"));
-        assertNotNull(getMetric(context.metrics, "log-end-offset"));
-        assertNotNull(getMetric(context.metrics, "log-end-epoch"));
-        assertNotNull(getMetric(context.metrics, 
"number-unknown-voter-connections"));
-        assertNotNull(getMetric(context.metrics, "poll-idle-ratio-avg"));
-        assertNotNull(getMetric(context.metrics, "commit-latency-avg"));
-        assertNotNull(getMetric(context.metrics, "commit-latency-max"));
-        assertNotNull(getMetric(context.metrics, "election-latency-avg"));
-        assertNotNull(getMetric(context.metrics, "election-latency-max"));
-        assertNotNull(getMetric(context.metrics, "fetch-records-rate"));
-        assertNotNull(getMetric(context.metrics, "append-records-rate"));
-        assertNotNull(getMetric(context.metrics, "number-of-voters"));
-        assertNotNull(getMetric(context.metrics, "number-of-observers"));
-        assertNotNull(getMetric(context.metrics, "uncommitted-voter-change"));
+        var metricNames = Set.of(
+                "current-state", "current-leader", "current-vote", 
"current-epoch", "high-watermark",
+                "log-end-offset", "log-end-epoch", 
"number-unknown-voter-connections", "poll-idle-ratio-avg",
+                "commit-latency-avg", "commit-latency-max", 
"election-latency-avg", "election-latency-max",
+                "fetch-records-rate", "append-records-rate", 
"number-of-voters", "number-of-observers",
+                "uncommitted-voter-change"
+        );
+
+        for (String metricName : metricNames) {
+            assertNotNull(getMetric(context.metrics, metricName));
+        }
 
         assertEquals("leader", getMetric(context.metrics, 
"current-state").metricValue());
         assertEquals((double) localId, getMetric(context.metrics, 
"current-leader").metricValue());
@@ -4068,116 +4058,6 @@ public class KafkaRaftClientTest {
         assertEquals(1, context.metrics.metrics().size());
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = { true, false })
-    public void testClusterAuthorizationFailedInFetch(boolean withKip853Rpc) 
throws Exception {
-        int localId = randomReplicaId();
-        int otherNodeId = localId + 1;
-        int epoch = 5;
-        Set<Integer> voters = Set.of(localId, otherNodeId);
-
-        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
-            .withKip853Rpc(withKip853Rpc)
-            .withElectedLeader(epoch, otherNodeId)
-            .build();
-
-        context.assertElectedLeader(epoch, otherNodeId);
-
-        context.pollUntilRequest();
-
-        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 
0, 0);
-        FetchResponseData response = new FetchResponseData()
-            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
-        context.deliverResponse(
-            request.correlationId(),
-            request.destination(),
-            response
-        );
-        assertThrows(ClusterAuthorizationException.class, 
context.client::poll);
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = { true, false })
-    public void testClusterAuthorizationFailedInBeginQuorumEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = randomReplicaId();
-        int otherNodeId = localId + 1;
-        int epoch = 5;
-        Set<Integer> voters = Set.of(localId, otherNodeId);
-
-        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
-            .updateRandom(r -> r.mockNextInt(DEFAULT_ELECTION_TIMEOUT_MS, 0))
-            .withUnknownLeader(epoch - 1)
-            .withKip853Rpc(withKip853Rpc)
-            .build();
-
-        context.time.sleep(context.electionTimeoutMs());
-        context.expectAndGrantPreVotes(epoch - 1);
-        context.expectAndGrantVotes(epoch);
-
-        context.pollUntilRequest();
-        List<RaftRequest.Outbound> requests = 
context.collectBeginEpochRequests(epoch);
-        assertEquals(1, requests.size());
-        RaftRequest.Outbound request = requests.get(0);
-        assertEquals(otherNodeId, request.destination().id());
-        BeginQuorumEpochResponseData response = new 
BeginQuorumEpochResponseData()
-            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
-
-        context.deliverResponse(request.correlationId(), 
request.destination(), response);
-        assertThrows(ClusterAuthorizationException.class, 
context.client::poll);
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = { true, false })
-    public void testClusterAuthorizationFailedInVote(boolean withKip853Rpc) 
throws Exception {
-        int localId = randomReplicaId();
-        int otherNodeId = localId + 1;
-        int epoch = 5;
-        Set<Integer> voters = Set.of(localId, otherNodeId);
-
-        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
-            .withUnknownLeader(epoch - 1)
-            .withKip853Rpc(withKip853Rpc)
-            .build();
-
-        // Become a candidate
-        context.unattachedToCandidate();
-        context.pollUntilRequest();
-        context.assertVotedCandidate(epoch, localId);
-
-        RaftRequest.Outbound request = context.assertSentVoteRequest(epoch, 0, 
0L, 1);
-        VoteResponseData response = new VoteResponseData()
-            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
-
-        context.deliverResponse(request.correlationId(), 
request.destination(), response);
-        assertThrows(ClusterAuthorizationException.class, 
context.client::poll);
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = { true, false })
-    public void testClusterAuthorizationFailedInEndQuorumEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = randomReplicaId();
-        int otherNodeId = localId + 1;
-        Set<Integer> voters = Set.of(localId, otherNodeId);
-
-        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
-            .withUnknownLeader(1)
-            .withKip853Rpc(withKip853Rpc)
-            .build();
-
-        context.unattachedToLeader();
-        int epoch = context.currentEpoch();
-
-        context.client.shutdown(5000);
-        context.pollUntilRequest();
-
-        RaftRequest.Outbound request = 
context.assertSentEndQuorumEpochRequest(epoch, otherNodeId);
-        EndQuorumEpochResponseData response = new EndQuorumEpochResponseData()
-            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
-
-        context.deliverResponse(request.correlationId(), 
request.destination(), response);
-        assertThrows(ClusterAuthorizationException.class, 
context.client::poll);
-    }
-
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void 
testHandleLeaderChangeFiresAfterListenerReachesEpochStartOffsetOnEmptyLog(
@@ -4646,7 +4526,7 @@ public class KafkaRaftClientTest {
         List<InetSocketAddress> bootstrapServers = voters
             .stream()
             .map(RaftClientTestContext::mockAddress)
-            .collect(Collectors.toList());
+            .toList();
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(OptionalInt.empty(), voters)
             .withBootstrapServers(Optional.of(bootstrapServers))

Reply via email to