This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 92d8cb562a7 KAFKA-19078 Automatic controller addition to cluster metadata partition (#19589)
92d8cb562a7 is described below
commit 92d8cb562a71ddc5b6fdc0943bec3a4900a1642e
Author: Kevin Wu <[email protected]>
AuthorDate: Wed Aug 13 10:20:18 2025 -0500
KAFKA-19078 Automatic controller addition to cluster metadata partition (#19589)
Add the `controller.quorum.auto.join.enable` configuration. When it is enabled
and KIP-853 is supported, follower controllers that are observers (their
replica id + directory id is not in the voter set) will:
- Automatically remove voter set entries that match their replica id but not
their directory id, by sending a `RemoveVoter` RPC to the leader.
- Automatically add themselves as a voter when their replica id is not present
in the voter set, by sending an `AddVoter` RPC to the leader.
A minimal configuration sketch with auto join enabled follows.
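The node id, listener, and bootstrap values below are illustrative placeholders
rather than part of this change; only the auto join property is introduced here:

    # requires kraft.version 1 (KIP-853) and the 'controller' role
    process.roles=controller
    node.id=3000
    controller.quorum.auto.join.enable=true
    controller.quorum.bootstrap.servers=localhost:9093
    controller.listener.names=CONTROLLER
    listeners=CONTROLLER://:9093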
Reviewers: José Armando García Sancio <[email protected]>, Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 3 +
.../ReconfigurableQuorumIntegrationTest.java | 68 +++++
.../scala/unit/kafka/server/KafkaConfigTest.scala | 12 +
.../java/org/apache/kafka/raft/FollowerState.java | 24 +-
.../org/apache/kafka/raft/KafkaNetworkChannel.java | 23 +-
.../org/apache/kafka/raft/KafkaRaftClient.java | 151 +++++++++-
.../java/org/apache/kafka/raft/QuorumConfig.java | 15 +-
.../main/java/org/apache/kafka/raft/RaftUtil.java | 8 +-
.../apache/kafka/raft/KafkaNetworkChannelTest.java | 33 ++-
.../kafka/raft/KafkaRaftClientAutoJoinTest.java | 323 +++++++++++++++++++++
.../kafka/raft/KafkaRaftClientReconfigTest.java | 155 +---------
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 6 +-
.../org/apache/kafka/raft/QuorumConfigTest.java | 2 +
.../apache/kafka/raft/RaftClientTestContext.java | 114 +++++++-
.../java/org/apache/kafka/raft/RaftUtilTest.java | 6 +
.../kafka/common/test/KafkaClusterTestKit.java | 81 +++++-
16 files changed, 819 insertions(+), 205 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 67c6febe1ca..2bdadb02fb8 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -570,6 +570,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} not found in ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} (an explicit security mapping for each controller listener is required if ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} is non-empty, or if there are security protocols other than PLAINTEXT in use)")
}
}
+ // controller.quorum.auto.join.enable must be false for KRaft broker-only
+ require(!quorumConfig.autoJoin,
+ s"${QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG} is only supported when ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role.")
// warn that only the first controller listener is used if there is more than one
if (controllerListenerNames.size > 1) {
warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple entries; only the first will be used since ${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames}")
diff --git a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
index 5e21c6099e7..ad4193a0cb9 100644
--- a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
+++ b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -24,11 +24,14 @@ import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.common.test.api.TestKitDefaults;
+import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -164,4 +167,69 @@ public class ReconfigurableQuorumIntegrationTest {
}
}
}
+
+ @Test
+ public void testControllersAutoJoinStandaloneVoter() throws Exception {
+ final var nodes = new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(3).
+ setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
+ build();
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
+ setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
+ setStandalone(true).
+ build()
+ ) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001, 3002}) {
+ assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
+ }
+ });
+ }
+ }
+ }
+
+ @Test
+ public void testNewVoterAutoRemovesAndAdds() throws Exception {
+ final var nodes = new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(3).
+ setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
+ build();
+
+ // Configure the initial voters with one voter having a different directory ID.
+ // This simulates the case where the controller failed and is brought back up with a different directory ID.
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ final var oldDirectoryId = Uuid.randomUuid();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(),
+ controllerNode.id() == TestKitDefaults.CONTROLLER_ID_OFFSET ?
+ oldDirectoryId : controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
+ setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
+ setInitialVoterSet(initialVoters).
+ build()
+ ) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001, 3002}) {
+ assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
+ }
+ });
+ }
+ }
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 43384a64789..d4bf2cacc8d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1489,6 +1489,18 @@ class KafkaConfigTest {
assertEquals(expected, addresses)
}
+ @Test
+ def testInvalidQuorumAutoJoinForKRaftBroker(): Unit = {
+ val props = TestUtils.createBrokerConfig(0)
+ props.setProperty(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, String.valueOf(true))
+ assertEquals(
+ "requirement failed: controller.quorum.auto.join.enable is only " +
+ "supported when process.roles contains the 'controller' role.",
+ assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage
+ )
+
+ }
+
@Test
def testAcceptsLargeId(): Unit = {
val largeBrokerId = 2000
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index 09675f38082..4cbc8778149 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -40,8 +40,8 @@ public class FollowerState implements EpochState {
private final Set<Integer> voters;
// Used for tracking the expiration of both the Fetch and FetchSnapshot
requests
private final Timer fetchTimer;
- // Used to track when to send another update voter request
- private final Timer updateVoterPeriodTimer;
+ // Used to track when to send another add, remove, or update voter request
+ private final Timer updateVoterSetPeriodTimer;
/* Used to track if the replica has fetched successfully from the leader
at least once since
* the transition to follower in this epoch. If the replica has not yet
fetched successfully,
@@ -76,7 +76,7 @@ public class FollowerState implements EpochState {
this.votedKey = votedKey;
this.voters = voters;
this.fetchTimer = time.timer(fetchTimeoutMs);
- this.updateVoterPeriodTimer = time.timer(updateVoterPeriodMs());
+ this.updateVoterSetPeriodTimer = time.timer(updateVoterPeriodMs());
this.highWatermark = highWatermark;
this.log = logContext.logger(FollowerState.class);
}
@@ -154,19 +154,19 @@ public class FollowerState implements EpochState {
return fetchTimeoutMs;
}
- public boolean hasUpdateVoterPeriodExpired(long currentTimeMs) {
- updateVoterPeriodTimer.update(currentTimeMs);
- return updateVoterPeriodTimer.isExpired();
+ public boolean hasUpdateVoterSetPeriodExpired(long currentTimeMs) {
+ updateVoterSetPeriodTimer.update(currentTimeMs);
+ return updateVoterSetPeriodTimer.isExpired();
}
- public long remainingUpdateVoterPeriodMs(long currentTimeMs) {
- updateVoterPeriodTimer.update(currentTimeMs);
- return updateVoterPeriodTimer.remainingMs();
+ public long remainingUpdateVoterSetPeriodMs(long currentTimeMs) {
+ updateVoterSetPeriodTimer.update(currentTimeMs);
+ return updateVoterSetPeriodTimer.remainingMs();
}
- public void resetUpdateVoterPeriod(long currentTimeMs) {
- updateVoterPeriodTimer.update(currentTimeMs);
- updateVoterPeriodTimer.reset(updateVoterPeriodMs());
+ public void resetUpdateVoterSetPeriod(long currentTimeMs) {
+ updateVoterSetPeriodTimer.update(currentTimeMs);
+ updateVoterSetPeriodTimer.reset(updateVoterPeriodMs());
}
public boolean hasUpdatedLeader() {
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java b/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
index 2a88c2a7830..898a82ef3fd 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java
@@ -19,11 +19,13 @@ package org.apache.kafka.raft;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.message.AddRaftVoterRequestData;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
+import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.network.ListenerName;
@@ -31,11 +33,13 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AddRaftVoterRequest;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
+import org.apache.kafka.common.requests.RemoveRaftVoterRequest;
import org.apache.kafka.common.requests.UpdateRaftVoterRequest;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.utils.Time;
@@ -181,20 +185,25 @@ public class KafkaNetworkChannel implements
NetworkChannel {
static AbstractRequest.Builder<? extends AbstractRequest>
buildRequest(ApiMessage requestData) {
if (requestData instanceof VoteRequestData)
return new VoteRequest.Builder((VoteRequestData) requestData);
- if (requestData instanceof BeginQuorumEpochRequestData)
+ else if (requestData instanceof BeginQuorumEpochRequestData)
return new BeginQuorumEpochRequest.Builder((BeginQuorumEpochRequestData) requestData);
- if (requestData instanceof EndQuorumEpochRequestData)
+ else if (requestData instanceof EndQuorumEpochRequestData)
return new EndQuorumEpochRequest.Builder((EndQuorumEpochRequestData) requestData);
- if (requestData instanceof FetchRequestData)
+ else if (requestData instanceof FetchRequestData)
return new FetchRequest.SimpleBuilder((FetchRequestData) requestData);
- if (requestData instanceof FetchSnapshotRequestData)
+ else if (requestData instanceof FetchSnapshotRequestData)
return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData) requestData);
- if (requestData instanceof UpdateRaftVoterRequestData)
+ else if (requestData instanceof UpdateRaftVoterRequestData)
return new UpdateRaftVoterRequest.Builder((UpdateRaftVoterRequestData) requestData);
- if (requestData instanceof ApiVersionsRequestData)
+ else if (requestData instanceof AddRaftVoterRequestData)
+ return new AddRaftVoterRequest.Builder((AddRaftVoterRequestData) requestData);
+ else if (requestData instanceof RemoveRaftVoterRequestData)
+ return new RemoveRaftVoterRequest.Builder((RemoveRaftVoterRequestData) requestData);
+ else if (requestData instanceof ApiVersionsRequestData)
return new ApiVersionsRequest.Builder((ApiVersionsRequestData) requestData,
ApiKeys.API_VERSIONS.oldestVersion(),
ApiKeys.API_VERSIONS.latestVersion());
- throw new IllegalArgumentException("Unexpected type for requestData: " + requestData);
+ else
+ throw new IllegalArgumentException("Unexpected type for requestData: " + requestData);
}
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 921bd72ecf2..bb70c5a7df4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -180,7 +180,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
private final Logger logger;
private final Time time;
private final int fetchMaxWaitMs;
- private final boolean followersAlwaysFlush;
+ private final boolean canBecomeVoter;
private final String clusterId;
private final Endpoints localListeners;
private final SupportedVersionRange localSupportedKRaftVersion;
@@ -229,7 +229,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
* non-participating observer.
*
* @param nodeDirectoryId the node directory id, cannot be the zero uuid
- * @param followersAlwaysFlush instruct followers to always fsync when appending to the log
+ * @param canBecomeVoter instruct followers to always fsync when appending to the log
*/
public KafkaRaftClient(
OptionalInt nodeId,
@@ -240,7 +240,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
Time time,
ExpirationService expirationService,
LogContext logContext,
- boolean followersAlwaysFlush,
+ boolean canBecomeVoter,
String clusterId,
Collection<InetSocketAddress> bootstrapServers,
Endpoints localListeners,
@@ -258,7 +258,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
time,
expirationService,
MAX_FETCH_WAIT_MS,
- followersAlwaysFlush,
+ canBecomeVoter,
clusterId,
bootstrapServers,
localListeners,
@@ -280,7 +280,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
Time time,
ExpirationService expirationService,
int fetchMaxWaitMs,
- boolean followersAlwaysFlush,
+ boolean canBecomeVoter,
String clusterId,
Collection<InetSocketAddress> bootstrapServers,
Endpoints localListeners,
@@ -308,7 +308,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
this.localListeners = localListeners;
this.localSupportedKRaftVersion = localSupportedKRaftVersion;
this.fetchMaxWaitMs = fetchMaxWaitMs;
- this.followersAlwaysFlush = followersAlwaysFlush;
+ this.canBecomeVoter = canBecomeVoter;
this.logger = logContext.logger(KafkaRaftClient.class);
this.random = random;
this.quorumConfig = quorumConfig;
@@ -1839,7 +1839,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
);
}
- if (quorum.isVoter() || followersAlwaysFlush) {
+ if (quorum.isVoter() || canBecomeVoter) {
// the leader only requires that voters have flushed their log
before sending a Fetch
// request. Because of reconfiguration some observers (that are
getting added to the
// voter set) need to flush the disk because the leader may assume
that they are in the
@@ -2291,6 +2291,25 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
);
}
+ private boolean handleAddVoterResponse(
+ RaftResponse.Inbound responseMetadata,
+ long currentTimeMs
+ ) {
+ final AddRaftVoterResponseData data = (AddRaftVoterResponseData) responseMetadata.data();
+ final Errors error = Errors.forCode(data.errorCode());
+
+ /* These error codes indicate the replica was successfully added or the leader is unable to
+ * process the request. In either case, reset the update voter set timer to back off.
+ */
+ if (error == Errors.NONE || error == Errors.REQUEST_TIMED_OUT ||
+ error == Errors.DUPLICATE_VOTER) {
+ quorum.followerStateOrThrow().resetUpdateVoterSetPeriod(currentTimeMs);
+ return true;
+ } else {
+ return handleUnexpectedError(error, responseMetadata);
+ }
+ }
+
private CompletableFuture<RemoveRaftVoterResponseData>
handleRemoveVoterRequest(
RaftRequest.Inbound requestMetadata,
long currentTimeMs
@@ -2334,6 +2353,25 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
);
}
+ private boolean handleRemoveVoterResponse(
+ RaftResponse.Inbound responseMetadata,
+ long currentTimeMs
+ ) {
+ final RemoveRaftVoterResponseData data = (RemoveRaftVoterResponseData) responseMetadata.data();
+ final Errors error = Errors.forCode(data.errorCode());
+
+ /* These error codes indicate the replica was successfully removed or the leader is unable to
+ * process the request. In either case, reset the update voter set timer to back off.
+ */
+ if (error == Errors.NONE || error == Errors.REQUEST_TIMED_OUT ||
+ error == Errors.VOTER_NOT_FOUND) {
+ quorum.followerStateOrThrow().resetUpdateVoterSetPeriod(currentTimeMs);
+ return true;
+ } else {
+ return handleUnexpectedError(error, responseMetadata);
+ }
+ }
+
private CompletableFuture<UpdateRaftVoterResponseData>
handleUpdateVoterRequest(
RaftRequest.Inbound requestMetadata,
long currentTimeMs
@@ -2629,6 +2667,14 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
handledSuccessfully = handleUpdateVoterResponse(response,
currentTimeMs);
break;
+ case ADD_RAFT_VOTER:
+ handledSuccessfully = handleAddVoterResponse(response, currentTimeMs);
+ break;
+
+ case REMOVE_RAFT_VOTER:
+ handledSuccessfully = handleRemoveVoterResponse(response, currentTimeMs);
+ break;
+
default:
throw new IllegalArgumentException("Received unexpected
response type: " + apiKey);
}
@@ -3247,7 +3293,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
logger.info("Transitioning to Prospective state due to fetch
timeout");
transitionToProspective(currentTimeMs);
backoffMs = 0;
- } else if (state.hasUpdateVoterPeriodExpired(currentTimeMs)) {
+ } else if (state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) {
final boolean resetUpdateVoterTimer;
if (shouldSendUpdateVoteRequest(state)) {
var sendResult = maybeSendUpdateVoterRequest(state,
currentTimeMs);
@@ -3261,7 +3307,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
}
if (resetUpdateVoterTimer) {
- state.resetUpdateVoterPeriod(currentTimeMs);
+ state.resetUpdateVoterSetPeriod(currentTimeMs);
}
} else {
backoffMs = maybeSendFetchToBestNode(state, currentTimeMs);
@@ -3271,13 +3317,56 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
backoffMs,
Math.min(
state.remainingFetchTimeMs(currentTimeMs),
- state.remainingUpdateVoterPeriodMs(currentTimeMs)
+ state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
)
);
}
+ private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, long currentTimeMs) {
+ /* When the cluster supports reconfiguration, only replicas that can become a voter
+ * and are configured to auto join should attempt to automatically join the voter
+ * set for the configured topic partition.
+ */
+ return partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
+ quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
+ }
+
private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
- return maybeSendFetchToBestNode(state, currentTimeMs);
+ GracefulShutdown shutdown = this.shutdown.get();
+ final long backoffMs;
+ if (shutdown != null) {
+ // If we are an observer, then we can shutdown immediately. We want to
+ // skip potentially sending any add or remove voter RPCs.
+ backoffMs = 0;
+ } else if (shouldSendAddOrRemoveVoterRequest(state, currentTimeMs)) {
+ final var localReplicaKey = quorum.localReplicaKeyOrThrow();
+ final var voters = partitionState.lastVoterSet();
+ final RequestSendResult sendResult;
+ if (voters.voterIds().contains(localReplicaKey.id())) {
+ /* The replica's id is in the voter set but the replica is not a voter because
+ * the directory id of the voter set entry is different. Remove the old voter.
+ * Local replica is not in the voter set because the replica is an observer.
+ */
+ final var oldVoter = voters.voterKeys()
+ .stream()
+ .filter(replicaKey -> replicaKey.id() == localReplicaKey.id())
+ .findFirst()
+ .get();
+ sendResult = maybeSendRemoveVoterRequest(state, oldVoter, currentTimeMs);
+ } else {
+ sendResult = maybeSendAddVoterRequest(state, currentTimeMs);
+ }
+ backoffMs = sendResult.timeToWaitMs();
+ if (sendResult.requestSent()) {
+ state.resetUpdateVoterSetPeriod(currentTimeMs);
+ }
+ } else {
+ backoffMs = maybeSendFetchToBestNode(state, currentTimeMs);
+ }
+ return Math.min(
+ backoffMs,
+ state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
+ );
}
private long maybeSendFetchToBestNode(FollowerState state, long
currentTimeMs) {
@@ -3329,6 +3418,23 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
);
}
+ private AddRaftVoterRequestData buildAddVoterRequest() {
+ return RaftUtil.addVoterRequest(
+ clusterId,
+ quorumConfig.requestTimeoutMs(),
+ quorum.localReplicaKeyOrThrow(),
+ localListeners,
+ false
+ );
+ }
+
+ private RemoveRaftVoterRequestData buildRemoveVoterRequest(ReplicaKey replicaKey) {
+ return RaftUtil.removeVoterRequest(
+ clusterId,
+ replicaKey
+ );
+ }
+
private RequestSendResult maybeSendUpdateVoterRequest(FollowerState state,
long currentTimeMs) {
return maybeSendRequest(
currentTimeMs,
@@ -3337,6 +3443,29 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
);
}
+ private RequestSendResult maybeSendAddVoterRequest(
+ FollowerState state,
+ long currentTimeMs
+ ) {
+ return maybeSendRequest(
+ currentTimeMs,
+ state.leaderNode(channel.listenerName()),
+ this::buildAddVoterRequest
+ );
+ }
+
+ private RequestSendResult maybeSendRemoveVoterRequest(
+ FollowerState state,
+ ReplicaKey replicaKey,
+ long currentTimeMs
+ ) {
+ return maybeSendRequest(
+ currentTimeMs,
+ state.leaderNode(channel.listenerName()),
+ () -> buildRemoveVoterRequest(replicaKey)
+ );
+ }
+
private long pollUnattached(long currentTimeMs) {
UnattachedState state = quorum.unattachedStateOrThrow();
if (quorum.isVoter()) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
index 3ff2f7c86de..3712c1cc92d 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
@@ -35,6 +35,7 @@ import static
org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
@@ -102,6 +103,11 @@ public class QuorumConfig {
public static final String QUORUM_RETRY_BACKOFF_MS_DOC =
CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20;
+ public static final String QUORUM_AUTO_JOIN_ENABLE_CONFIG = QUORUM_PREFIX + "auto.join.enable";
+ public static final String QUORUM_AUTO_JOIN_ENABLE_DOC = "Controls whether a KRaft controller should automatically " +
+ "join the cluster metadata partition for its cluster id.";
+ public static final boolean DEFAULT_QUORUM_AUTO_JOIN_ENABLE = false;
+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new
ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC)
.define(QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST,
DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new
ControllerQuorumBootstrapServersValidator(), HIGH, QUORUM_BOOTSTRAP_SERVERS_DOC)
@@ -110,7 +116,8 @@ public class QuorumConfig {
.define(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS, atLeast(0), HIGH, QUORUM_ELECTION_BACKOFF_MAX_MS_DOC)
.define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, atLeast(0), MEDIUM, QUORUM_LINGER_MS_DOC)
.define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, atLeast(0), MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC)
- .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, atLeast(0), LOW, QUORUM_RETRY_BACKOFF_MS_DOC);
+ .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, atLeast(0), LOW, QUORUM_RETRY_BACKOFF_MS_DOC)
+ .define(QUORUM_AUTO_JOIN_ENABLE_CONFIG, BOOLEAN, DEFAULT_QUORUM_AUTO_JOIN_ENABLE, LOW, QUORUM_AUTO_JOIN_ENABLE_DOC);
private final List<String> voters;
private final List<String> bootstrapServers;
@@ -120,6 +127,7 @@ public class QuorumConfig {
private final int electionBackoffMaxMs;
private final int fetchTimeoutMs;
private final int appendLingerMs;
+ private final boolean autoJoin;
public QuorumConfig(AbstractConfig abstractConfig) {
this.voters = abstractConfig.getList(QUORUM_VOTERS_CONFIG);
@@ -130,6 +138,7 @@ public class QuorumConfig {
this.electionBackoffMaxMs = abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
this.fetchTimeoutMs = abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
this.appendLingerMs = abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG);
+ this.autoJoin = abstractConfig.getBoolean(QUORUM_AUTO_JOIN_ENABLE_CONFIG);
}
public List<String> voters() {
@@ -164,6 +173,10 @@ public class QuorumConfig {
return appendLingerMs;
}
+ public boolean autoJoin() {
+ return autoJoin;
+ }
+
private static Integer parseVoterId(String idString) {
try {
return Integer.parseInt(idString);
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index caa087378c5..f3f411885a7 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -64,6 +64,8 @@ public class RaftUtil {
case FETCH_SNAPSHOT -> new FetchSnapshotResponseData().setErrorCode(error.code());
case API_VERSIONS -> new ApiVersionsResponseData().setErrorCode(error.code());
case UPDATE_RAFT_VOTER -> new UpdateRaftVoterResponseData().setErrorCode(error.code());
+ case ADD_RAFT_VOTER -> new AddRaftVoterResponseData().setErrorCode(error.code());
+ case REMOVE_RAFT_VOTER -> new RemoveRaftVoterResponseData().setErrorCode(error.code());
default -> throw new IllegalArgumentException("Received response for unexpected request type: " + apiKey);
};
}
@@ -524,14 +526,16 @@ public class RaftUtil {
String clusterId,
int timeoutMs,
ReplicaKey voter,
- Endpoints listeners
+ Endpoints listeners,
+ boolean ackWhenCommitted
) {
return new AddRaftVoterRequestData()
.setClusterId(clusterId)
.setTimeoutMs(timeoutMs)
.setVoterId(voter.id())
.setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID))
- .setListeners(listeners.toAddVoterRequest());
+ .setListeners(listeners.toAddVoterRequest())
+ .setAckWhenCommitted(ackWhenCommitted);
}
public static AddRaftVoterResponseData addVoterResponse(
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
index e56b9c94b49..8b2e70d8a2a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
@@ -22,12 +22,14 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.AddRaftVoterResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.network.ListenerName;
@@ -36,6 +38,7 @@ import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddRaftVoterResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
@@ -44,6 +47,7 @@ import
org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
+import org.apache.kafka.common.requests.RemoveRaftVoterResponse;
import org.apache.kafka.common.requests.UpdateRaftVoterResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
@@ -88,7 +92,9 @@ public class KafkaNetworkChannelTest {
ApiKeys.END_QUORUM_EPOCH,
ApiKeys.FETCH,
ApiKeys.FETCH_SNAPSHOT,
- ApiKeys.UPDATE_RAFT_VOTER
+ ApiKeys.UPDATE_RAFT_VOTER,
+ ApiKeys.ADD_RAFT_VOTER,
+ ApiKeys.REMOVE_RAFT_VOTER
);
private final int requestTimeoutMs = 30000;
@@ -316,6 +322,21 @@ public class KafkaNetworkChannelTest {
Endpoints.empty()
);
+ case ADD_RAFT_VOTER:
+ return RaftUtil.addVoterRequest(
+ clusterId,
+ requestTimeoutMs,
+ ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID),
+ Endpoints.empty(),
+ true
+ );
+
+ case REMOVE_RAFT_VOTER:
+ return RaftUtil.removeVoterRequest(
+ clusterId,
+ ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID)
+ );
+
default:
throw new AssertionError("Unexpected api " + key);
}
@@ -345,6 +366,8 @@ public class KafkaNetworkChannelTest {
case FETCH -> new FetchResponseData().setErrorCode(error.code());
case FETCH_SNAPSHOT -> new FetchSnapshotResponseData().setErrorCode(error.code());
case UPDATE_RAFT_VOTER -> new UpdateRaftVoterResponseData().setErrorCode(error.code());
+ case ADD_RAFT_VOTER -> new AddRaftVoterResponseData().setErrorCode(error.code());
+ case REMOVE_RAFT_VOTER -> new RemoveRaftVoterResponseData().setErrorCode(error.code());
default -> throw new AssertionError("Unexpected api " + key);
};
}
@@ -363,6 +386,10 @@ public class KafkaNetworkChannelTest {
code = ((FetchSnapshotResponseData) response).errorCode();
} else if (response instanceof UpdateRaftVoterResponseData) {
code = ((UpdateRaftVoterResponseData) response).errorCode();
+ } else if (response instanceof AddRaftVoterResponseData) {
+ code = ((AddRaftVoterResponseData) response).errorCode();
+ } else if (response instanceof RemoveRaftVoterResponseData) {
+ code = ((RemoveRaftVoterResponseData) response).errorCode();
} else {
throw new IllegalArgumentException("Unexpected type for
responseData: " + response);
}
@@ -383,6 +410,10 @@ public class KafkaNetworkChannelTest {
return new FetchSnapshotResponse((FetchSnapshotResponseData) responseData);
} else if (responseData instanceof UpdateRaftVoterResponseData) {
return new UpdateRaftVoterResponse((UpdateRaftVoterResponseData) responseData);
+ } else if (responseData instanceof AddRaftVoterResponseData) {
+ return new AddRaftVoterResponse((AddRaftVoterResponseData) responseData);
+ } else if (responseData instanceof RemoveRaftVoterResponseData) {
+ return new RemoveRaftVoterResponse((RemoveRaftVoterResponseData) responseData);
} else {
throw new IllegalArgumentException("Unexpected type for responseData: " + responseData);
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java
new file mode 100644
index 00000000000..7ad195e7d21
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.server.common.KRaftVersion;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
+import static org.apache.kafka.raft.RaftClientTestContext.RaftProtocol.KIP_595_PROTOCOL;
+import static org.apache.kafka.raft.RaftClientTestContext.RaftProtocol.KIP_853_PROTOCOL;
+
+public class KafkaRaftClientAutoJoinTest {
+ @Test
+ public void testAutoRemoveOldVoter() throws Exception {
+ final var leader = replicaKey(randomReplicaId(), true);
+ final var oldFollower = replicaKey(leader.id() + 1, true);
+ final var newFollowerKey = replicaKey(oldFollower.id(), true);
+ final int epoch = 1;
+ final var context = new RaftClientTestContext.Builder(
+ newFollowerKey.id(),
+ newFollowerKey.directoryId().get()
+ )
+ .withRaftProtocol(KIP_853_PROTOCOL)
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(leader, oldFollower)), KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, leader.id())
+ .withAutoJoin(true)
+ .withCanBecomeVoter(true)
+ .build();
+
+ context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+ // the next request should be a remove voter request
+ pollAndDeliverRemoveVoter(context, oldFollower);
+
+ // after sending a remove voter the next request should be a fetch
+ context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+ // the replica should send remove voter again because the fetch did
not update the voter set
+ pollAndDeliverRemoveVoter(context, oldFollower);
+ }
+
+ @Test
+ public void testAutoAddNewVoter() throws Exception {
+ final var leader = replicaKey(randomReplicaId(), true);
+ final var follower = replicaKey(leader.id() + 1, true);
+ final var newVoter = replicaKey(follower.id() + 1, true);
+ final int epoch = 1;
+ final var context = new RaftClientTestContext.Builder(
+ newVoter.id(),
+ newVoter.directoryId().get()
+ )
+ .withRaftProtocol(KIP_853_PROTOCOL)
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, leader.id())
+ .withAutoJoin(true)
+ .withCanBecomeVoter(true)
+ .build();
+
+ context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+ // the next request should be an add voter request
+ pollAndSendAddVoter(context, newVoter);
+
+ // expire the add voter request, the next request should be a fetch
+ context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+ // the replica should send add voter again because the completed fetch
+ // did not update the voter set, and its timer has expired
+ final var addVoterRequest = pollAndSendAddVoter(context, newVoter);
+
+ // deliver the add voter response, this is possible before a completed
fetch because of KIP-1186
+ context.deliverResponse(
+ addVoterRequest.correlationId(),
+ addVoterRequest.destination(),
+ RaftUtil.addVoterResponse(Errors.NONE, Errors.NONE.message())
+ );
+
+ // verify the replica can perform a fetch to commit the new voter set
+ pollAndDeliverFetchToUpdateVoterSet(
+ context,
+ epoch,
+ VoterSetTest.voterSet(Stream.of(leader, newVoter))
+ );
+ }
+
+ @Test
+ public void testObserverRemovesOldVoterAndAutoJoins() throws Exception {
+ final var leader = replicaKey(randomReplicaId(), true);
+ final var oldFollower = replicaKey(leader.id() + 1, true);
+ final var newFollowerKey = replicaKey(oldFollower.id(), true);
+ final int epoch = 1;
+ final var context = new RaftClientTestContext.Builder(
+ newFollowerKey.id(),
+ newFollowerKey.directoryId().get()
+ )
+ .withRaftProtocol(KIP_853_PROTOCOL)
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(leader, oldFollower)), KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, leader.id())
+ .withAutoJoin(true)
+ .withCanBecomeVoter(true)
+ .build();
+
+ // advance time and complete a fetch to trigger the remove voter
request
+ context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+ // the next request should be a remove voter request
+ pollAndDeliverRemoveVoter(context, oldFollower);
+
+ // after sending a remove voter the next request should be a fetch
+ // this fetch will remove the old follower from the voter set
+ pollAndDeliverFetchToUpdateVoterSet(
+ context,
+ epoch,
+ VoterSetTest.voterSet(Stream.of(leader))
+ );
+
+ // advance time and complete a fetch to trigger the add voter request
+ context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+ // the next request should be an add voter request
+ final var addVoterRequest = pollAndSendAddVoter(context, newFollowerKey);
+
+ // deliver the add voter response, this is possible before a completed
fetch because of KIP-1186
+ context.deliverResponse(
+ addVoterRequest.correlationId(),
+ addVoterRequest.destination(),
+ RaftUtil.addVoterResponse(Errors.NONE, Errors.NONE.message())
+ );
+
+ // verify the replica can perform a fetch to commit the new voter set
+ pollAndDeliverFetchToUpdateVoterSet(
+ context,
+ epoch,
+ VoterSetTest.voterSet(Stream.of(leader, newFollowerKey))
+ );
+
+ // advance time and complete a fetch and expire the update voter set
timer
+ // the next request should be a fetch because the log voter
configuration is up-to-date
+ context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+ context.pollUntilRequest();
+ context.assertSentFetchRequest();
+ }
+
+
+ @Test
+ public void testObserversDoNotAutoJoin() throws Exception {
+ final var leader = replicaKey(randomReplicaId(), true);
+ final var follower = replicaKey(leader.id() + 1, true);
+ final var newObserver = replicaKey(follower.id() + 1, true);
+ final int epoch = 1;
+ final var context = new RaftClientTestContext.Builder(
+ newObserver.id(),
+ newObserver.directoryId().get()
+ )
+ .withRaftProtocol(KIP_853_PROTOCOL)
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, leader.id())
+ .withAutoJoin(true)
+ .withCanBecomeVoter(false)
+ .build();
+
+ context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+ context.time.sleep(context.fetchTimeoutMs - 1);
+ context.pollUntilRequest();
+
+ // When canBecomeVoter == false, the replica should not send an add voter request
+ final var fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
+ }
+
+ @Test
+ public void testObserverDoesNotAddItselfWhenAutoJoinDisabled() throws Exception {
+ final var leader = replicaKey(randomReplicaId(), true);
+ final var follower = replicaKey(leader.id() + 1, true);
+ final var observer = replicaKey(follower.id() + 1, true);
+ final int epoch = 1;
+ final var context = new RaftClientTestContext.Builder(
+ observer.id(),
+ observer.directoryId().get()
+ )
+ .withRaftProtocol(KIP_853_PROTOCOL)
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, leader.id())
+ .withAutoJoin(false)
+ .withCanBecomeVoter(true)
+ .build();
+
+ context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+ context.time.sleep(context.fetchTimeoutMs - 1);
+ context.pollUntilRequest();
+
+ // When autoJoin == false, the replica should not send an add voter request
+ final var fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
+ }
+
+ @Test
+ public void testObserverDoesNotAutoJoinWithKRaftVersion0() throws Exception {
+ final var leader = replicaKey(randomReplicaId(), true);
+ final var follower = replicaKey(leader.id() + 1, true);
+ final var observer = replicaKey(follower.id() + 1, true);
+ final int epoch = 1;
+ final var context = new RaftClientTestContext.Builder(
+ observer.id(),
+ observer.directoryId().get()
+ )
+ .withRaftProtocol(KIP_595_PROTOCOL)
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_0
+ )
+ .withElectedLeader(epoch, leader.id())
+ .withAutoJoin(true)
+ .withCanBecomeVoter(true)
+ .build();
+
+ context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
+
+ context.time.sleep(context.fetchTimeoutMs - 1);
+ context.pollUntilRequest();
+
+ // When kraft.version == 0, the replica should not send an add voter request
+ final var fetchRequest = context.assertSentFetchRequest();
+
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
+ }
+
+ private void pollAndDeliverRemoveVoter(
+ RaftClientTestContext context,
+ ReplicaKey oldFollower
+ ) throws Exception {
+ context.pollUntilRequest();
+ final var removeRequest = context.assertSentRemoveVoterRequest(oldFollower);
+ context.deliverResponse(
+ removeRequest.correlationId(),
+ removeRequest.destination(),
+ RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message())
+ );
+ }
+
+ private RaftRequest.Outbound pollAndSendAddVoter(
+ RaftClientTestContext context,
+ ReplicaKey newVoter
+ ) throws Exception {
+ context.pollUntilRequest();
+ return context.assertSentAddVoterRequest(
+ newVoter,
+ context.client.quorum().localVoterNodeOrThrow().listeners()
+ );
+ }
+
+ private void pollAndDeliverFetchToUpdateVoterSet(
+ RaftClientTestContext context,
+ int epoch,
+ VoterSet newVoterSet
+ ) throws Exception {
+ context.pollUntilRequest();
+ final var fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(
+ fetchRequest,
+ epoch,
+ context.log.endOffset().offset(),
+ context.log.lastFetchedEpoch(),
+ context.client.highWatermark()
+ );
+ // deliver the fetch response with the updated voter set
+ context.deliverResponse(
+ fetchRequest.correlationId(),
+ fetchRequest.destination(),
+ context.fetchResponse(
+ epoch,
+ fetchRequest.destination().id(),
+ MemoryRecords.withVotersRecord(
+ context.log.endOffset().offset(),
+ context.time.milliseconds(),
+ epoch,
+ BufferSupplier.NO_CACHING.get(300),
+ newVoterSet.toVotersRecord((short) 0)
+ ),
+ context.log.endOffset().offset() + 1,
+ Errors.NONE
+ )
+ );
+ // poll kraft to update the replica's voter set
+ context.client.poll();
+ }
+
+ private int randomReplicaId() {
+ return ThreadLocalRandom.current().nextInt(1025);
+ }
+}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 3bb8e93f5ea..dc70f7ce622 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -78,8 +78,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class KafkaRaftClientReconfigTest {
- private static final int NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD = 1;
-
@Test
public void testLeaderWritesBootstrapRecords() throws Exception {
ReplicaKey local = replicaKey(randomReplicaId(), true);
@@ -2225,28 +2223,8 @@ public class KafkaRaftClientReconfigTest {
.build();
// waiting for FETCH requests until the UpdateRaftVoter request is sent
- for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
- context.time.sleep(context.fetchTimeoutMs - 1);
- context.pollUntilRequest();
- RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
+ context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
- context.deliverResponse(
- fetchRequest.correlationId(),
- fetchRequest.destination(),
- context.fetchResponse(
- epoch,
- voter1.id(),
- MemoryRecords.EMPTY,
- 0L,
- Errors.NONE
- )
- );
- // poll kraft to handle the fetch response
- context.client.poll();
- }
-
- context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound updateRequest =
context.assertSentUpdateVoterRequest(
local,
@@ -2298,28 +2276,8 @@ public class KafkaRaftClientReconfigTest {
.build();
// waiting for FETCH request until the UpdateRaftVoter request is set
- for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
- context.time.sleep(context.fetchTimeoutMs - 1);
- context.pollUntilRequest();
- RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
-
- context.deliverResponse(
- fetchRequest.correlationId(),
- fetchRequest.destination(),
- context.fetchResponse(
- epoch,
- voter1.id(),
- MemoryRecords.EMPTY,
- 0L,
- Errors.NONE
- )
- );
- // poll kraft to handle the fetch response
- context.client.poll();
- }
+ context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
- context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound updateRequest =
context.assertSentUpdateVoterRequest(
local,
@@ -2389,28 +2347,8 @@ public class KafkaRaftClientReconfigTest {
.build();
// waiting for FETCH request until the UpdateRaftVoter request is set
- for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
- context.time.sleep(context.fetchTimeoutMs - 1);
- context.pollUntilRequest();
- RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
+ context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
- context.deliverResponse(
- fetchRequest.correlationId(),
- fetchRequest.destination(),
- context.fetchResponse(
- epoch,
- voter1.id(),
- MemoryRecords.EMPTY,
- 0L,
- Errors.NONE
- )
- );
- // poll kraft to handle the fetch response
- context.client.poll();
- }
-
- context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound updateRequest =
context.assertSentUpdateVoterRequest(
local,
@@ -2437,28 +2375,8 @@ public class KafkaRaftClientReconfigTest {
context.pollUntilResponse();
// waiting for FETCH request until the UpdateRaftVoter request is set
- for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
- context.time.sleep(context.fetchTimeoutMs - 1);
- context.pollUntilRequest();
- fetchRequest = context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0,
context.client.highWatermark());
+ context.advanceTimeAndCompleteFetch(newEpoch, voter1.id(), true);
- context.deliverResponse(
- fetchRequest.correlationId(),
- fetchRequest.destination(),
- context.fetchResponse(
- newEpoch,
- voter1.id(),
- MemoryRecords.EMPTY,
- 0L,
- Errors.NONE
- )
- );
- // poll kraft to handle the fetch response
- context.client.poll();
- }
-
- context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
updateRequest = context.assertSentUpdateVoterRequest(
local,
@@ -2723,29 +2641,9 @@ public class KafkaRaftClientReconfigTest {
.build();
// waiting for FETCH request until the UpdateRaftVoter request is set
- for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
- context.time.sleep(context.fetchTimeoutMs - 1);
- context.pollUntilRequest();
- RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
-
- context.deliverResponse(
- fetchRequest.correlationId(),
- fetchRequest.destination(),
- context.fetchResponse(
- epoch,
- voter1.id(),
- MemoryRecords.EMPTY,
- 0L,
- Errors.NONE
- )
- );
- // poll kraft to handle the fetch response
- context.client.poll();
- }
+ context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
// update voter should not be sent because the local listener is not
different from the voter set
- context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
@@ -2784,26 +2682,7 @@ public class KafkaRaftClientReconfigTest {
.build();
// waiting up to the last FETCH request before the UpdateRaftVoter
request is set
- for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
- context.time.sleep(context.fetchTimeoutMs - 1);
- context.pollUntilRequest();
- RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
-
- context.deliverResponse(
- fetchRequest.correlationId(),
- fetchRequest.destination(),
- context.fetchResponse(
- epoch,
- voter1.id(),
- MemoryRecords.EMPTY,
- 0L,
- Errors.NONE
- )
- );
- // poll kraft to handle the fetch response
- context.client.poll();
- }
+ context.advanceTimeAndCompleteFetch(epoch, voter1.id(), false);
// expect one last FETCH request
context.pollUntilRequest();
@@ -2864,28 +2743,8 @@ public class KafkaRaftClientReconfigTest {
.build();
// waiting for FETCH request until the UpdateRaftVoter request is set
- for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
- context.time.sleep(context.fetchTimeoutMs - 1);
- context.pollUntilRequest();
- RaftRequest.Outbound fetchRequest =
context.assertSentFetchRequest();
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
context.client.highWatermark());
-
- context.deliverResponse(
- fetchRequest.correlationId(),
- fetchRequest.destination(),
- context.fetchResponse(
- epoch,
- voter1.id(),
- MemoryRecords.EMPTY,
- 0L,
- Errors.NONE
- )
- );
- // poll kraft to handle the fetch response
- context.client.poll();
- }
+ context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
- context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound updateRequest =
context.assertSentUpdateVoterRequest(
local,
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 4687fd3d903..1efd3247ebd 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -3786,7 +3786,7 @@ class KafkaRaftClientTest {
@ParameterizedTest
@CsvSource({ "true, true", "true, false", "false, true", "false, false" })
- public void testObserverReplication(boolean withKip853Rpc, boolean alwaysFlush) throws Exception {
+ public void testObserverReplication(boolean withKip853Rpc, boolean canBecomeVoter) throws Exception {
int localId = randomReplicaId();
int otherNodeId = localId + 1;
int epoch = 5;
@@ -3795,7 +3795,7 @@ class KafkaRaftClientTest {
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withElectedLeader(epoch, otherNodeId)
.withKip853Rpc(withKip853Rpc)
- .withAlwaysFlush(alwaysFlush)
+ .withCanBecomeVoter(canBecomeVoter)
.build();
context.assertElectedLeader(epoch, otherNodeId);
@@ -3812,7 +3812,7 @@ class KafkaRaftClientTest {
context.client.poll();
assertEquals(2L, context.log.endOffset().offset());
- long firstUnflushedOffset = alwaysFlush ? 2L : 0L;
+ long firstUnflushedOffset = canBecomeVoter ? 2L : 0L;
assertEquals(firstUnflushedOffset, context.log.firstUnflushedOffset());
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
index 2197f3766c2..ce7175a8b5e 100644
--- a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
@@ -34,6 +34,7 @@ public class QuorumConfigTest {
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "-1"));
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "-1"));
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "-1"));
+ assertInvalidConfig(Map.of(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, "-1"));
}
private void assertInvalidConfig(Map<String, Object> overrideConfig) {
@@ -46,6 +47,7 @@ public class QuorumConfigTest {
props.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "10");
props.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "10");
props.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "10");
+ props.put(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true);
props.putAll(overrideConfig);
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 970f442c004..a98fb79d09a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -143,10 +143,12 @@ public final class RaftClientTestContext {
// Used to determine which RPC request and response to construct
final RaftProtocol raftProtocol;
// Used to determine if the local kraft client was configured to always
flush
- final boolean alwaysFlush;
+ final boolean canBecomeVoter;
private final List<RaftResponse.Outbound> sentResponses = new
ArrayList<>();
+ private static final int NUMBER_FETCH_TIMEOUTS_IN_UPDATE_VOTER_SET_PERIOD = 1;
+
public static final class Builder {
static final int DEFAULT_ELECTION_TIMEOUT_MS = 10000;
@@ -177,10 +179,11 @@ public final class RaftClientTestContext {
private MemoryPool memoryPool = MemoryPool.NONE;
private Optional<List<InetSocketAddress>> bootstrapServers =
Optional.empty();
private RaftProtocol raftProtocol = RaftProtocol.KIP_595_PROTOCOL;
- private boolean alwaysFlush = false;
+ private boolean canBecomeVoter = false;
private VoterSet startingVoters = VoterSet.empty();
private Endpoints localListeners = Endpoints.empty();
private boolean isStartingVotersStatic = false;
+ private boolean autoJoin = false;
public Builder(int localId, Set<Integer> staticVoters) {
this(OptionalInt.of(localId), staticVoters);
@@ -309,8 +312,8 @@ public final class RaftClientTestContext {
return this;
}
- Builder withAlwaysFlush(boolean alwaysFlush) {
- this.alwaysFlush = alwaysFlush;
+ Builder withCanBecomeVoter(boolean canBecomeVoter) {
+ this.canBecomeVoter = canBecomeVoter;
return this;
}
@@ -376,6 +379,11 @@ public final class RaftClientTestContext {
return this;
}
+ Builder withAutoJoin(boolean autoJoin) {
+ this.autoJoin = autoJoin;
+ return this;
+ }
+
public RaftClientTestContext build() throws IOException {
Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel();
@@ -404,13 +412,14 @@ public final class RaftClientTestContext {
Endpoints.empty() :
this.localListeners;
- Map<String, Integer> configMap = new HashMap<>();
+ Map<String, Object> configMap = new HashMap<>();
configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, electionTimeoutMs);
configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, ELECTION_BACKOFF_MAX_MS);
configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, FETCH_TIMEOUT_MS);
configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, appendLingerMs);
+ configMap.put(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, autoJoin);
QuorumConfig quorumConfig = new QuorumConfig(new AbstractConfig(QuorumConfig.CONFIG_DEF, configMap));
List<InetSocketAddress> computedBootstrapServers = bootstrapServers.orElseGet(() -> {
@@ -436,7 +445,7 @@ public final class RaftClientTestContext {
time,
new MockExpirationService(time),
FETCH_MAX_WAIT_MS,
- alwaysFlush,
+ canBecomeVoter,
clusterId,
computedBootstrapServers,
localListeners,
@@ -474,7 +483,7 @@ public final class RaftClientTestContext {
.boxed()
.collect(Collectors.toSet()),
raftProtocol,
- alwaysFlush,
+ canBecomeVoter,
metrics,
externalKRaftMetrics,
listener
@@ -503,7 +512,7 @@ public final class RaftClientTestContext {
VoterSet startingVoters,
Set<Integer> bootstrapIds,
RaftProtocol raftProtocol,
- boolean alwaysFlush,
+ boolean canBecomeVoter,
Metrics metrics,
ExternalKRaftMetrics externalKRaftMetrics,
MockListener listener
@@ -521,7 +530,7 @@ public final class RaftClientTestContext {
this.startingVoters = startingVoters;
this.bootstrapIds = bootstrapIds;
this.raftProtocol = raftProtocol;
- this.alwaysFlush = alwaysFlush;
+ this.canBecomeVoter = canBecomeVoter;
this.metrics = metrics;
this.externalKRaftMetrics = externalKRaftMetrics;
this.listener = listener;
@@ -949,6 +958,51 @@ public final class RaftClientTestContext {
channel.mockReceive(new RaftResponse.Inbound(correlationId, versionedResponse, source));
}
+ /**
+ * Advance time and complete an empty fetch to reset the fetch timer.
+ * This is used to expire the update voter set timer without also expiring the fetch timer,
+ * which is needed for add, remove, and update voter tests.
+ * For voters and observers, polling after exiting this method expires the update voter set timer.
+ * @param epoch - the current epoch
+ * @param leaderId - the leader id
+ * @param expireUpdateVoterSetTimer - if true, advance time again to expire this timer
+ */
+ void advanceTimeAndCompleteFetch(
+ int epoch,
+ int leaderId,
+ boolean expireUpdateVoterSetTimer
+ ) throws Exception {
+ for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_VOTER_SET_PERIOD; i++) {
+ time.sleep(fetchTimeoutMs - 1);
+ pollUntilRequest();
+ final var fetchRequest = assertSentFetchRequest();
+ assertFetchRequestData(
+ fetchRequest,
+ epoch,
+ log.endOffset().offset(),
+ log.lastFetchedEpoch(),
+ client.highWatermark()
+ );
+
+ deliverResponse(
+ fetchRequest.correlationId(),
+ fetchRequest.destination(),
+ fetchResponse(
+ epoch,
+ leaderId,
+ MemoryRecords.EMPTY,
+ log.endOffset().offset(),
+ Errors.NONE
+ )
+ );
+ // poll kraft to handle the fetch response
+ client.poll();
+ }
+ if (expireUpdateVoterSetTimer) {
+ time.sleep(fetchTimeoutMs - 1);
+ }
+ }
+
List<RaftRequest.Outbound> assertSentBeginQuorumEpochRequest(int epoch, Set<Integer> destinationIds) {
List<RaftRequest.Outbound> requests = collectBeginEpochRequests(epoch);
assertEquals(destinationIds.size(), requests.size());
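The helper above exists so a follower can let the update-voter-set period elapse without also hitting a fetch timeout. A sketch of how an auto-join style test might combine it with the request assertions added later in this file; the method shape and variable names are illustrative only:

    void exampleAutoJoinPollSendsAddVoter(
        RaftClientTestContext context,
        int epoch,
        int leaderId,
        ReplicaKey localKey,
        Endpoints localListeners
    ) throws Exception {
        // Complete an empty fetch so only the update-voter-set timer expires.
        context.advanceTimeAndCompleteFetch(epoch, leaderId, true);
        // The next poll is then expected to produce the outbound AddRaftVoter request.
        context.pollUntilRequest();
        context.assertSentAddVoterRequest(localKey, localListeners);
    }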
@@ -1259,6 +1313,26 @@ public final class RaftClientTestContext {
return sentRequests.get(0);
}
+ RaftRequest.Outbound assertSentAddVoterRequest(
+ ReplicaKey replicaKey,
+ Endpoints endpoints
+ ) {
+ final var sentRequests = channel.drainSentRequests(Optional.of(ApiKeys.ADD_RAFT_VOTER));
+ assertEquals(1, sentRequests.size());
+
+ final var request = sentRequests.get(0);
+ assertInstanceOf(AddRaftVoterRequestData.class, request.data());
+
+ final var addRaftVoterRequestData = (AddRaftVoterRequestData) request.data();
+ assertEquals(clusterId, addRaftVoterRequestData.clusterId());
+ assertEquals(replicaKey.id(), addRaftVoterRequestData.voterId());
+ assertEquals(replicaKey.directoryId().get(), addRaftVoterRequestData.voterDirectoryId());
+ assertEquals(endpoints, Endpoints.fromAddVoterRequest(addRaftVoterRequestData.listeners()));
+ assertEquals(false, addRaftVoterRequestData.ackWhenCommitted());
+
+ return request;
+ }
+
AddRaftVoterResponseData assertSentAddVoterResponse(Errors error) {
List<RaftResponse.Outbound> sentResponses = drainSentResponses(ApiKeys.ADD_RAFT_VOTER);
assertEquals(1, sentResponses.size());
@@ -1272,6 +1346,23 @@ public final class RaftClientTestContext {
return addVoterResponse;
}
+ RaftRequest.Outbound assertSentRemoveVoterRequest(
+ ReplicaKey replicaKey
+ ) {
+ final var sentRequests = channel.drainSentRequests(Optional.of(ApiKeys.REMOVE_RAFT_VOTER));
+ assertEquals(1, sentRequests.size());
+
+ final var request = sentRequests.get(0);
+ assertInstanceOf(RemoveRaftVoterRequestData.class, request.data());
+
+ final var removeRaftVoterRequestData = (RemoveRaftVoterRequestData) request.data();
+ assertEquals(clusterId, removeRaftVoterRequestData.clusterId());
+ assertEquals(replicaKey.id(), removeRaftVoterRequestData.voterId());
+ assertEquals(replicaKey.directoryId().get(), removeRaftVoterRequestData.voterDirectoryId());
+
+ return request;
+ }
+
RemoveRaftVoterResponseData assertSentRemoveVoterResponse(Errors error) {
List<RaftResponse.Outbound> sentResponses = drainSentResponses(ApiKeys.REMOVE_RAFT_VOTER);
assertEquals(1, sentResponses.size());
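Like its add-voter counterpart, the new assertion drains the single outbound RemoveRaftVoter request and checks the cluster id, replica id, and directory id. A brief sketch of using it against a stale replica key; the method and variable names are illustrative only:

    void exampleRemoveVoterAssertion(RaftClientTestContext context, ReplicaKey staleKey) throws Exception {
        context.pollUntilRequest();
        // The returned request lets a test feed a response back via deliverResponse.
        RaftRequest.Outbound request = context.assertSentRemoveVoterRequest(staleKey);
    }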
@@ -1707,7 +1798,7 @@ public final class RaftClientTestContext {
// Assert that voters have flushed up to the fetch offset
if ((localId.isPresent() &&
startingVoters.voterIds().contains(localId.getAsInt())) ||
- alwaysFlush
+ canBecomeVoter
) {
assertEquals(
log.firstUnflushedOffset(),
@@ -1921,7 +2012,8 @@ public final class RaftClientTestContext {
clusterId,
timeoutMs,
voter,
- endpoints
+ endpoints,
+ true
);
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
index 770a08b49ab..34b7c9f003d 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
import org.apache.kafka.common.message.AddRaftVoterRequestDataJsonConverter;
+import org.apache.kafka.common.message.AddRaftVoterResponseData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestDataJsonConverter;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
@@ -42,6 +43,7 @@ import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.FetchSnapshotRequestDataJsonConverter;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.FetchSnapshotResponseDataJsonConverter;
+import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteRequestDataJsonConverter;
import org.apache.kafka.common.message.VoteResponseData;
@@ -93,6 +95,10 @@ public class RaftUtilTest {
RaftUtil.errorResponse(ApiKeys.FETCH, Errors.NONE));
assertEquals(new FetchSnapshotResponseData().setErrorCode(Errors.NONE.code()),
RaftUtil.errorResponse(ApiKeys.FETCH_SNAPSHOT, Errors.NONE));
+ assertEquals(new AddRaftVoterResponseData().setErrorCode(Errors.NONE.code()),
+ RaftUtil.errorResponse(ApiKeys.ADD_RAFT_VOTER, Errors.NONE));
+ assertEquals(new RemoveRaftVoterResponseData().setErrorCode(Errors.NONE.code()),
+ RaftUtil.errorResponse(ApiKeys.REMOVE_RAFT_VOTER, Errors.NONE));
assertThrows(IllegalArgumentException.class, () ->
RaftUtil.errorResponse(ApiKeys.PRODUCE, Errors.NONE));
}
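The two new assertions confirm that RaftUtil.errorResponse can build typed error responses for the voter-change RPCs. A short sketch of a direct call, as it might appear in this test class; the cast is an assumption, since the diff only shows the return value compared against response data objects:

    AddRaftVoterResponseData addVoterError =
        (AddRaftVoterResponseData) RaftUtil.errorResponse(ApiKeys.ADD_RAFT_VOTER, Errors.NOT_LEADER_OR_FOLLOWER);
    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code(), addVoterError.errorCode());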
diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 45b7cada936..59041c7a66a 100644
--- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -27,10 +27,12 @@ import kafka.server.SharedServer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.test.api.TestKitDefaults;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -114,6 +116,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final String controllerListenerName;
private final String brokerSecurityProtocol;
private final String controllerSecurityProtocol;
+ private boolean standalone;
+ private Optional<Map<Integer, Uuid>> initialVoterSet = Optional.empty();
private boolean deleteOnClose;
public Builder(TestKitNodes nodes) {
@@ -130,6 +134,16 @@ public class KafkaClusterTestKit implements AutoCloseable {
return this;
}
+ public Builder setStandalone(boolean standalone) {
+ this.standalone = standalone;
+ return this;
+ }
+
+ public Builder setInitialVoterSet(Map<Integer, Uuid> initialVoterSet) {
+ this.initialVoterSet = Optional.of(initialVoterSet);
+ return this;
+ }
+
private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
TestKitNode controllerNode = nodes.controllerNodes().get(node.id());
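Taken together, setStandalone and setInitialVoterSet let a test control which controllers are formatted into the initial KRaft voter set. A sketch of the intended usage, assuming the TestKitNodes builder that KafkaClusterTestKit tests already rely on; the node counts and the random directory id are illustrative, not prescribed by this patch:

    // Illustrative only: three controllers, but only the first is formatted as the initial voter.
    KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
            new TestKitNodes.Builder()
                .setNumBrokerNodes(1)
                .setNumControllerNodes(3)
                .build())
        .setStandalone(true)
        .build();
    cluster.format();
    cluster.startup();

    // Alternatively, pin an explicit initial voter set by replica id and directory id:
    // new KafkaClusterTestKit.Builder(nodes)
    //     .setInitialVoterSet(Map.of(TestKitDefaults.CONTROLLER_ID_OFFSET, Uuid.randomUuid()))
    //     .build();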
@@ -184,6 +198,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
// reduce log cleaner offset map memory usage
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
+ // do not include auto join config in broker nodes
+ if (brokerNode != null) {
+ props.remove(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG);
+ }
+
// Add associated broker node property overrides
if (brokerNode != null) {
props.putAll(brokerNode.propertyOverrides());
@@ -323,6 +342,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
faultHandlerFactory,
socketFactoryManager,
jaasFile,
+ standalone,
+ initialVoterSet,
deleteOnClose);
}
@@ -368,6 +389,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final PreboundSocketFactoryManager socketFactoryManager;
private final String controllerListenerName;
private final Optional<File> jaasFile;
+ private final boolean standalone;
+ private final Optional<Map<Integer, Uuid>> initialVoterSet;
private final boolean deleteOnClose;
private KafkaClusterTestKit(
@@ -378,6 +401,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
SimpleFaultHandlerFactory faultHandlerFactory,
PreboundSocketFactoryManager socketFactoryManager,
Optional<File> jaasFile,
+ boolean standalone,
+ Optional<Map<Integer, Uuid>> initialVoterSet,
boolean deleteOnClose
) {
/*
@@ -395,6 +420,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
this.socketFactoryManager = socketFactoryManager;
this.controllerListenerName = nodes.controllerListenerName().value();
this.jaasFile = jaasFile;
+ this.standalone = standalone;
+ this.initialVoterSet = initialVoterSet;
this.deleteOnClose = deleteOnClose;
}
@@ -425,8 +452,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
boolean writeMetadataDirectory
) {
try {
+ final var nodeId = ensemble.nodeId().getAsInt();
Formatter formatter = new Formatter();
- formatter.setNodeId(ensemble.nodeId().getAsInt());
+ formatter.setNodeId(nodeId);
formatter.setClusterId(ensemble.clusterId().get());
if (writeMetadataDirectory) {
formatter.setDirectories(ensemble.logDirProps().keySet());
@@ -452,15 +480,50 @@ public class KafkaClusterTestKit implements AutoCloseable {
if (nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
StringBuilder dynamicVotersBuilder = new StringBuilder();
String prefix = "";
- for (TestKitNode controllerNode : nodes.controllerNodes().values()) {
- int port = socketFactoryManager.
- getOrCreatePortForListener(controllerNode.id(), controllerListenerName);
- dynamicVotersBuilder.append(prefix);
- prefix = ",";
- dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
- controllerNode.id(), port, controllerNode.metadataDirectoryId()));
+ if (standalone) {
+ if (nodeId == TestKitDefaults.CONTROLLER_ID_OFFSET) {
+ final var controllerNode = nodes.controllerNodes().get(nodeId);
+ dynamicVotersBuilder.append(
+ String.format(
+ "%d@localhost:%d:%s",
+ controllerNode.id(),
+ socketFactoryManager.getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
+ controllerNode.metadataDirectoryId()
+ )
+ );
+ formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+ } else {
+ formatter.setNoInitialControllersFlag(true);
+ }
+ } else if (initialVoterSet.isPresent()) {
+ for (final var controllerNode : initialVoterSet.get().entrySet()) {
+ final var voterId = controllerNode.getKey();
+ final var voterDirectoryId = controllerNode.getValue();
+ dynamicVotersBuilder.append(prefix);
+ prefix = ",";
+ dynamicVotersBuilder.append(
+ String.format(
+ "%d@localhost:%d:%s",
+ voterId,
+ socketFactoryManager.getOrCreatePortForListener(voterId, controllerListenerName),
+ voterDirectoryId
+ )
+ );
+ }
+ formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+ } else {
+ for (TestKitNode controllerNode : nodes.controllerNodes().values()) {
+ int port = socketFactoryManager.getOrCreatePortForListener(controllerNode.id(), controllerListenerName);
+ dynamicVotersBuilder.append(prefix);
+ prefix = ",";
+ dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
+ controllerNode.id(), port, controllerNode.metadataDirectoryId()));
+ }
+ formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
}
- formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
}
formatter.run();
} catch (Exception e) {