This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6f5be293745 KAFKA-18332 fix ClassDataAbstractionCoupling problem in
KafkaRaftClientTest(1/2) (#18926)
6f5be293745 is described below
commit 6f5be293745882350c01de10cc4cce3ad8dcf27b
Author: leaf-soba <[email protected]>
AuthorDate: Thu Apr 17 22:41:37 2025 +0800
KAFKA-18332 fix ClassDataAbstractionCoupling problem in
KafkaRaftClientTest(1/2) (#18926)
- extract a unit test named `KafkaRaftClientClusterAuthTest` to reduce
the number of imported classes
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/raft/KafkaRaftClientClusterAuthTest.java | 148 ++++++++++++++++
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 194 ++++-----------------
2 files changed, 185 insertions(+), 157 deletions(-)
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
new file mode 100644
index 00000000000..62c8e117698
--- /dev/null
+++
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientClusterAuthTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.randomReplicaId;
+import static
org.apache.kafka.raft.RaftClientTestContext.Builder.DEFAULT_ELECTION_TIMEOUT_MS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class KafkaRaftClientClusterAuthTest {
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ void testClusterAuthorizationFailedInFetch(boolean withKip853Rpc) throws
Exception {
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
+ int epoch = 5;
+ Set<Integer> voters = Set.of(localId, otherNodeId);
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .withKip853Rpc(withKip853Rpc)
+ .withElectedLeader(epoch, otherNodeId)
+ .build();
+
+ context.assertElectedLeader(epoch, otherNodeId);
+
+ context.pollUntilRequest();
+
+ RaftRequest.Outbound request = context.assertSentFetchRequest(epoch,
0, 0);
+ FetchResponseData response = new FetchResponseData()
+ .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+ context.deliverResponse(
+ request.correlationId(),
+ request.destination(),
+ response
+ );
+ assertThrows(ClusterAuthorizationException.class,
context.client::poll);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ void testClusterAuthorizationFailedInBeginQuorumEpoch(boolean
withKip853Rpc) throws Exception {
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
+ int epoch = 5;
+ Set<Integer> voters = Set.of(localId, otherNodeId);
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .updateRandom(r -> r.mockNextInt(DEFAULT_ELECTION_TIMEOUT_MS,
0))
+ .withUnknownLeader(epoch - 1)
+ .withKip853Rpc(withKip853Rpc)
+ .build();
+
+ context.time.sleep(context.electionTimeoutMs());
+ context.expectAndGrantPreVotes(epoch - 1);
+ context.expectAndGrantVotes(epoch);
+
+ context.pollUntilRequest();
+ List<RaftRequest.Outbound> requests =
context.collectBeginEpochRequests(epoch);
+ assertEquals(1, requests.size());
+ RaftRequest.Outbound request = requests.get(0);
+ assertEquals(otherNodeId, request.destination().id());
+ BeginQuorumEpochResponseData response = new
BeginQuorumEpochResponseData()
+ .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+
+ context.deliverResponse(request.correlationId(),
request.destination(), response);
+ assertThrows(ClusterAuthorizationException.class,
context.client::poll);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ void testClusterAuthorizationFailedInVote(boolean withKip853Rpc) throws
Exception {
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
+ int epoch = 5;
+ Set<Integer> voters = Set.of(localId, otherNodeId);
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .withUnknownLeader(epoch - 1)
+ .withKip853Rpc(withKip853Rpc)
+ .build();
+
+ // Become a candidate
+ context.unattachedToCandidate();
+ context.pollUntilRequest();
+ context.assertVotedCandidate(epoch, localId);
+
+ RaftRequest.Outbound request = context.assertSentVoteRequest(epoch, 0,
0L, 1);
+ VoteResponseData response = new VoteResponseData()
+ .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+
+ context.deliverResponse(request.correlationId(),
request.destination(), response);
+ assertThrows(ClusterAuthorizationException.class,
context.client::poll);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ void testClusterAuthorizationFailedInEndQuorumEpoch(boolean withKip853Rpc)
throws Exception {
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
+ Set<Integer> voters = Set.of(localId, otherNodeId);
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .withUnknownLeader(1)
+ .withKip853Rpc(withKip853Rpc)
+ .build();
+
+ context.unattachedToLeader();
+ int epoch = context.currentEpoch();
+
+ context.client.shutdown(5000);
+ context.pollUntilRequest();
+
+ RaftRequest.Outbound request =
context.assertSentEndQuorumEpochRequest(epoch, otherNodeId);
+ EndQuorumEpochResponseData response = new EndQuorumEpochResponseData()
+ .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+
+ context.deliverResponse(request.correlationId(),
request.destination(), response);
+ assertThrows(ClusterAuthorizationException.class,
context.client::poll);
+ }
+}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index afa5cce0521..a1e92f6afcf 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -17,10 +17,8 @@
package org.apache.kafka.raft;
import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
@@ -63,7 +61,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.kafka.raft.RaftClientTestContext.Builder.DEFAULT_ELECTION_TIMEOUT_MS;
@@ -77,8 +74,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
-public class KafkaRaftClientTest {
+@SuppressWarnings("ClassFanOutComplexity")
+class KafkaRaftClientTest {
@Test
public void testNodeDirectoryId() {
int localId = randomReplicaId();
@@ -917,10 +914,10 @@ public class KafkaRaftClientTest {
RecordBatch batch = records.batches().iterator().next();
assertTrue(batch.isControlBatch());
- Record record = batch.iterator().next();
- assertEquals(electionTimestamp, record.timestamp());
+ Record expectedRecord = batch.iterator().next();
+ assertEquals(electionTimestamp, expectedRecord.timestamp());
RaftClientTestContext.verifyLeaderChangeMessage(localId,
List.of(localId, otherNodeId),
- List.of(otherNodeId, localId), record.key(), record.value());
+ List.of(otherNodeId, localId), expectedRecord.key(),
expectedRecord.value());
}
@ParameterizedTest
@@ -966,10 +963,10 @@ public class KafkaRaftClientTest {
RecordBatch batch = records.batches().iterator().next();
assertTrue(batch.isControlBatch());
- Record record = batch.iterator().next();
- assertEquals(electionTimestamp, record.timestamp());
+ Record expectedRecord = batch.iterator().next();
+ assertEquals(electionTimestamp, expectedRecord.timestamp());
RaftClientTestContext.verifyLeaderChangeMessage(localId,
List.of(localId, firstNodeId, secondNodeId),
- List.of(voterId, localId), record.key(), record.value());
+ List.of(voterId, localId), expectedRecord.key(),
expectedRecord.value());
}
@ParameterizedTest
@@ -2098,7 +2095,7 @@ public class KafkaRaftClientTest {
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
- .collect(Collectors.toList());
+ .toList();
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withBootstrapServers(Optional.of(bootstrapServers))
@@ -2145,7 +2142,7 @@ public class KafkaRaftClientTest {
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
- .collect(Collectors.toList());
+ .toList();
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withBootstrapServers(Optional.of(bootstrapServers))
@@ -2188,7 +2185,7 @@ public class KafkaRaftClientTest {
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
- .collect(Collectors.toList());
+ .toList();
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withBootstrapServers(Optional.of(bootstrapServers))
@@ -2262,7 +2259,7 @@ public class KafkaRaftClientTest {
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
- .collect(Collectors.toList());
+ .toList();
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withBootstrapServers(Optional.of(bootstrapServers))
@@ -2471,7 +2468,7 @@ public class KafkaRaftClientTest {
// invalid voter id is rejected
context.deliverRequest(
context.voteRequest(
- context.clusterId.toString(),
+ context.clusterId,
epoch + 1,
otherNodeKey,
ReplicaKey.of(10, Uuid.randomUuid()),
@@ -2486,7 +2483,7 @@ public class KafkaRaftClientTest {
// invalid voter directory id is rejected
context.deliverRequest(
context.voteRequest(
- context.clusterId.toString(),
+ context.clusterId,
epoch + 2,
otherNodeKey,
ReplicaKey.of(0, Uuid.randomUuid()),
@@ -2516,7 +2513,7 @@ public class KafkaRaftClientTest {
// Leader voter3 sends a begin quorum epoch request with incorrect
voter id
context.deliverRequest(
context.beginEpochRequest(
- context.clusterId.toString(),
+ context.clusterId,
epoch,
voter3,
ReplicaKey.of(10, Uuid.randomUuid())
@@ -2529,7 +2526,7 @@ public class KafkaRaftClientTest {
// Leader voter3 sends a begin quorum epoch request with incorrect
voter directory id
context.deliverRequest(
context.beginEpochRequest(
- context.clusterId.toString(),
+ context.clusterId,
epoch,
voter3,
ReplicaKey.of(localId, Uuid.randomUuid())
@@ -2542,7 +2539,7 @@ public class KafkaRaftClientTest {
// Leader voter3 sends a begin quorum epoch request with incorrect
voter directory id
context.deliverRequest(
context.beginEpochRequest(
- context.clusterId.toString(),
+ context.clusterId,
epoch,
voter3,
context.localReplicaKey()
@@ -2568,7 +2565,7 @@ public class KafkaRaftClientTest {
int epoch = context.currentEpoch();
// valid cluster id is accepted
-
context.deliverRequest(context.beginEpochRequest(context.clusterId.toString(),
epoch, localId));
+ context.deliverRequest(context.beginEpochRequest(context.clusterId,
epoch, localId));
context.pollUntilResponse();
context.assertSentBeginQuorumEpochResponse(Errors.NONE, epoch,
OptionalInt.of(localId));
@@ -2931,7 +2928,7 @@ public class KafkaRaftClientTest {
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
- .collect(Collectors.toList());
+ .toList();
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withBootstrapServers(Optional.of(bootstrapServers))
@@ -2970,7 +2967,7 @@ public class KafkaRaftClientTest {
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
- .collect(Collectors.toList());
+ .toList();
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withBootstrapServers(Optional.of(bootstrapServers))
@@ -3005,7 +3002,7 @@ public class KafkaRaftClientTest {
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
- .collect(Collectors.toList());
+ .toList();
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withBootstrapServers(Optional.of(bootstrapServers))
@@ -3054,7 +3051,7 @@ public class KafkaRaftClientTest {
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
- .collect(Collectors.toList());
+ .toList();
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withBootstrapServers(Optional.of(bootstrapServers))
@@ -3965,10 +3962,10 @@ public class KafkaRaftClientTest {
List<Record> readRecords = Utils.toList(leaderChangeBatch.iterator());
assertEquals(1, readRecords.size());
- Record record = readRecords.get(0);
- assertEquals(now, record.timestamp());
+ Record expectedRecord = readRecords.get(0);
+ assertEquals(now, expectedRecord.timestamp());
RaftClientTestContext.verifyLeaderChangeMessage(localId,
List.of(localId),
- List.of(localId), record.key(), record.value());
+ List.of(localId), expectedRecord.key(),
expectedRecord.value());
MutableRecordBatch batch = batches.get(1);
assertEquals(1, batch.partitionLeaderEpoch());
@@ -4027,24 +4024,17 @@ public class KafkaRaftClientTest {
.build();
context.pollUntil(() -> context.log.endOffset().offset() == 1L);
- assertNotNull(getMetric(context.metrics, "current-state"));
- assertNotNull(getMetric(context.metrics, "current-leader"));
- assertNotNull(getMetric(context.metrics, "current-vote"));
- assertNotNull(getMetric(context.metrics, "current-epoch"));
- assertNotNull(getMetric(context.metrics, "high-watermark"));
- assertNotNull(getMetric(context.metrics, "log-end-offset"));
- assertNotNull(getMetric(context.metrics, "log-end-epoch"));
- assertNotNull(getMetric(context.metrics,
"number-unknown-voter-connections"));
- assertNotNull(getMetric(context.metrics, "poll-idle-ratio-avg"));
- assertNotNull(getMetric(context.metrics, "commit-latency-avg"));
- assertNotNull(getMetric(context.metrics, "commit-latency-max"));
- assertNotNull(getMetric(context.metrics, "election-latency-avg"));
- assertNotNull(getMetric(context.metrics, "election-latency-max"));
- assertNotNull(getMetric(context.metrics, "fetch-records-rate"));
- assertNotNull(getMetric(context.metrics, "append-records-rate"));
- assertNotNull(getMetric(context.metrics, "number-of-voters"));
- assertNotNull(getMetric(context.metrics, "number-of-observers"));
- assertNotNull(getMetric(context.metrics, "uncommitted-voter-change"));
+ var metricNames = Set.of(
+ "current-state", "current-leader", "current-vote",
"current-epoch", "high-watermark",
+ "log-end-offset", "log-end-epoch",
"number-unknown-voter-connections", "poll-idle-ratio-avg",
+ "commit-latency-avg", "commit-latency-max",
"election-latency-avg", "election-latency-max",
+ "fetch-records-rate", "append-records-rate",
"number-of-voters", "number-of-observers",
+ "uncommitted-voter-change"
+ );
+
+ for (String metricName : metricNames) {
+ assertNotNull(getMetric(context.metrics, metricName));
+ }
assertEquals("leader", getMetric(context.metrics,
"current-state").metricValue());
assertEquals((double) localId, getMetric(context.metrics,
"current-leader").metricValue());
@@ -4068,116 +4058,6 @@ public class KafkaRaftClientTest {
assertEquals(1, context.metrics.metrics().size());
}
- @ParameterizedTest
- @ValueSource(booleans = { true, false })
- public void testClusterAuthorizationFailedInFetch(boolean withKip853Rpc)
throws Exception {
- int localId = randomReplicaId();
- int otherNodeId = localId + 1;
- int epoch = 5;
- Set<Integer> voters = Set.of(localId, otherNodeId);
-
- RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
- .withKip853Rpc(withKip853Rpc)
- .withElectedLeader(epoch, otherNodeId)
- .build();
-
- context.assertElectedLeader(epoch, otherNodeId);
-
- context.pollUntilRequest();
-
- RaftRequest.Outbound request = context.assertSentFetchRequest(epoch,
0, 0);
- FetchResponseData response = new FetchResponseData()
- .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
- context.deliverResponse(
- request.correlationId(),
- request.destination(),
- response
- );
- assertThrows(ClusterAuthorizationException.class,
context.client::poll);
- }
-
- @ParameterizedTest
- @ValueSource(booleans = { true, false })
- public void testClusterAuthorizationFailedInBeginQuorumEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = randomReplicaId();
- int otherNodeId = localId + 1;
- int epoch = 5;
- Set<Integer> voters = Set.of(localId, otherNodeId);
-
- RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
- .updateRandom(r -> r.mockNextInt(DEFAULT_ELECTION_TIMEOUT_MS, 0))
- .withUnknownLeader(epoch - 1)
- .withKip853Rpc(withKip853Rpc)
- .build();
-
- context.time.sleep(context.electionTimeoutMs());
- context.expectAndGrantPreVotes(epoch - 1);
- context.expectAndGrantVotes(epoch);
-
- context.pollUntilRequest();
- List<RaftRequest.Outbound> requests =
context.collectBeginEpochRequests(epoch);
- assertEquals(1, requests.size());
- RaftRequest.Outbound request = requests.get(0);
- assertEquals(otherNodeId, request.destination().id());
- BeginQuorumEpochResponseData response = new
BeginQuorumEpochResponseData()
- .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
-
- context.deliverResponse(request.correlationId(),
request.destination(), response);
- assertThrows(ClusterAuthorizationException.class,
context.client::poll);
- }
-
- @ParameterizedTest
- @ValueSource(booleans = { true, false })
- public void testClusterAuthorizationFailedInVote(boolean withKip853Rpc)
throws Exception {
- int localId = randomReplicaId();
- int otherNodeId = localId + 1;
- int epoch = 5;
- Set<Integer> voters = Set.of(localId, otherNodeId);
-
- RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
- .withUnknownLeader(epoch - 1)
- .withKip853Rpc(withKip853Rpc)
- .build();
-
- // Become a candidate
- context.unattachedToCandidate();
- context.pollUntilRequest();
- context.assertVotedCandidate(epoch, localId);
-
- RaftRequest.Outbound request = context.assertSentVoteRequest(epoch, 0,
0L, 1);
- VoteResponseData response = new VoteResponseData()
- .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
-
- context.deliverResponse(request.correlationId(),
request.destination(), response);
- assertThrows(ClusterAuthorizationException.class,
context.client::poll);
- }
-
- @ParameterizedTest
- @ValueSource(booleans = { true, false })
- public void testClusterAuthorizationFailedInEndQuorumEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = randomReplicaId();
- int otherNodeId = localId + 1;
- Set<Integer> voters = Set.of(localId, otherNodeId);
-
- RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
- .withUnknownLeader(1)
- .withKip853Rpc(withKip853Rpc)
- .build();
-
- context.unattachedToLeader();
- int epoch = context.currentEpoch();
-
- context.client.shutdown(5000);
- context.pollUntilRequest();
-
- RaftRequest.Outbound request =
context.assertSentEndQuorumEpochRequest(epoch, otherNodeId);
- EndQuorumEpochResponseData response = new EndQuorumEpochResponseData()
- .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
-
- context.deliverResponse(request.correlationId(),
request.destination(), response);
- assertThrows(ClusterAuthorizationException.class,
context.client::poll);
- }
-
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void
testHandleLeaderChangeFiresAfterListenerReachesEpochStartOffsetOnEmptyLog(
@@ -4646,7 +4526,7 @@ public class KafkaRaftClientTest {
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
- .collect(Collectors.toList());
+ .toList();
RaftClientTestContext context = new
RaftClientTestContext.Builder(OptionalInt.empty(), voters)
.withBootstrapServers(Optional.of(bootstrapServers))