This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 301a07f  KAFKA-13083: Fix KRaft ISR in createPartitions, createTopics
301a07f is described below

commit 301a07f4d877ebd22e111831d5445ade3f73af14
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Jul 14 10:52:25 2021 -0700

    KAFKA-13083: Fix KRaft ISR in createPartitions, createTopics
    
    When creating a new topic or partition via a manual partition assignment, we
    must check if the nodes are unfenced before adding them to the new ISR.
    We also need to reject attempts to create a new partition with all fenced
    nodes.
    
    Reviewers: David Arthur <[email protected]>
---
 .../controller/ReplicationControlManager.java      |  34 ++-
 .../kafka/controller/QuorumControllerTest.java     |  29 +--
 .../controller/ReplicationControlManagerTest.java  | 232 +++++++++++++++------
 3 files changed, 205 insertions(+), 90 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index db8d86a..1aa133e 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -84,6 +84,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.OptionalInt;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
@@ -376,9 +377,16 @@ public class ReplicationControlManager {
                 }
                 validateManualPartitionAssignment(assignment.brokerIds(), 
replicationFactor);
                 replicationFactor = 
OptionalInt.of(assignment.brokerIds().size());
-                int[] replicas = Replicas.toArray(assignment.brokerIds());
+                List<Integer> isr = assignment.brokerIds().stream().
+                    
filter(clusterControl::unfenced).collect(Collectors.toList());
+                if (isr.isEmpty()) {
+                    return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
+                        "All brokers specified in the manual partition 
assignment for " +
+                        "partition " + assignment.partitionIndex() + " are 
fenced.");
+                }
                 newParts.put(assignment.partitionIndex(), new 
PartitionRegistration(
-                    replicas, replicas, Replicas.NONE, Replicas.NONE, 
replicas[0], 0, 0));
+                    Replicas.toArray(assignment.brokerIds()), 
Replicas.toArray(isr),
+                    Replicas.NONE, Replicas.NONE, isr.get(0), 0, 0));
             }
         } else if (topic.replicationFactor() < -1 || topic.replicationFactor() 
== 0) {
             return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
@@ -910,27 +918,41 @@ public class ReplicationControlManager {
         int startPartitionId = topicInfo.parts.size();
 
         List<List<Integer>> placements;
+        List<List<Integer>> isrs;
         if (topic.assignments() != null) {
             placements = new ArrayList<>();
-            for (CreatePartitionsAssignment assignment : topic.assignments()) {
+            isrs = new ArrayList<>();
+            for (int i = 0; i < topic.assignments().size(); i++) {
+                CreatePartitionsAssignment assignment = 
topic.assignments().get(i);
                 validateManualPartitionAssignment(assignment.brokerIds(),
                     OptionalInt.of(replicationFactor));
                 placements.add(assignment.brokerIds());
+                List<Integer> isr = assignment.brokerIds().stream().
+                    
filter(clusterControl::unfenced).collect(Collectors.toList());
+                if (isr.isEmpty()) {
+                    throw new InvalidReplicaAssignmentException(
+                        "All brokers specified in the manual partition 
assignment for " +
+                            "partition " + (startPartitionId + i) + " are 
fenced.");
+                }
+                isrs.add(isr);
             }
         } else {
             placements = clusterControl.placeReplicas(startPartitionId, 
additional,
                 replicationFactor);
+            isrs = placements;
         }
         int partitionId = startPartitionId;
-        for (List<Integer> placement : placements) {
+        for (int i = 0; i < placements.size(); i++) {
+            List<Integer> placement = placements.get(i);
+            List<Integer> isr = isrs.get(i);
             records.add(new ApiMessageAndVersion(new PartitionRecord().
                 setPartitionId(partitionId).
                 setTopicId(topicId).
                 setReplicas(placement).
-                setIsr(placement).
+                setIsr(isr).
                 setRemovingReplicas(Collections.emptyList()).
                 setAddingReplicas(Collections.emptyList()).
-                setLeader(placement.get(0)).
+                setLeader(isr.get(0)).
                 setLeaderEpoch(0).
                 setPartitionEpoch(0), 
PARTITION_RECORD.highestSupportedVersion()));
             partitionId++;
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 38a368d..911f955 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -347,18 +347,12 @@ public class QuorumControllerTest {
     public void testSnapshotOnlyAfterConfiguredMinBytes() throws Throwable {
         final int numBrokers = 4;
         final int maxNewRecordBytes = 1000;
-        int appendedBytes = 0;
         Map<Integer, Long> brokerEpochs = new HashMap<>();
-        Uuid fooId;
         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, 
Optional.empty())) {
             try (QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv(logEnv,
-                    builder -> {
-                        builder
-                            .setConfigDefs(CONFIGS)
-                            .setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
-                    })
+                    builder -> builder.setConfigDefs(CONFIGS).
+                        setSnapshotMaxNewRecordBytes(maxNewRecordBytes))
             ) {
-
                 QuorumController active = controlEnv.activeController();
                 for (int i = 0; i < numBrokers; i++) {
                     BrokerRegistrationReply reply = active.registerBroker(
@@ -371,25 +365,23 @@ public class QuorumControllerTest {
                                 setName("PLAINTEXT").setHost("localhost").
                                 setPort(9092 + i)).iterator()))).get();
                     brokerEpochs.put(i, reply.epoch());
+                    assertEquals(new BrokerHeartbeatReply(true, false, false, 
false),
+                        active.processBrokerHeartbeat(new 
BrokerHeartbeatRequestData().
+                            
setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
+                            
setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
                 }
 
-                assertTrue(
-                    logEnv.appendedBytes() < maxNewRecordBytes,
-                    String.format(
-                        "%s appended bytes is not less than %s max new record 
bytes",
+                assertTrue(logEnv.appendedBytes() < maxNewRecordBytes,
+                    String.format("%s appended bytes is not less than %s max 
new record bytes",
                         logEnv.appendedBytes(),
-                        maxNewRecordBytes
-                    )
-                );
+                        maxNewRecordBytes));
 
                 // Keep creating topic until we reached the max bytes limit
                 int counter = 0;
                 while (logEnv.appendedBytes() < maxNewRecordBytes) {
                     counter += 1;
                     String topicName = String.format("foo-%s", counter);
-
-                    CreateTopicsResponseData reply = active.createTopics(
-                        new CreateTopicsRequestData().setTopics(
+                    active.createTopics(new 
CreateTopicsRequestData().setTopics(
                             new CreatableTopicCollection(Collections.singleton(
                                 new 
CreatableTopic().setName(topicName).setNumPartitions(-1).
                                     setReplicationFactor((short) -1).
@@ -402,7 +394,6 @@ public class QuorumControllerTest {
                                             setBrokerIds(Arrays.asList(1, 2, 
0))).
                                                 
iterator()))).iterator()))).get();
                 }
-
                 logEnv.waitForLatestSnapshot();
             }
         }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 0182477..628f9ed 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -34,6 +34,13 @@ import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCol
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import 
org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitionsCollection;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
+import 
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
@@ -111,6 +118,11 @@ public class ReplicationControlManagerTest {
         }
 
         CreatableTopicResult createTestTopic(String name, int[][] replicas) 
throws Exception {
+            return createTestTopic(name, replicas, (short) 0);
+        }
+
+        CreatableTopicResult createTestTopic(String name, int[][] replicas,
+                short expectedErrorCode) throws Exception {
             assertFalse(replicas.length == 0);
             CreateTopicsRequestData request = new CreateTopicsRequestData();
             CreatableTopic topic = new CreatableTopic().setName(name);
@@ -124,33 +136,59 @@ public class ReplicationControlManagerTest {
                 replicationControl.createTopics(request);
             CreatableTopicResult topicResult = 
result.response().topics().find(name);
             assertNotNull(topicResult);
-            assertEquals((short) 0, topicResult.errorCode());
-            assertEquals(replicas.length, topicResult.numPartitions());
-            assertEquals(replicas[0].length, topicResult.replicationFactor());
-            replay(result.records());
+            assertEquals(expectedErrorCode, topicResult.errorCode());
+            if (expectedErrorCode == NONE.code()) {
+                assertEquals(replicas.length, topicResult.numPartitions());
+                assertEquals(replicas[0].length, 
topicResult.replicationFactor());
+                replay(result.records());
+            }
             return topicResult;
         }
-    }
 
-    private static void registerBroker(int brokerId, 
ReplicationControlTestContext ctx) {
-        RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
-            setBrokerEpoch(brokerId + 100).setBrokerId(brokerId);
-        brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
-            setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
-            setPort((short) 9092 + brokerId).
-            setName("PLAINTEXT").
-            setHost("localhost"));
-        ctx.clusterControl.replay(brokerRecord);
-    }
+        void createPartitions(int count, String name,
+                int[][] replicas, short expectedErrorCode) throws Exception {
+            assertFalse(replicas.length == 0);
+            CreatePartitionsTopic topic = new CreatePartitionsTopic().
+                setName(name).
+                setCount(count);
+            for (int i = 0; i < replicas.length; i++) {
+                topic.assignments().add(new CreatePartitionsAssignment().
+                    setBrokerIds(Replicas.toList(replicas[i])));
+            }
+            ControllerResult<List<CreatePartitionsTopicResult>> result =
+                
replicationControl.createPartitions(Collections.singletonList(topic));
+            assertEquals(1, result.response().size());
+            CreatePartitionsTopicResult topicResult = result.response().get(0);
+            assertEquals(name, topicResult.name());
+            assertEquals(expectedErrorCode, topicResult.errorCode());
+            replay(result.records());
+        }
 
-    private static void unfenceBroker(int brokerId,
-                                      ReplicationControlTestContext ctx) 
throws Exception {
-        ControllerResult<BrokerHeartbeatReply> result = ctx.replicationControl.
-            processBrokerHeartbeat(new BrokerHeartbeatRequestData().
-                setBrokerId(brokerId).setBrokerEpoch(brokerId + 
100).setCurrentMetadataOffset(1).
-                setWantFence(false).setWantShutDown(false), 0);
-        assertEquals(new BrokerHeartbeatReply(true, false, false, false), 
result.response());
-        ctx.replay(result.records());
+        void registerBrokers(Integer... brokerIds) throws Exception {
+            for (int brokerId : brokerIds) {
+                RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
+                    setBrokerEpoch(brokerId + 100).setBrokerId(brokerId);
+                brokerRecord.endPoints().add(new 
RegisterBrokerRecord.BrokerEndpoint().
+                    setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
+                    setPort((short) 9092 + brokerId).
+                    setName("PLAINTEXT").
+                    setHost("localhost"));
+                replay(Collections.singletonList(new 
ApiMessageAndVersion(brokerRecord, (short) 0)));
+            }
+        }
+
+        void unfenceBrokers(Integer... brokerIds) throws Exception {
+            for (int brokerId : brokerIds) {
+                ControllerResult<BrokerHeartbeatReply> result = 
replicationControl.
+                    processBrokerHeartbeat(new BrokerHeartbeatRequestData().
+                        setBrokerId(brokerId).setBrokerEpoch(brokerId + 100).
+                        setCurrentMetadataOffset(1).
+                        setWantFence(false).setWantShutDown(false), 0);
+                assertEquals(new BrokerHeartbeatReply(true, false, false, 
false),
+                    result.response());
+                replay(result.records());
+            }
+        }
     }
 
     @Test
@@ -169,12 +207,8 @@ public class ReplicationControlManagerTest {
                     "brokers are currently fenced."));
         assertEquals(expectedResponse, result.response());
 
-        registerBroker(0, ctx);
-        unfenceBroker(0, ctx);
-        registerBroker(1, ctx);
-        unfenceBroker(1, ctx);
-        registerBroker(2, ctx);
-        unfenceBroker(2, ctx);
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1, 2);
         ControllerResult<CreateTopicsResponseData> result2 =
             replicationControl.createTopics(request);
         CreateTopicsResponseData expectedResponse2 = new 
CreateTopicsResponseData();
@@ -215,12 +249,8 @@ public class ReplicationControlManagerTest {
         request.topics().add(new CreatableTopic().setName("foo").
             setNumPartitions(1).setReplicationFactor((short) -1));
 
-        registerBroker(0, ctx);
-        unfenceBroker(0, ctx);
-        registerBroker(1, ctx);
-        unfenceBroker(1, ctx);
-        registerBroker(2, ctx);
-        unfenceBroker(2, ctx);
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1, 2);
 
         List<Uuid> topicsToDelete = new ArrayList<>();
 
@@ -258,11 +288,8 @@ public class ReplicationControlManagerTest {
     public void testOfflinePartitionAndReplicaImbalanceMetrics() throws 
Exception {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
         ReplicationControlManager replicationControl = ctx.replicationControl;
-
-        for (int i = 0; i < 4; i++) {
-            registerBroker(i, ctx);
-            unfenceBroker(i, ctx);
-        }
+        ctx.registerBrokers(0, 1, 2, 3);
+        ctx.unfenceBrokers(0, 1, 2, 3);
 
         CreatableTopicResult foo = ctx.createTestTopic("foo", new int[][] {
             new int[] {0, 2}, new int[] {0, 1}});
@@ -334,10 +361,8 @@ public class ReplicationControlManagerTest {
     public void testRemoveLeaderships() throws Exception {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
         ReplicationControlManager replicationControl = ctx.replicationControl;
-        for (int i = 0; i < 6; i++) {
-            registerBroker(i, ctx);
-            unfenceBroker(i, ctx);
-        }
+        ctx.registerBrokers(0, 1, 2, 3);
+        ctx.unfenceBrokers(0, 1, 2, 3);
         CreatableTopicResult result = ctx.createTestTopic("foo",
             new int[][] {
                 new int[] {0, 1, 2},
@@ -361,10 +386,8 @@ public class ReplicationControlManagerTest {
     public void testShrinkAndExpandIsr() throws Exception {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
         ReplicationControlManager replicationControl = ctx.replicationControl;
-        for (int i = 0; i < 3; i++) {
-            registerBroker(i, ctx);
-            unfenceBroker(i, ctx);
-        }
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1, 2);
         CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
             new int[][] {new int[] {0, 1, 2}});
 
@@ -393,10 +416,8 @@ public class ReplicationControlManagerTest {
     public void testInvalidAlterIsrRequests() throws Exception {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
         ReplicationControlManager replicationControl = ctx.replicationControl;
-        for (int i = 0; i < 3; i++) {
-            registerBroker(i, ctx);
-            unfenceBroker(i, ctx);
-        }
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1, 2);
         CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
             new int[][] {new int[] {0, 1, 2}});
 
@@ -574,10 +595,8 @@ public class ReplicationControlManagerTest {
         request.topics().add(new CreatableTopic().setName("foo").
             setNumPartitions(3).setReplicationFactor((short) 2).
             setConfigs(requestConfigs));
-        registerBroker(0, ctx);
-        unfenceBroker(0, ctx);
-        registerBroker(1, ctx);
-        unfenceBroker(1, ctx);
+        ctx.registerBrokers(0, 1);
+        ctx.unfenceBrokers(0, 1);
         ControllerResult<CreateTopicsResponseData> createResult =
             replicationControl.createTopics(request);
         CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
@@ -649,10 +668,8 @@ public class ReplicationControlManagerTest {
             setNumPartitions(2).setReplicationFactor((short) 2));
         request.topics().add(new CreatableTopic().setName("foo2").
             setNumPartitions(2).setReplicationFactor((short) 2));
-        registerBroker(0, ctx);
-        unfenceBroker(0, ctx);
-        registerBroker(1, ctx);
-        unfenceBroker(1, ctx);
+        ctx.registerBrokers(0, 1);
+        ctx.unfenceBrokers(0, 1);
         ControllerResult<CreateTopicsResponseData> createTopicResult =
             replicationControl.createTopics(request);
         ctx.replay(createTopicResult.records());
@@ -726,9 +743,7 @@ public class ReplicationControlManagerTest {
     @Test
     public void testValidateGoodManualPartitionAssignments() throws Exception {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
-        registerBroker(1, ctx);
-        registerBroker(2, ctx);
-        registerBroker(3, ctx);
+        ctx.registerBrokers(1, 2, 3);
         
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1),
             OptionalInt.of(1));
         
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1),
@@ -742,8 +757,7 @@ public class ReplicationControlManagerTest {
     @Test
     public void testValidateBadManualPartitionAssignments() throws Exception {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
-        registerBroker(1, ctx);
-        registerBroker(2, ctx);
+        ctx.registerBrokers(1, 2);
         assertEquals("The manual partition assignment includes an empty 
replica list.",
             assertThrows(InvalidReplicaAssignmentException.class, () ->
                 
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(),
@@ -776,4 +790,92 @@ public class ReplicationControlManagerTest {
         assertEquals(4, ReplicationControlManager.bestLeader(
             new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4));
     }
+
+    @Test
+    public void testManualPartitionAssignmentOnAllFencedBrokers() throws 
Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ctx.registerBrokers(0, 1, 2, 3);
+        ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}},
+            INVALID_REPLICA_ASSIGNMENT.code());
+    }
+
+    @Test
+    public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() 
throws Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ctx.registerBrokers(0, 1, 2, 3, 4, 5);
+        ctx.unfenceBrokers(0, 1, 2);
+        Uuid fooId = ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 
2}}).topicId();
+        ctx.createPartitions(2, "foo", new int[][] {new int[] {3, 4, 5}},
+            INVALID_REPLICA_ASSIGNMENT.code());
+        ctx.createPartitions(2, "foo", new int[][] {new int[] {2, 4, 5}}, 
NONE.code());
+        assertEquals(new PartitionRegistration(new int[] {2, 4, 5},
+                new int[] {2}, Replicas.NONE, Replicas.NONE, 2, 0, 0),
+            ctx.replicationControl.getPartition(fooId, 1));
+    }
+
+    @Test
+    public void testElectLeaders() throws Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(2, 3, 4);
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 
1}}).topicId();
+        ElectLeadersRequestData request1 = new ElectLeadersRequestData().
+            setElectionType((byte) 0).
+            setTopicPartitions(new TopicPartitionsCollection(Arrays.asList(
+                new TopicPartitions().setTopic("foo").
+                    setPartitions(Arrays.asList(0, 1)),
+                new TopicPartitions().setTopic("bar").
+                    setPartitions(Arrays.asList(0, 1))).iterator()));
+        ControllerResult<ElectLeadersResponseData> election1Result =
+            replication.electLeaders(request1);
+        ElectLeadersResponseData expectedResponse1 = new 
ElectLeadersResponseData().
+            setErrorCode((short) 0).
+            setReplicaElectionResults(Arrays.asList(
+                new ReplicaElectionResult().setTopic("foo").
+                    setPartitionResult(Arrays.asList(
+                        new PartitionResult().setPartitionId(0).
+                            setErrorCode(NONE.code()).
+                            setErrorMessage(null),
+                        new PartitionResult().setPartitionId(1).
+                            setErrorCode(NONE.code()).
+                            setErrorMessage(null))),
+                new ReplicaElectionResult().setTopic("bar").
+                    setPartitionResult(Arrays.asList(
+                        new PartitionResult().setPartitionId(0).
+                            setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+                            setErrorMessage("No such topic as bar"),
+                        new PartitionResult().setPartitionId(1).
+                            setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+                            setErrorMessage("No such topic as bar")))));
+        assertEquals(expectedResponse1, election1Result.response());
+        assertEquals(Collections.emptyList(), election1Result.records());
+        ctx.unfenceBrokers(0, 1);
+
+        ControllerResult<AlterIsrResponseData> alterIsrResult = 
replication.alterIsr(
+            new AlterIsrRequestData().setBrokerId(2).setBrokerEpoch(102).
+                setTopics(Arrays.asList(new 
AlterIsrRequestData.TopicData().setName("foo").
+                    setPartitions(Arrays.asList(new 
AlterIsrRequestData.PartitionData().
+                        setPartitionIndex(0).setCurrentIsrVersion(0).
+                        setLeaderEpoch(0).setNewIsr(Arrays.asList(1, 2, 
3)))))));
+        assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
+            new 
AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
+                new AlterIsrResponseData.PartitionData().
+                    setPartitionIndex(0).
+                    setLeaderId(2).
+                    setLeaderEpoch(0).
+                    setIsr(Arrays.asList(1, 2, 3)).
+                    setCurrentIsrVersion(1).
+                    setErrorCode(NONE.code()))))),
+            alterIsrResult.response());
+        ctx.replay(alterIsrResult.records());
+        ControllerResult<ElectLeadersResponseData> election2Result =
+            replication.electLeaders(request1);
+        assertEquals(expectedResponse1, election2Result.response());
+        assertEquals(Arrays.asList(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+            setPartitionId(0).
+            setTopicId(fooId).
+            setLeader(1), (short) 0)), election2Result.records());
+    }
 }

Reply via email to