This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8c88cdb7186 KAFKA-14617: Update AlterPartitionRequest and enable Kraft
controller to reject stale request. (#13408)
8c88cdb7186 is described below
commit 8c88cdb7186b1d594f991eb324356dcfcabdf18a
Author: Calvin Liu <[email protected]>
AuthorDate: Fri Mar 31 02:27:42 2023 -0700
KAFKA-14617: Update AlterPartitionRequest and enable KRaft controller to
reject stale requests. (#13408)
Second part of the
[KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR),
it updates the AlterPartitionRequest:
- Deprecate the NewIsr field
- Create a new field BrokerState with BrokerId and BrokerEpoch
- Bump the AlterPartition version to 3
With this change, the Quorum Controller is able to reject stale
AlterPartition requests.
Reviewers: Jun Rao <[email protected]>, David Jacot <[email protected]>
---
.../common/requests/AlterPartitionRequest.java | 21 +++
.../common/message/AlterPartitionRequest.json | 14 +-
.../common/message/AlterPartitionResponse.json | 4 +-
.../common/requests/AlterPartitionRequestTest.java | 76 +++++++++++
.../kafka/common/requests/RequestResponseTest.java | 2 +-
.../scala/kafka/controller/KafkaController.scala | 7 +-
.../scala/kafka/server/AlterPartitionManager.scala | 5 +-
.../controller/ControllerIntegrationTest.scala | 19 +--
.../kafka/server/AlterPartitionManagerTest.scala | 6 +-
.../kafka/controller/PartitionChangeBuilder.java | 9 ++
.../controller/ReplicationControlManager.java | 44 ++++--
.../controller/PartitionChangeBuilderTest.java | 38 ++++--
.../kafka/controller/QuorumControllerTest.java | 13 +-
.../controller/ReplicationControlManagerTest.java | 149 ++++++++++++++++-----
14 files changed, 326 insertions(+), 81 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
index 2d246f21041..2b150508292 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
@@ -18,12 +18,17 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
public class AlterPartitionRequest extends AbstractRequest {
@@ -77,6 +82,18 @@ public class AlterPartitionRequest extends AbstractRequest {
@Override
public AlterPartitionRequest build(short version) {
+ if (version < 3) {
+ data.topics().forEach(topicData -> {
+ topicData.partitions().forEach(partitionData -> {
+ List<Integer> newIsr = new
ArrayList<>(partitionData.newIsrWithEpochs().size());
+ partitionData.newIsrWithEpochs().forEach(brokerState
-> {
+ newIsr.add(brokerState.brokerId());
+ });
+ partitionData.setNewIsr(newIsr);
+
partitionData.setNewIsrWithEpochs(Collections.emptyList());
+ });
+ });
+ }
return new AlterPartitionRequest(data, version);
}
@@ -85,4 +102,8 @@ public class AlterPartitionRequest extends AbstractRequest {
return data.toString();
}
}
+
+ public static List<BrokerState>
newIsrToSimpleNewIsrWithBrokerEpochs(List<Integer> newIsr) {
+ return newIsr.stream().map(brokerId -> new
BrokerState().setBrokerId(brokerId)).collect(Collectors.toList());
+ }
}
diff --git
a/clients/src/main/resources/common/message/AlterPartitionRequest.json
b/clients/src/main/resources/common/message/AlterPartitionRequest.json
index d91f317f97d..2e880cd64fa 100644
--- a/clients/src/main/resources/common/message/AlterPartitionRequest.json
+++ b/clients/src/main/resources/common/message/AlterPartitionRequest.json
@@ -21,7 +21,9 @@
// Version 1 adds LeaderRecoveryState field (KIP-704).
//
// Version 2 adds TopicId field to replace TopicName field (KIP-841).
- "validVersions": "0-2",
+ //
+ // Version 3 adds the NewIsrWithEpochs field and deprecates the NewIsr field
(KIP-903).
+ "validVersions": "0-3",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType":
"brokerId",
@@ -38,8 +40,14 @@
"about": "The partition index" },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The leader epoch of this partition" },
- { "name": "NewIsr", "type": "[]int32", "versions": "0+", "entityType":
"brokerId",
- "about": "The ISR for this partition" },
+ { "name": "NewIsr", "type": "[]int32", "versions": "0-2",
"entityType": "brokerId",
+ "about": "The ISR for this partition. Deprecated since version 3." },
+ { "name": "NewIsrWithEpochs", "type": "[]BrokerState", "versions":
"3+", "fields": [
+ { "name": "BrokerId", "type": "int32", "versions": "3+",
"entityType": "brokerId",
+ "about": "The ID of the broker." },
+ { "name": "BrokerEpoch", "type": "int64", "versions": "3+",
"default": "-1",
+ "about": "The epoch of the broker. It will be -1 if the epoch
check is not supported." }
+ ]},
{ "name": "LeaderRecoveryState", "type": "int8", "versions": "1+",
"default": "0",
"about": "1 if the partition is recovering from an unclean leader
election; 0 otherwise." },
{ "name": "PartitionEpoch", "type": "int32", "versions": "0+",
diff --git
a/clients/src/main/resources/common/message/AlterPartitionResponse.json
b/clients/src/main/resources/common/message/AlterPartitionResponse.json
index e8be99fd5e3..2c1eb3d46fb 100644
--- a/clients/src/main/resources/common/message/AlterPartitionResponse.json
+++ b/clients/src/main/resources/common/message/AlterPartitionResponse.json
@@ -21,7 +21,9 @@
//
// Version 2 adds TopicId field to replace TopicName field, can return the
following new errors:
// INELIGIBLE_REPLICA, NEW_LEADER_ELECTED and UNKNOWN_TOPIC_ID (KIP-841).
- "validVersions": "0-2",
+ //
+ // Version 3 is the same as version 2 (KIP-903).
+ "validVersions": "0-3",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
new file mode 100644
index 00000000000..5b0231ca882
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/requests/AlterPartitionRequestTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
+import org.apache.kafka.common.message.AlterPartitionRequestData.PartitionData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.TopicData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class AlterPartitionRequestTest {
+ String topic = "test-topic";
+ Uuid topicId = Uuid.randomUuid();
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+ public void testBuildAlterPartitionRequest(short version) {
+ AlterPartitionRequestData request = new AlterPartitionRequestData()
+ .setBrokerId(1)
+ .setBrokerEpoch(1);
+
+ TopicData topicData = new TopicData()
+ .setTopicId(topicId)
+ .setTopicName(topic);
+
+ List<BrokerState> newIsrWithBrokerEpoch = new LinkedList<>();
+ newIsrWithBrokerEpoch.add(new
BrokerState().setBrokerId(1).setBrokerEpoch(1001));
+ newIsrWithBrokerEpoch.add(new
BrokerState().setBrokerId(2).setBrokerEpoch(1002));
+ newIsrWithBrokerEpoch.add(new
BrokerState().setBrokerId(3).setBrokerEpoch(1003));
+
+ topicData.partitions().add(new PartitionData()
+ .setPartitionIndex(0)
+ .setLeaderEpoch(1)
+ .setPartitionEpoch(10)
+ .setNewIsrWithEpochs(newIsrWithBrokerEpoch));
+
+ request.topics().add(topicData);
+
+ AlterPartitionRequest.Builder builder = new
AlterPartitionRequest.Builder(request, version > 1);
+ AlterPartitionRequest alterPartitionRequest = builder.build(version);
+ assertEquals(1, alterPartitionRequest.data().topics().size());
+ assertEquals(1,
alterPartitionRequest.data().topics().get(0).partitions().size());
+ PartitionData partitionData =
alterPartitionRequest.data().topics().get(0).partitions().get(0);
+ if (version < 3) {
+ assertEquals(Arrays.asList(1, 2, 3), partitionData.newIsr());
+ assertTrue(partitionData.newIsrWithEpochs().isEmpty());
+ } else {
+ assertEquals(newIsrWithBrokerEpoch,
partitionData.newIsrWithEpochs());
+ assertTrue(partitionData.newIsr().isEmpty());
+ }
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7b0ca0d233e..cb84ff94841 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1366,7 +1366,7 @@ public class RequestResponseTest {
.setPartitionIndex(1)
.setPartitionEpoch(2)
.setLeaderEpoch(3)
- .setNewIsr(asList(1, 2));
+
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(asList(1,
2)));
if (version >= 1) {
// Use the none default value; 1 - RECOVERING
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 64ae5ad39c1..1588066e4d7 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2324,12 +2324,17 @@ class KafkaController(val config: KafkaConfig,
case Some(topicName) =>
topicReq.partitions.forEach { partitionReq =>
+ val isr = if (alterPartitionRequestVersion >= 3) {
+ partitionReq.newIsrWithEpochs.asScala.toList.map(brokerState =>
brokerState.brokerId())
+ } else {
+ partitionReq.newIsr.asScala.toList.map(_.toInt)
+ }
partitionsToAlter.put(
new TopicPartition(topicName, partitionReq.partitionIndex),
LeaderAndIsr(
alterPartitionRequest.brokerId,
partitionReq.leaderEpoch,
- partitionReq.newIsr.asScala.toList.map(_.toInt),
+ isr,
LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
partitionReq.partitionEpoch
)
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index 8d874df5183..8e228152fe8 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.OperationNotAttemptedException
import org.apache.kafka.common.message.AlterPartitionRequestData
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.RequestHeader
@@ -280,10 +281,12 @@ class DefaultAlterPartitionManager(
message.topics.add(topicData)
items.foreach { item =>
+ val isrWithEpoch = new
util.ArrayList[BrokerState](item.leaderAndIsr.isr.size)
+ item.leaderAndIsr.isr.foreach(brokerId => isrWithEpoch.add(new
BrokerState().setBrokerId(brokerId)))
val partitionData = new AlterPartitionRequestData.PartitionData()
.setPartitionIndex(item.topicIdPartition.partition)
.setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
- .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+ .setNewIsrWithEpochs(isrWithEpoch)
.setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
if (metadataVersion.isLeaderRecoverySupported) {
diff --git
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index a195be37dbc..d5acc2f190a 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -30,6 +30,7 @@ import
org.apache.kafka.common.message.{AlterPartitionRequestData, AlterPartitio
import org.apache.kafka.common.metrics.KafkaMetric
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.AlterPartitionRequest
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.{ElectionType, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
@@ -969,7 +970,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
// on IBP >= 2.8 and 2) the AlterPartition version 2 and above is used.
val canCallerUseTopicIds = metadataVersion.isTopicIdsSupported &&
alterPartitionVersion > 1
- val alterPartitionRequest = new AlterPartitionRequestData()
+ val alterPartitionRequest = new AlterPartitionRequest.Builder(new
AlterPartitionRequestData()
.setBrokerId(brokerId)
.setBrokerEpoch(brokerEpoch)
.setTopics(Seq(new AlterPartitionRequestData.TopicData()
@@ -979,10 +980,10 @@ class ControllerIntegrationTest extends QuorumTestHarness
{
.setPartitionIndex(tp.partition)
.setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
.setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
- .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
+
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(newLeaderAndIsr.isr.map(Int.box).asJava))
.setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
).asJava)
- ).asJava)
+ ).asJava), alterPartitionVersion > 1).build(alterPartitionVersion).data()
val future = alterPartitionFuture(alterPartitionRequest,
alterPartitionVersion)
@@ -1038,7 +1039,7 @@ class ControllerIntegrationTest extends QuorumTestHarness
{
.setPartitionIndex(tp.partition)
.setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
.setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
- .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
+
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(newLeaderAndIsr.isr.map(Int.box).asJava))
.setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
).asJava)
).asJava)
@@ -1089,7 +1090,7 @@ class ControllerIntegrationTest extends QuorumTestHarness
{
.setPartitionIndex(tp.partition)
.setLeaderEpoch(oldLeaderAndIsr.leaderEpoch)
.setPartitionEpoch(requestPartitionEpoch)
- .setNewIsr(newIsr.map(Int.box).asJava)
+
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(newIsr.map(Int.box).asJava))
.setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value)
).asJava)
).asJava)
@@ -1151,15 +1152,15 @@ class ControllerIntegrationTest extends
QuorumTestHarness {
.setPartitionIndex(tp.partition)
.setLeaderEpoch(leaderAndIsr.leaderEpoch)
.setPartitionEpoch(leaderAndIsr.partitionEpoch)
- .setNewIsr(fullIsr.map(Int.box).asJava)
+
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(fullIsr.map(Int.box).asJava))
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)).asJava)
if (alterPartitionVersion > 1) requestTopic.setTopicId(topicId) else
requestTopic.setTopicName(tp.topic)
// Try to update ISR to contain the offline broker.
- val alterPartitionRequest = new AlterPartitionRequestData()
+ val alterPartitionRequest = new AlterPartitionRequest.Builder(new
AlterPartitionRequestData()
.setBrokerId(controllerId)
.setBrokerEpoch(controllerEpoch)
- .setTopics(Seq(requestTopic).asJava)
+ .setTopics(Seq(requestTopic).asJava), alterPartitionVersion >
1).build(alterPartitionVersion).data()
val future = alterPartitionFuture(alterPartitionRequest,
alterPartitionVersion)
@@ -1484,7 +1485,7 @@ class ControllerIntegrationTest extends QuorumTestHarness
{
.setPartitionIndex(topicPartition.partition)
.setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(partitionEpoch)
- .setNewIsr(isr.toList.map(Int.box).asJava)
+
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(isr.toList.map(Int.box).asJava))
.setLeaderRecoveryState(leaderRecoveryState)).asJava)).asJava)
val future = alterPartitionFuture(alterPartitionRequest, if
(topicIdOpt.isDefined) AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION else
1)
diff --git
a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
index 1ceabb4ad21..0560b2c776f 100644
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
@@ -139,7 +139,11 @@ class AlterPartitionManagerTest {
// Make sure we sent the right request ISR={1}
val request = capture.getValue.build()
assertEquals(request.data().topics().size(), 1)
-
assertEquals(request.data().topics().get(0).partitions().get(0).newIsr().size(),
1)
+ if (request.version() < 3) {
+ assertEquals(request.data.topics.get(0).partitions.get(0).newIsr.size, 1)
+ } else {
+
assertEquals(request.data.topics.get(0).partitions.get(0).newIsrWithEpochs.size,
1)
+ }
}
@ParameterizedTest
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index 2af7f7d277d..9c630dfc697 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -21,7 +21,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.IntPredicate;
+import java.util.stream.Collectors;
+
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
@@ -100,6 +103,12 @@ public class PartitionChangeBuilder {
return this;
}
+ public PartitionChangeBuilder
setTargetIsrWithBrokerStates(List<BrokerState> targetIsrWithEpoch) {
+ this.targetIsr = targetIsrWithEpoch.stream()
+ .map(brokerState ->
brokerState.brokerId()).collect(Collectors.toList());
+ return this;
+ }
+
public PartitionChangeBuilder setTargetReplicas(List<Integer>
targetReplicas) {
this.targetReplicas = targetReplicas;
return this;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 8b1b2fe0bba..d627c7b5cd6 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
@@ -74,6 +75,7 @@ import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
@@ -956,6 +958,12 @@ public class ReplicationControlManager {
TopicControlInfo topic = topics.get(topicId);
for (AlterPartitionRequestData.PartitionData partitionData :
topicData.partitions()) {
+ if (requestVersion < 3) {
+ partitionData.setNewIsrWithEpochs(
+
AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(partitionData.newIsr())
+ );
+ }
+
int partitionId = partitionData.partitionIndex();
PartitionRegistration partition = topic.parts.get(partitionId);
@@ -986,7 +994,7 @@ public class ReplicationControlManager {
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
- builder.setTargetIsr(partitionData.newIsr());
+
builder.setTargetIsrWithBrokerStates(partitionData.newIsrWithEpochs());
builder.setTargetLeaderRecoveryState(
LeaderRecoveryState.of(partitionData.leaderRecoveryState()));
Optional<ApiMessageAndVersion> record = builder.build();
@@ -1110,11 +1118,14 @@ public class ReplicationControlManager {
return INVALID_UPDATE_VERSION;
}
- int[] newIsr = Replicas.toArray(partitionData.newIsr());
+
+ int[] newIsr = partitionData.newIsrWithEpochs().stream()
+ .mapToInt(brokerState -> brokerState.brokerId()).toArray();
+
if (!Replicas.validateIsr(partition.replicas, newIsr)) {
log.error("Rejecting AlterPartition request from node {} for {}-{}
because " +
"it specified an invalid ISR {}.", brokerId,
- topic.name, partitionId, partitionData.newIsr());
+ topic.name, partitionId, partitionData.newIsrWithEpochs());
return INVALID_REQUEST;
}
@@ -1122,7 +1133,7 @@ public class ReplicationControlManager {
// The ISR must always include the current leader.
log.error("Rejecting AlterPartition request from node {} for {}-{}
because " +
"it specified an invalid ISR {} that doesn't include
itself.",
- brokerId, topic.name, partitionId, partitionData.newIsr());
+ brokerId, topic.name, partitionId,
partitionData.newIsrWithEpochs());
return INVALID_REQUEST;
}
@@ -1131,7 +1142,7 @@ public class ReplicationControlManager {
log.info("Rejecting AlterPartition request from node {} for {}-{}
because " +
"the ISR {} had more than one replica while the leader was
still " +
"recovering from an unclean leader election {}.",
- brokerId, topic.name, partitionId, partitionData.newIsr(),
+ brokerId, topic.name, partitionId,
partitionData.newIsrWithEpochs(),
leaderRecoveryState);
return INVALID_REQUEST;
@@ -1145,11 +1156,11 @@ public class ReplicationControlManager {
return INVALID_REQUEST;
}
- List<IneligibleReplica> ineligibleReplicas =
ineligibleReplicasForIsr(newIsr);
+ List<IneligibleReplica> ineligibleReplicas =
ineligibleReplicasForIsr(partitionData.newIsrWithEpochs());
if (!ineligibleReplicas.isEmpty()) {
log.info("Rejecting AlterPartition request from node {} for {}-{}
because " +
"it specified ineligible replicas {} in the new ISR {}.",
- brokerId, topic.name, partitionId, ineligibleReplicas,
partitionData.newIsr());
+ brokerId, topic.name, partitionId, ineligibleReplicas,
partitionData.newIsrWithEpochs());
if (requestApiVersion > 1) {
return INELIGIBLE_REPLICA;
@@ -1161,16 +1172,23 @@ public class ReplicationControlManager {
return Errors.NONE;
}
- private List<IneligibleReplica> ineligibleReplicasForIsr(int[] replicas) {
+ private List<IneligibleReplica> ineligibleReplicasForIsr(List<BrokerState>
brokerStates) {
List<IneligibleReplica> ineligibleReplicas = new ArrayList<>(0);
- for (Integer replicaId : replicas) {
- BrokerRegistration registration =
clusterControl.registration(replicaId);
+ for (BrokerState brokerState : brokerStates) {
+ int brokerId = brokerState.brokerId();
+ BrokerRegistration registration =
clusterControl.registration(brokerId);
if (registration == null) {
- ineligibleReplicas.add(new IneligibleReplica(replicaId, "not
registered"));
+ ineligibleReplicas.add(new IneligibleReplica(brokerId, "not
registered"));
} else if (registration.inControlledShutdown()) {
- ineligibleReplicas.add(new IneligibleReplica(replicaId,
"shutting down"));
+ ineligibleReplicas.add(new IneligibleReplica(brokerId,
"shutting down"));
} else if (registration.fenced()) {
- ineligibleReplicas.add(new IneligibleReplica(replicaId,
"fenced"));
+ ineligibleReplicas.add(new IneligibleReplica(brokerId,
"fenced"));
+ } else if (brokerState.brokerEpoch() != -1 && registration.epoch()
!= brokerState.brokerEpoch()) {
+ // The given broker epoch should match with the broker epoch
in the broker registration, except the
+ // given broker epoch is -1 which means skipping the broker
epoch verification.
+ ineligibleReplicas.add(new IneligibleReplica(brokerId,
+ "broker epoch mismatch: requested=" +
brokerState.brokerEpoch()
+ + " VS expected=" + registration.epoch()));
}
}
return ineligibleReplicas;
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index e3063a74fc7..bc505979274 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.controller;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.protocol.types.TaggedFields;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.controller.PartitionChangeBuilder.ElectionResult;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
@@ -128,12 +129,17 @@ public class PartitionChangeBuilderTest {
assertElectLeaderEquals(createFooBuilder().setElection(Election.PREFERRED), 2,
false);
assertElectLeaderEquals(createFooBuilder(), 1, false);
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN), 1,
false);
-
assertElectLeaderEquals(createFooBuilder().setTargetIsr(Arrays.asList(1, 3)),
1, false);
-
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN).setTargetIsr(Arrays.asList(1,
3)), 1, false);
-
assertElectLeaderEquals(createFooBuilder().setTargetIsr(Arrays.asList(3)),
NO_LEADER, false);
-
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN).setTargetIsr(Arrays.asList(3)),
2, true);
+ assertElectLeaderEquals(createFooBuilder()
+
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1,
3))), 1, false);
+
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN)
+
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1,
3))), 1, false);
+ assertElectLeaderEquals(createFooBuilder()
+
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))),
NO_LEADER, false);
+
assertElectLeaderEquals(createFooBuilder().setElection(Election.UNCLEAN).
+
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))),
2, true);
assertElectLeaderEquals(
-
createFooBuilder().setElection(Election.UNCLEAN).setTargetIsr(Arrays.asList(4)).setTargetReplicas(Arrays.asList(2,
1, 3, 4)),
+ createFooBuilder().setElection(Election.UNCLEAN)
+
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(4))).setTargetReplicas(Arrays.asList(2,
1, 3, 4)),
4,
false
);
@@ -155,9 +161,11 @@ public class PartitionChangeBuilderTest {
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(),
new PartitionChangeRecord(), NO_LEADER_CHANGE);
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
- setTargetIsr(Arrays.asList(2, 1)), new PartitionChangeRecord(), 1);
+
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2,
1))),
+ new PartitionChangeRecord(), 1);
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
- setTargetIsr(Arrays.asList(2, 1, 3, 4)), new
PartitionChangeRecord(),
+
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2,
1, 3, 4))),
+ new PartitionChangeRecord(),
NO_LEADER_CHANGE);
testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder().
setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new
PartitionChangeRecord(),
@@ -183,7 +191,7 @@ public class PartitionChangeBuilderTest {
setPartitionId(0).
setIsr(Arrays.asList(2, 1)).
setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())),
- createFooBuilder().setTargetIsr(Arrays.asList(2, 1)).build());
+
createFooBuilder().setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2,
1))).build());
}
@Test
@@ -193,7 +201,7 @@ public class PartitionChangeBuilderTest {
setPartitionId(0).
setIsr(Arrays.asList(2, 3)).
setLeader(2),
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
- createFooBuilder().setTargetIsr(Arrays.asList(2, 3)).build());
+
createFooBuilder().setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2,
3))).build());
}
@Test
@@ -217,7 +225,7 @@ public class PartitionChangeBuilderTest {
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()),
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
- createBarBuilder().setTargetIsr(Arrays.asList(1, 2, 3,
4)).build());
+
createBarBuilder().setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1,
2, 3, 4))).build());
}
@Test
@@ -235,7 +243,7 @@ public class PartitionChangeBuilderTest {
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
createBarBuilder().
setTargetReplicas(revert.replicas()).
- setTargetIsr(revert.isr()).
+
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(revert.isr())).
setTargetRemoving(Collections.emptyList()).
setTargetAdding(Collections.emptyList()).
build());
@@ -293,7 +301,8 @@ public class PartitionChangeBuilderTest {
);
assertEquals(
Optional.of(expectedRecord),
-
createFooBuilder().setElection(Election.UNCLEAN).setTargetIsr(Arrays.asList(3)).build()
+ createFooBuilder().setElection(Election.UNCLEAN)
+
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))).build()
);
expectedRecord = new ApiMessageAndVersion(
@@ -311,7 +320,8 @@ public class PartitionChangeBuilderTest {
);
assertEquals(
Optional.of(expectedRecord),
-
createOfflineBuilder().setElection(Election.UNCLEAN).setTargetIsr(Arrays.asList(2)).build()
+ createOfflineBuilder().setElection(Election.UNCLEAN)
+
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2))).build()
);
}
@@ -341,7 +351,7 @@ public class PartitionChangeBuilderTest {
isLeaderRecoverySupported
);
// Set the target ISR to empty to indicate that the last leader is
offline
- offlineBuilder.setTargetIsr(Collections.emptyList());
+ offlineBuilder.setTargetIsrWithBrokerStates(Collections.emptyList());
// The partition should stay as recovering
PartitionChangeRecord changeRecord = (PartitionChangeRecord)
offlineBuilder
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 4646c0d217d..6fffcddf0a0 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -49,6 +49,7 @@ import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -343,7 +344,6 @@ public class QuorumControllerTest {
}
}
- @Test
public void testBalancePartitionLeaders() throws Throwable {
List<Integer> allBrokers = Arrays.asList(1, 2, 3);
List<Integer> brokersToKeepUnfenced = Arrays.asList(1, 2);
@@ -455,7 +455,7 @@ public class QuorumControllerTest {
.setPartitionIndex(imbalancedPartitionId)
.setLeaderEpoch(partitionRegistration.leaderEpoch)
.setPartitionEpoch(partitionRegistration.partitionEpoch)
- .setNewIsr(Arrays.asList(1, 2, 3));
+
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1,
2, 3)));
AlterPartitionRequestData.TopicData topicData = new
AlterPartitionRequestData.TopicData()
.setTopicName("foo");
@@ -466,7 +466,8 @@ public class QuorumControllerTest {
.setBrokerEpoch(brokerEpochs.get(partitionRegistration.leader));
alterPartitionRequest.topics().add(topicData);
- active.alterPartition(ANONYMOUS_CONTEXT,
alterPartitionRequest).get();
+ active.alterPartition(ANONYMOUS_CONTEXT, new AlterPartitionRequest
+ .Builder(alterPartitionRequest, false).build((short)
0).data()).get();
// Check that partitions are balanced
AtomicLong lastHeartbeat = new
AtomicLong(active.time().milliseconds());
@@ -861,7 +862,6 @@ public class QuorumControllerTest {
}
@Disabled // TODO: need to fix leader election in LocalLog.
- @Test
public void testMissingInMemorySnapshot() throws Exception {
int numBrokers = 3;
int numPartitions = 3;
@@ -911,7 +911,7 @@ public class QuorumControllerTest {
.setPartitionIndex(partitionIndex)
.setLeaderEpoch(partitionRegistration.leaderEpoch)
.setPartitionEpoch(partitionRegistration.partitionEpoch)
- .setNewIsr(Arrays.asList(0, 1));
+
.setNewIsrWithEpochs(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(0,
1)));
})
.collect(Collectors.toList());
@@ -929,7 +929,8 @@ public class QuorumControllerTest {
int oldClaimEpoch = controller.curClaimEpoch();
assertThrows(ExecutionException.class,
- () -> controller.alterPartition(ANONYMOUS_CONTEXT,
alterPartitionRequest).get());
+ () -> controller.alterPartition(ANONYMOUS_CONTEXT, new
AlterPartitionRequest
+ .Builder(alterPartitionRequest, false).build((short)
0).data()).get());
// Wait for the controller to become active again
assertSame(controller, controlEnv.activeController());
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 6aa4fe508db..b8b42609959 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -26,6 +26,7 @@ import
org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.message.AlterPartitionRequestData;
+import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
import org.apache.kafka.common.message.AlterPartitionRequestData.PartitionData;
import org.apache.kafka.common.message.AlterPartitionRequestData.TopicData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
@@ -64,6 +65,7 @@ import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
@@ -306,7 +308,7 @@ public class ReplicationControlManagerTest {
void registerBrokers(Integer... brokerIds) throws Exception {
for (int brokerId : brokerIds) {
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
- setBrokerEpoch(brokerId +
100).setBrokerId(brokerId).setRack(null);
+
setBrokerEpoch(defaultBrokerEpoch(brokerId)).setBrokerId(brokerId).setRack(null);
brokerRecord.endPoints().add(new
RegisterBrokerRecord.BrokerEndpoint().
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9092 + brokerId).
@@ -319,7 +321,7 @@ public class ReplicationControlManagerTest {
void alterPartition(
TopicIdPartition topicIdPartition,
int leaderId,
- List<Integer> isr,
+ List<BrokerState> isrWithEpoch,
LeaderRecoveryState leaderRecoveryState
) throws Exception {
BrokerRegistration registration =
clusterControl.brokerRegistrations().get(leaderId);
@@ -337,7 +339,7 @@ public class ReplicationControlManagerTest {
.setPartitionEpoch(partition.partitionEpoch)
.setLeaderEpoch(partition.leaderEpoch)
.setLeaderRecoveryState(leaderRecoveryState.value())
- .setNewIsr(isr);
+ .setNewIsrWithEpochs(isrWithEpoch);
String topicName =
replicationControl.getTopic(topicIdPartition.topicId()).name();
TopicData topicData = new TopicData()
@@ -364,7 +366,7 @@ public class ReplicationControlManagerTest {
for (int brokerId : brokerIds) {
ControllerResult<BrokerHeartbeatReply> result =
replicationControl.
processBrokerHeartbeat(new BrokerHeartbeatRequestData().
- setBrokerId(brokerId).setBrokerEpoch(brokerId + 100).
+
setBrokerId(brokerId).setBrokerEpoch(defaultBrokerEpoch(brokerId)).
setCurrentMetadataOffset(1).
setWantFence(false).setWantShutDown(false), 0);
assertEquals(new BrokerHeartbeatReply(true, false, false,
false),
@@ -381,7 +383,7 @@ public class ReplicationControlManagerTest {
for (int brokerId : brokerIds) {
BrokerRegistrationChangeRecord record = new
BrokerRegistrationChangeRecord()
.setBrokerId(brokerId)
- .setBrokerEpoch(brokerId + 100)
+ .setBrokerEpoch(defaultBrokerEpoch(brokerId))
.setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
replay(singletonList(new ApiMessageAndVersion(record, (short)
1)));
}
@@ -842,7 +844,7 @@ public class ReplicationControlManagerTest {
assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
long brokerEpoch = ctx.currentBrokerEpoch(0);
PartitionData shrinkIsrRequest = newAlterPartition(
- replicationControl, topicIdPartition, asList(0, 1),
LeaderRecoveryState.RECOVERED);
+ replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1),
LeaderRecoveryState.RECOVERED);
ControllerResult<AlterPartitionResponseData> shrinkIsrResult =
sendAlterPartition(
replicationControl, 0, brokerEpoch, topicIdPartition.topicId(),
shrinkIsrRequest);
AlterPartitionResponseData.PartitionData shrinkIsrResponse =
assertAlterPartitionResponse(
@@ -850,7 +852,7 @@ public class ReplicationControlManagerTest {
assertConsistentAlterPartitionResponse(replicationControl,
topicIdPartition, shrinkIsrResponse);
PartitionData expandIsrRequest = newAlterPartition(
- replicationControl, topicIdPartition, asList(0, 1, 2),
LeaderRecoveryState.RECOVERED);
+ replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1,
2), LeaderRecoveryState.RECOVERED);
ControllerResult<AlterPartitionResponseData> expandIsrResult =
sendAlterPartition(
replicationControl, 0, brokerEpoch, topicIdPartition.topicId(),
expandIsrRequest);
AlterPartitionResponseData.PartitionData expandIsrResponse =
assertAlterPartitionResponse(
@@ -913,7 +915,7 @@ public class ReplicationControlManagerTest {
// Invalid leader
PartitionData invalidLeaderRequest = newAlterPartition(
- replicationControl, topicIdPartition, asList(0, 1),
LeaderRecoveryState.RECOVERED);
+ replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1),
LeaderRecoveryState.RECOVERED);
ControllerResult<AlterPartitionResponseData> invalidLeaderResult =
sendAlterPartition(
replicationControl, notLeaderId,
ctx.currentBrokerEpoch(notLeaderId),
topicIdPartition.topicId(), invalidLeaderRequest);
@@ -921,13 +923,13 @@ public class ReplicationControlManagerTest {
// Stale broker epoch
PartitionData invalidBrokerEpochRequest = newAlterPartition(
- replicationControl, topicIdPartition, asList(0, 1),
LeaderRecoveryState.RECOVERED);
+ replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1),
LeaderRecoveryState.RECOVERED);
assertThrows(StaleBrokerEpochException.class, () -> sendAlterPartition(
replicationControl, leaderId, brokerEpoch - 1,
topicIdPartition.topicId(), invalidBrokerEpochRequest));
// Invalid leader epoch
PartitionData invalidLeaderEpochRequest = newAlterPartition(
- replicationControl, topicIdPartition, asList(0, 1),
LeaderRecoveryState.RECOVERED);
+ replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1),
LeaderRecoveryState.RECOVERED);
invalidLeaderEpochRequest.setLeaderEpoch(500);
ControllerResult<AlterPartitionResponseData> invalidLeaderEpochResult
= sendAlterPartition(
replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
@@ -936,7 +938,7 @@ public class ReplicationControlManagerTest {
// Invalid partition epoch
PartitionData invalidPartitionEpochRequest = newAlterPartition(
- replicationControl, topicIdPartition, asList(0, 1),
LeaderRecoveryState.RECOVERED);
+ replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1),
LeaderRecoveryState.RECOVERED);
invalidPartitionEpochRequest.setPartitionEpoch(500);
ControllerResult<AlterPartitionResponseData>
invalidPartitionEpochResult = sendAlterPartition(
replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
@@ -945,7 +947,7 @@ public class ReplicationControlManagerTest {
// Invalid ISR (3 is not a valid replica)
PartitionData invalidIsrRequest1 = newAlterPartition(
- replicationControl, topicIdPartition, asList(0, 1, 3),
LeaderRecoveryState.RECOVERED);
+ replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1,
3), LeaderRecoveryState.RECOVERED);
ControllerResult<AlterPartitionResponseData> invalidIsrResult1 =
sendAlterPartition(
replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
topicIdPartition.topicId(), invalidIsrRequest1);
@@ -953,7 +955,7 @@ public class ReplicationControlManagerTest {
// Invalid ISR (does not include leader 0)
PartitionData invalidIsrRequest2 = newAlterPartition(
- replicationControl, topicIdPartition, asList(1, 2),
LeaderRecoveryState.RECOVERED);
+ replicationControl, topicIdPartition, isrWithDefaultEpoch(1, 2),
LeaderRecoveryState.RECOVERED);
ControllerResult<AlterPartitionResponseData> invalidIsrResult2 =
sendAlterPartition(
replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
topicIdPartition.topicId(), invalidIsrRequest2);
@@ -961,7 +963,7 @@ public class ReplicationControlManagerTest {
// Invalid ISR length and recovery state
PartitionData invalidIsrRecoveryRequest = newAlterPartition(
- replicationControl, topicIdPartition, asList(0, 1),
LeaderRecoveryState.RECOVERING);
+ replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1),
LeaderRecoveryState.RECOVERING);
ControllerResult<AlterPartitionResponseData> invalidIsrRecoveryResult
= sendAlterPartition(
replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
topicIdPartition.topicId(), invalidIsrRecoveryRequest);
@@ -969,7 +971,7 @@ public class ReplicationControlManagerTest {
// Invalid recovery state transition from RECOVERED to RECOVERING
PartitionData invalidRecoveryRequest = newAlterPartition(
- replicationControl, topicIdPartition, asList(0),
LeaderRecoveryState.RECOVERING);
+ replicationControl, topicIdPartition, isrWithDefaultEpoch(0),
LeaderRecoveryState.RECOVERING);
ControllerResult<AlterPartitionResponseData> invalidRecoveryResult =
sendAlterPartition(
replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
topicIdPartition.topicId(), invalidRecoveryRequest);
@@ -979,7 +981,7 @@ public class ReplicationControlManagerTest {
private PartitionData newAlterPartition(
ReplicationControlManager replicationControl,
TopicIdPartition topicIdPartition,
- List<Integer> newIsr,
+ List<BrokerState> newIsrWithEpoch,
LeaderRecoveryState leaderRecoveryState
) {
PartitionRegistration partitionControl =
@@ -988,7 +990,7 @@ public class ReplicationControlManagerTest {
.setPartitionIndex(0)
.setLeaderEpoch(partitionControl.leaderEpoch)
.setPartitionEpoch(partitionControl.partitionEpoch)
- .setNewIsr(newIsr)
+ .setNewIsrWithEpochs(newIsrWithEpoch)
.setLeaderRecoveryState(leaderRecoveryState.value());
}
@@ -1501,9 +1503,9 @@ public class ReplicationControlManagerTest {
log.info("running final alterPartition...");
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
- ControllerResult<AlterPartitionResponseData> alterPartitionResult =
replication.alterPartition(
- requestContext,
- new AlterPartitionRequestData().setBrokerId(3).setBrokerEpoch(103).
+ AlterPartitionRequestData alterPartitionRequestData = new
AlterPartitionRequestData().
+ setBrokerId(3).
+ setBrokerEpoch(103).
setTopics(asList(new TopicData().
setTopicName(version <= 1 ? "foo" : "").
setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
@@ -1511,7 +1513,10 @@ public class ReplicationControlManagerTest {
setPartitionIndex(1).
setPartitionEpoch(1).
setLeaderEpoch(0).
- setNewIsr(asList(3, 0, 2, 1)))))));
+ setNewIsrWithEpochs(isrWithDefaultEpoch(3, 0, 2,
1))))));
+ ControllerResult<AlterPartitionResponseData> alterPartitionResult =
replication.alterPartition(
+ requestContext,
+ new AlterPartitionRequest.Builder(alterPartitionRequestData,
version > 1).build(version).data());
Errors expectedError = version > 1 ? NEW_LEADER_ELECTED :
FENCED_LEADER_EPOCH;
assertEquals(new AlterPartitionResponseData().setTopics(asList(
new AlterPartitionResponseData.TopicData().
@@ -1564,13 +1569,13 @@ public class ReplicationControlManagerTest {
.setPartitionIndex(0)
.setPartitionEpoch(1)
.setLeaderEpoch(1)
- .setNewIsr(asList(1, 2, 3, 4))))));
+ .setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3, 4))))));
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
ControllerResult<AlterPartitionResponseData> alterPartitionResult =
- replication.alterPartition(requestContext, alterIsrRequest);
+ replication.alterPartition(requestContext, new
AlterPartitionRequest.Builder(alterIsrRequest, version >
1).build(version).data());
Errors expectedError = version <= 1 ? OPERATION_NOT_ATTEMPTED :
INELIGIBLE_REPLICA;
assertEquals(
@@ -1604,6 +1609,75 @@ public class ReplicationControlManagerTest {
alterPartitionResult.response());
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+ public void testAlterPartitionShouldRejectBrokersWithStaleEpoch(short
version) throws Exception {
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2, 3, 4);
+ ctx.unfenceBrokers(0, 1, 2, 3, 4);
+ Uuid fooId = ctx.createTestTopic(
+ "foo",
+ new int[][] {new int[] {1, 2, 3, 4}}
+ ).topicId();
+ ctx.alterPartition(new TopicIdPartition(fooId, 0), 1,
isrWithDefaultEpoch(1, 2, 3), LeaderRecoveryState.RECOVERED);
+
+ // First, the leader is constructing an AlterPartition request.
+ AlterPartitionRequestData alterIsrRequest = new
AlterPartitionRequestData().
+ setBrokerId(1).
+ setBrokerEpoch(101).
+ setTopics(asList(new TopicData().
+ setTopicName(version <= 1 ? "foo" : "").
+ setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
+ setPartitions(asList(new PartitionData().
+ setPartitionIndex(0).
+ setPartitionEpoch(1).
+ setLeaderEpoch(1).
+ setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3, 4))))));
+
+ // The broker 4 has failed silently and now registers again.
+ long newEpoch = defaultBrokerEpoch(4) + 1000;
+ RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
+ setBrokerEpoch(newEpoch).setBrokerId(4).setRack(null);
+ brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
+ setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
+ setPort((short) 9092 + 4).
+ setName("PLAINTEXT").
+ setHost("localhost"));
+ ctx.replay(Collections.singletonList(new
ApiMessageAndVersion(brokerRecord, (short) 0)));
+
+ // Unfence the broker 4.
+ ControllerResult<BrokerHeartbeatReply> result = ctx.replicationControl.
+ processBrokerHeartbeat(new BrokerHeartbeatRequestData().
+ setBrokerId(4).setBrokerEpoch(newEpoch).
+ setCurrentMetadataOffset(1).
+ setWantFence(false).setWantShutDown(false), 0);
+ assertEquals(new BrokerHeartbeatReply(true, false, false, false),
+ result.response());
+ ctx.replay(result.records());
+
+ ControllerRequestContext requestContext =
+ anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
+
+ ControllerResult<AlterPartitionResponseData> alterPartitionResult =
+ replication.alterPartition(requestContext, new
AlterPartitionRequest.Builder(alterIsrRequest, version >
1).build(version).data());
+
+ // The late arrived AlterPartition request should be rejected when
version >= 3.
+ if (version >= 3) {
+ assertEquals(
+ new AlterPartitionResponseData().
+ setTopics(asList(new
AlterPartitionResponseData.TopicData().
+ setTopicName(version <= 1 ? "foo" : "").
+ setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
+ setPartitions(asList(new
AlterPartitionResponseData.PartitionData().
+ setPartitionIndex(0).
+ setErrorCode(INELIGIBLE_REPLICA.code()))))),
+ alterPartitionResult.response());
+ } else {
+ assertEquals(NONE.code(),
alterPartitionResult.response().errorCode());
+ }
+ }
+
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
public void testAlterPartitionShouldRejectShuttingDownBrokers(short
version) throws Exception {
@@ -1640,13 +1714,13 @@ public class ReplicationControlManagerTest {
.setPartitionIndex(0)
.setPartitionEpoch(0)
.setLeaderEpoch(0)
- .setNewIsr(asList(1, 2, 3, 4))))));
+ .setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3, 4))))));
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
ControllerResult<AlterPartitionResponseData> alterPartitionResult =
- replication.alterPartition(requestContext, alterIsrRequest);
+ replication.alterPartition(requestContext, new
AlterPartitionRequest.Builder(alterIsrRequest, version >
1).build(version).data());
Errors expectedError = version <= 1 ? OPERATION_NOT_ATTEMPTED :
INELIGIBLE_REPLICA;
assertEquals(
@@ -1738,7 +1812,7 @@ public class ReplicationControlManagerTest {
new AlterPartitionRequestData().setBrokerId(4).setBrokerEpoch(104).
setTopics(asList(new
TopicData().setTopicId(barId).setPartitions(asList(
new
PartitionData().setPartitionIndex(0).setPartitionEpoch(2).
- setLeaderEpoch(1).setNewIsr(asList(4, 1, 2, 0)))))));
+
setLeaderEpoch(1).setNewIsrWithEpochs(isrWithDefaultEpoch(4, 1, 2, 0)))))));
assertEquals(new AlterPartitionResponseData().setTopics(asList(
new
AlterPartitionResponseData.TopicData().setTopicId(barId).setPartitions(asList(
new AlterPartitionResponseData.PartitionData().
@@ -1867,7 +1941,7 @@ public class ReplicationControlManagerTest {
// Bring 2 back into the ISR for partition 1. This allows us to verify
that
// preferred election does not occur as a result of the unclean
election request.
- ctx.alterPartition(partition1, 4, asList(2, 4),
LeaderRecoveryState.RECOVERED);
+ ctx.alterPartition(partition1, 4, isrWithDefaultEpoch(2, 4),
LeaderRecoveryState.RECOVERED);
ControllerResult<ElectLeadersResponseData> result =
replication.electLeaders(request);
assertEquals(1, result.records().size());
@@ -2039,10 +2113,10 @@ public class ReplicationControlManagerTest {
setPartitions(asList(
new AlterPartitionRequestData.PartitionData().
setPartitionIndex(0).setPartitionEpoch(0).
- setLeaderEpoch(0).setNewIsr(asList(1, 2, 3)),
+
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3)),
new AlterPartitionRequestData.PartitionData().
setPartitionIndex(2).setPartitionEpoch(0).
- setLeaderEpoch(0).setNewIsr(asList(0, 2, 1)))))));
+
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(0, 2, 1)))))));
assertEquals(new AlterPartitionResponseData().setTopics(asList(
new
AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
new AlterPartitionResponseData.PartitionData().
@@ -2124,7 +2198,7 @@ public class ReplicationControlManagerTest {
setTopics(asList(new
AlterPartitionRequestData.TopicData().setTopicId(fooId).
setPartitions(asList(new
AlterPartitionRequestData.PartitionData().
setPartitionIndex(0).setPartitionEpoch(0).
- setLeaderEpoch(0).setNewIsr(asList(1, 2, 3)))))));
+
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3)))))));
assertEquals(new AlterPartitionResponseData().setTopics(asList(
new
AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
new AlterPartitionResponseData.PartitionData().
@@ -2156,7 +2230,7 @@ public class ReplicationControlManagerTest {
setTopics(asList(new
AlterPartitionRequestData.TopicData().setTopicId(fooId).
setPartitions(asList(new
AlterPartitionRequestData.PartitionData().
setPartitionIndex(2).setPartitionEpoch(0).
- setLeaderEpoch(0).setNewIsr(asList(0, 2, 1)))))));
+
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(0, 2, 1)))))));
assertEquals(new AlterPartitionResponseData().setTopics(asList(
new
AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
new AlterPartitionResponseData.PartitionData().
@@ -2292,4 +2366,17 @@ public class ReplicationControlManagerTest {
assertEquals(expectedRecords, result.records());
}
+
+ private static BrokerState brokerState(int brokerId, Long brokerEpoch) {
+ return new
BrokerState().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch);
+ }
+
+ private static Long defaultBrokerEpoch(int brokerId) {
+ return brokerId + 100L;
+ }
+
+ private static List<BrokerState> isrWithDefaultEpoch(Integer... isr) {
+ return Arrays.stream(isr).map(brokerId -> brokerState(brokerId,
defaultBrokerEpoch(brokerId)))
+ .collect(Collectors.toList());
+ }
}