This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 301a07f KAFKA-13083: Fix KRaft ISR in createPartitions, createTopics
301a07f is described below
commit 301a07f4d877ebd22e111831d5445ade3f73af14
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Jul 14 10:52:25 2021 -0700
KAFKA-13083: Fix KRaft ISR in createPartitions, createTopics
When creating a new topic or partition via a manual partition assignment, we
must check if the nodes are unfenced before adding them to the new ISR.
We also need to reject attempts to create a new partition with all fenced
nodes.
Reviewers: David Arthur <[email protected]>
---
.../controller/ReplicationControlManager.java | 34 ++-
.../kafka/controller/QuorumControllerTest.java | 29 +--
.../controller/ReplicationControlManagerTest.java | 232 +++++++++++++++------
3 files changed, 205 insertions(+), 90 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index db8d86a..1aa133e 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -84,6 +84,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.OptionalInt;
+import java.util.stream.Collectors;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
@@ -376,9 +377,16 @@ public class ReplicationControlManager {
}
validateManualPartitionAssignment(assignment.brokerIds(),
replicationFactor);
replicationFactor =
OptionalInt.of(assignment.brokerIds().size());
- int[] replicas = Replicas.toArray(assignment.brokerIds());
+ List<Integer> isr = assignment.brokerIds().stream().
+
filter(clusterControl::unfenced).collect(Collectors.toList());
+ if (isr.isEmpty()) {
+ return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
+ "All brokers specified in the manual partition
assignment for " +
+ "partition " + assignment.partitionIndex() + " are
fenced.");
+ }
newParts.put(assignment.partitionIndex(), new
PartitionRegistration(
- replicas, replicas, Replicas.NONE, Replicas.NONE,
replicas[0], 0, 0));
+ Replicas.toArray(assignment.brokerIds()),
Replicas.toArray(isr),
+ Replicas.NONE, Replicas.NONE, isr.get(0), 0, 0));
}
} else if (topic.replicationFactor() < -1 || topic.replicationFactor()
== 0) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
@@ -910,27 +918,41 @@ public class ReplicationControlManager {
int startPartitionId = topicInfo.parts.size();
List<List<Integer>> placements;
+ List<List<Integer>> isrs;
if (topic.assignments() != null) {
placements = new ArrayList<>();
- for (CreatePartitionsAssignment assignment : topic.assignments()) {
+ isrs = new ArrayList<>();
+ for (int i = 0; i < topic.assignments().size(); i++) {
+ CreatePartitionsAssignment assignment =
topic.assignments().get(i);
validateManualPartitionAssignment(assignment.brokerIds(),
OptionalInt.of(replicationFactor));
placements.add(assignment.brokerIds());
+ List<Integer> isr = assignment.brokerIds().stream().
+
filter(clusterControl::unfenced).collect(Collectors.toList());
+ if (isr.isEmpty()) {
+ throw new InvalidReplicaAssignmentException(
+ "All brokers specified in the manual partition
assignment for " +
+ "partition " + (startPartitionId + i) + " are
fenced.");
+ }
+ isrs.add(isr);
}
} else {
placements = clusterControl.placeReplicas(startPartitionId,
additional,
replicationFactor);
+ isrs = placements;
}
int partitionId = startPartitionId;
- for (List<Integer> placement : placements) {
+ for (int i = 0; i < placements.size(); i++) {
+ List<Integer> placement = placements.get(i);
+ List<Integer> isr = isrs.get(i);
records.add(new ApiMessageAndVersion(new PartitionRecord().
setPartitionId(partitionId).
setTopicId(topicId).
setReplicas(placement).
- setIsr(placement).
+ setIsr(isr).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
- setLeader(placement.get(0)).
+ setLeader(isr.get(0)).
setLeaderEpoch(0).
setPartitionEpoch(0),
PARTITION_RECORD.highestSupportedVersion()));
partitionId++;
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 38a368d..911f955 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -347,18 +347,12 @@ public class QuorumControllerTest {
public void testSnapshotOnlyAfterConfiguredMinBytes() throws Throwable {
final int numBrokers = 4;
final int maxNewRecordBytes = 1000;
- int appendedBytes = 0;
Map<Integer, Long> brokerEpochs = new HashMap<>();
- Uuid fooId;
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3,
Optional.empty())) {
try (QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv(logEnv,
- builder -> {
- builder
- .setConfigDefs(CONFIGS)
- .setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
- })
+ builder -> builder.setConfigDefs(CONFIGS).
+ setSnapshotMaxNewRecordBytes(maxNewRecordBytes))
) {
-
QuorumController active = controlEnv.activeController();
for (int i = 0; i < numBrokers; i++) {
BrokerRegistrationReply reply = active.registerBroker(
@@ -371,25 +365,23 @@ public class QuorumControllerTest {
setName("PLAINTEXT").setHost("localhost").
setPort(9092 + i)).iterator()))).get();
brokerEpochs.put(i, reply.epoch());
+ assertEquals(new BrokerHeartbeatReply(true, false, false,
false),
+ active.processBrokerHeartbeat(new
BrokerHeartbeatRequestData().
+
setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
+
setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
}
- assertTrue(
- logEnv.appendedBytes() < maxNewRecordBytes,
- String.format(
- "%s appended bytes is not less than %s max new record
bytes",
+ assertTrue(logEnv.appendedBytes() < maxNewRecordBytes,
+ String.format("%s appended bytes is not less than %s max
new record bytes",
logEnv.appendedBytes(),
- maxNewRecordBytes
- )
- );
+ maxNewRecordBytes));
// Keep creating topic until we reached the max bytes limit
int counter = 0;
while (logEnv.appendedBytes() < maxNewRecordBytes) {
counter += 1;
String topicName = String.format("foo-%s", counter);
-
- CreateTopicsResponseData reply = active.createTopics(
- new CreateTopicsRequestData().setTopics(
+ active.createTopics(new
CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
new
CreatableTopic().setName(topicName).setNumPartitions(-1).
setReplicationFactor((short) -1).
@@ -402,7 +394,6 @@ public class QuorumControllerTest {
setBrokerIds(Arrays.asList(1, 2,
0))).
iterator()))).iterator()))).get();
}
-
logEnv.waitForLatestSnapshot();
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 0182477..628f9ed 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -34,6 +34,13 @@ import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCol
import org.apache.kafka.common.message.CreateTopicsRequestData;
import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import
org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitionsCollection;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
@@ -111,6 +118,11 @@ public class ReplicationControlManagerTest {
}
CreatableTopicResult createTestTopic(String name, int[][] replicas)
throws Exception {
+ return createTestTopic(name, replicas, (short) 0);
+ }
+
+ CreatableTopicResult createTestTopic(String name, int[][] replicas,
+ short expectedErrorCode) throws Exception {
assertFalse(replicas.length == 0);
CreateTopicsRequestData request = new CreateTopicsRequestData();
CreatableTopic topic = new CreatableTopic().setName(name);
@@ -124,33 +136,59 @@ public class ReplicationControlManagerTest {
replicationControl.createTopics(request);
CreatableTopicResult topicResult =
result.response().topics().find(name);
assertNotNull(topicResult);
- assertEquals((short) 0, topicResult.errorCode());
- assertEquals(replicas.length, topicResult.numPartitions());
- assertEquals(replicas[0].length, topicResult.replicationFactor());
- replay(result.records());
+ assertEquals(expectedErrorCode, topicResult.errorCode());
+ if (expectedErrorCode == NONE.code()) {
+ assertEquals(replicas.length, topicResult.numPartitions());
+ assertEquals(replicas[0].length,
topicResult.replicationFactor());
+ replay(result.records());
+ }
return topicResult;
}
- }
- private static void registerBroker(int brokerId,
ReplicationControlTestContext ctx) {
- RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
- setBrokerEpoch(brokerId + 100).setBrokerId(brokerId);
- brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
- setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
- setPort((short) 9092 + brokerId).
- setName("PLAINTEXT").
- setHost("localhost"));
- ctx.clusterControl.replay(brokerRecord);
- }
+ void createPartitions(int count, String name,
+ int[][] replicas, short expectedErrorCode) throws Exception {
+ assertFalse(replicas.length == 0);
+ CreatePartitionsTopic topic = new CreatePartitionsTopic().
+ setName(name).
+ setCount(count);
+ for (int i = 0; i < replicas.length; i++) {
+ topic.assignments().add(new CreatePartitionsAssignment().
+ setBrokerIds(Replicas.toList(replicas[i])));
+ }
+ ControllerResult<List<CreatePartitionsTopicResult>> result =
+
replicationControl.createPartitions(Collections.singletonList(topic));
+ assertEquals(1, result.response().size());
+ CreatePartitionsTopicResult topicResult = result.response().get(0);
+ assertEquals(name, topicResult.name());
+ assertEquals(expectedErrorCode, topicResult.errorCode());
+ replay(result.records());
+ }
- private static void unfenceBroker(int brokerId,
- ReplicationControlTestContext ctx)
throws Exception {
- ControllerResult<BrokerHeartbeatReply> result = ctx.replicationControl.
- processBrokerHeartbeat(new BrokerHeartbeatRequestData().
- setBrokerId(brokerId).setBrokerEpoch(brokerId +
100).setCurrentMetadataOffset(1).
- setWantFence(false).setWantShutDown(false), 0);
- assertEquals(new BrokerHeartbeatReply(true, false, false, false),
result.response());
- ctx.replay(result.records());
+ void registerBrokers(Integer... brokerIds) throws Exception {
+ for (int brokerId : brokerIds) {
+ RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
+ setBrokerEpoch(brokerId + 100).setBrokerId(brokerId);
+ brokerRecord.endPoints().add(new
RegisterBrokerRecord.BrokerEndpoint().
+ setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
+ setPort((short) 9092 + brokerId).
+ setName("PLAINTEXT").
+ setHost("localhost"));
+ replay(Collections.singletonList(new
ApiMessageAndVersion(brokerRecord, (short) 0)));
+ }
+ }
+
+ void unfenceBrokers(Integer... brokerIds) throws Exception {
+ for (int brokerId : brokerIds) {
+ ControllerResult<BrokerHeartbeatReply> result =
replicationControl.
+ processBrokerHeartbeat(new BrokerHeartbeatRequestData().
+ setBrokerId(brokerId).setBrokerEpoch(brokerId + 100).
+ setCurrentMetadataOffset(1).
+ setWantFence(false).setWantShutDown(false), 0);
+ assertEquals(new BrokerHeartbeatReply(true, false, false,
false),
+ result.response());
+ replay(result.records());
+ }
+ }
}
@Test
@@ -169,12 +207,8 @@ public class ReplicationControlManagerTest {
"brokers are currently fenced."));
assertEquals(expectedResponse, result.response());
- registerBroker(0, ctx);
- unfenceBroker(0, ctx);
- registerBroker(1, ctx);
- unfenceBroker(1, ctx);
- registerBroker(2, ctx);
- unfenceBroker(2, ctx);
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
ControllerResult<CreateTopicsResponseData> result2 =
replicationControl.createTopics(request);
CreateTopicsResponseData expectedResponse2 = new
CreateTopicsResponseData();
@@ -215,12 +249,8 @@ public class ReplicationControlManagerTest {
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(1).setReplicationFactor((short) -1));
- registerBroker(0, ctx);
- unfenceBroker(0, ctx);
- registerBroker(1, ctx);
- unfenceBroker(1, ctx);
- registerBroker(2, ctx);
- unfenceBroker(2, ctx);
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
List<Uuid> topicsToDelete = new ArrayList<>();
@@ -258,11 +288,8 @@ public class ReplicationControlManagerTest {
public void testOfflinePartitionAndReplicaImbalanceMetrics() throws
Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
ReplicationControlManager replicationControl = ctx.replicationControl;
-
- for (int i = 0; i < 4; i++) {
- registerBroker(i, ctx);
- unfenceBroker(i, ctx);
- }
+ ctx.registerBrokers(0, 1, 2, 3);
+ ctx.unfenceBrokers(0, 1, 2, 3);
CreatableTopicResult foo = ctx.createTestTopic("foo", new int[][] {
new int[] {0, 2}, new int[] {0, 1}});
@@ -334,10 +361,8 @@ public class ReplicationControlManagerTest {
public void testRemoveLeaderships() throws Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
ReplicationControlManager replicationControl = ctx.replicationControl;
- for (int i = 0; i < 6; i++) {
- registerBroker(i, ctx);
- unfenceBroker(i, ctx);
- }
+ ctx.registerBrokers(0, 1, 2, 3);
+ ctx.unfenceBrokers(0, 1, 2, 3);
CreatableTopicResult result = ctx.createTestTopic("foo",
new int[][] {
new int[] {0, 1, 2},
@@ -361,10 +386,8 @@ public class ReplicationControlManagerTest {
public void testShrinkAndExpandIsr() throws Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
ReplicationControlManager replicationControl = ctx.replicationControl;
- for (int i = 0; i < 3; i++) {
- registerBroker(i, ctx);
- unfenceBroker(i, ctx);
- }
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
new int[][] {new int[] {0, 1, 2}});
@@ -393,10 +416,8 @@ public class ReplicationControlManagerTest {
public void testInvalidAlterIsrRequests() throws Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
ReplicationControlManager replicationControl = ctx.replicationControl;
- for (int i = 0; i < 3; i++) {
- registerBroker(i, ctx);
- unfenceBroker(i, ctx);
- }
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
new int[][] {new int[] {0, 1, 2}});
@@ -574,10 +595,8 @@ public class ReplicationControlManagerTest {
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(3).setReplicationFactor((short) 2).
setConfigs(requestConfigs));
- registerBroker(0, ctx);
- unfenceBroker(0, ctx);
- registerBroker(1, ctx);
- unfenceBroker(1, ctx);
+ ctx.registerBrokers(0, 1);
+ ctx.unfenceBrokers(0, 1);
ControllerResult<CreateTopicsResponseData> createResult =
replicationControl.createTopics(request);
CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
@@ -649,10 +668,8 @@ public class ReplicationControlManagerTest {
setNumPartitions(2).setReplicationFactor((short) 2));
request.topics().add(new CreatableTopic().setName("foo2").
setNumPartitions(2).setReplicationFactor((short) 2));
- registerBroker(0, ctx);
- unfenceBroker(0, ctx);
- registerBroker(1, ctx);
- unfenceBroker(1, ctx);
+ ctx.registerBrokers(0, 1);
+ ctx.unfenceBrokers(0, 1);
ControllerResult<CreateTopicsResponseData> createTopicResult =
replicationControl.createTopics(request);
ctx.replay(createTopicResult.records());
@@ -726,9 +743,7 @@ public class ReplicationControlManagerTest {
@Test
public void testValidateGoodManualPartitionAssignments() throws Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
- registerBroker(1, ctx);
- registerBroker(2, ctx);
- registerBroker(3, ctx);
+ ctx.registerBrokers(1, 2, 3);
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1),
OptionalInt.of(1));
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1),
@@ -742,8 +757,7 @@ public class ReplicationControlManagerTest {
@Test
public void testValidateBadManualPartitionAssignments() throws Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
- registerBroker(1, ctx);
- registerBroker(2, ctx);
+ ctx.registerBrokers(1, 2);
assertEquals("The manual partition assignment includes an empty
replica list.",
assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(),
@@ -776,4 +790,92 @@ public class ReplicationControlManagerTest {
assertEquals(4, ReplicationControlManager.bestLeader(
new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4));
}
+
+ @Test
+ public void testManualPartitionAssignmentOnAllFencedBrokers() throws
Exception {
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
+ ctx.registerBrokers(0, 1, 2, 3);
+ ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}},
+ INVALID_REPLICA_ASSIGNMENT.code());
+ }
+
+ @Test
+ public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced()
throws Exception {
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
+ ctx.registerBrokers(0, 1, 2, 3, 4, 5);
+ ctx.unfenceBrokers(0, 1, 2);
+ Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1,
2}}).topicId();
+ ctx.createPartitions(2, "foo", new int[][] {new int[] {3, 4, 5}},
+ INVALID_REPLICA_ASSIGNMENT.code());
+ ctx.createPartitions(2, "foo", new int[][] {new int[] {2, 4, 5}},
NONE.code());
+ assertEquals(new PartitionRegistration(new int[] {2, 4, 5},
+ new int[] {2}, Replicas.NONE, Replicas.NONE, 2, 0, 0),
+ ctx.replicationControl.getPartition(fooId, 1));
+ }
+
+ @Test
+ public void testElectLeaders() throws Exception {
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2, 3, 4);
+ ctx.unfenceBrokers(2, 3, 4);
+ Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+ new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2,
1}}).topicId();
+ ElectLeadersRequestData request1 = new ElectLeadersRequestData().
+ setElectionType((byte) 0).
+ setTopicPartitions(new TopicPartitionsCollection(Arrays.asList(
+ new TopicPartitions().setTopic("foo").
+ setPartitions(Arrays.asList(0, 1)),
+ new TopicPartitions().setTopic("bar").
+ setPartitions(Arrays.asList(0, 1))).iterator()));
+ ControllerResult<ElectLeadersResponseData> election1Result =
+ replication.electLeaders(request1);
+ ElectLeadersResponseData expectedResponse1 = new
ElectLeadersResponseData().
+ setErrorCode((short) 0).
+ setReplicaElectionResults(Arrays.asList(
+ new ReplicaElectionResult().setTopic("foo").
+ setPartitionResult(Arrays.asList(
+ new PartitionResult().setPartitionId(0).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null),
+ new PartitionResult().setPartitionId(1).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null))),
+ new ReplicaElectionResult().setTopic("bar").
+ setPartitionResult(Arrays.asList(
+ new PartitionResult().setPartitionId(0).
+ setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+ setErrorMessage("No such topic as bar"),
+ new PartitionResult().setPartitionId(1).
+ setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+ setErrorMessage("No such topic as bar")))));
+ assertEquals(expectedResponse1, election1Result.response());
+ assertEquals(Collections.emptyList(), election1Result.records());
+ ctx.unfenceBrokers(0, 1);
+
+ ControllerResult<AlterIsrResponseData> alterIsrResult =
replication.alterIsr(
+ new AlterIsrRequestData().setBrokerId(2).setBrokerEpoch(102).
+ setTopics(Arrays.asList(new
AlterIsrRequestData.TopicData().setName("foo").
+ setPartitions(Arrays.asList(new
AlterIsrRequestData.PartitionData().
+ setPartitionIndex(0).setCurrentIsrVersion(0).
+ setLeaderEpoch(0).setNewIsr(Arrays.asList(1, 2,
3)))))));
+ assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
+ new
AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
+ new AlterIsrResponseData.PartitionData().
+ setPartitionIndex(0).
+ setLeaderId(2).
+ setLeaderEpoch(0).
+ setIsr(Arrays.asList(1, 2, 3)).
+ setCurrentIsrVersion(1).
+ setErrorCode(NONE.code()))))),
+ alterIsrResult.response());
+ ctx.replay(alterIsrResult.records());
+ ControllerResult<ElectLeadersResponseData> election2Result =
+ replication.electLeaders(request1);
+ assertEquals(expectedResponse1, election2Result.response());
+ assertEquals(Arrays.asList(new ApiMessageAndVersion(new
PartitionChangeRecord().
+ setPartitionId(0).
+ setTopicId(fooId).
+ setLeader(1), (short) 0)), election2Result.records());
+ }
}