dajac commented on code in PR #13408:
URL: https://github.com/apache/kafka/pull/13408#discussion_r1150489001


##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2324,12 +2324,18 @@ class KafkaController(val config: KafkaConfig,
 
         case Some(topicName) =>
           topicReq.partitions.forEach { partitionReq =>
+            var isr = List[Int]()

Review Comment:
   nit: Using a `var` is not required here. How about using `val isr = if 
(....) {`?



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1108,19 +1116,23 @@ private Errors validateAlterPartitionData(
 
             return INVALID_UPDATE_VERSION;
         }
-        int[] newIsr = Replicas.toArray(partitionData.newIsr());
+
+        int[] newIsr = partitionData.newIsrWithEpochs().stream()
+            .map(brokerState -> 
brokerState.brokerId()).collect(Collectors.toList())
+            .stream().mapToInt(Integer::intValue).toArray();

Review Comment:
   Do we really need the intermediate `.collect(Collectors.toList())`? If it 
is, it may be better to add another method to `Replicas`: 
`Replicas.toArray(partitionData.newIsrWithEpochs())`. There we could initialize 
an array with the right size and set the values.



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -1608,6 +1622,75 @@ public void 
testAlterPartitionShouldRejectFencedBrokers(short version) throws Ex
             alterPartitionResult.response());
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionShouldRejectRebootBrokers(short version) 
throws Exception {

Review Comment:
   nit: `...ShouldRejectBrokersWithStaleEpoch`?



##########
clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java:
##########
@@ -77,6 +82,18 @@ public Builder(AlterPartitionRequestData data, boolean 
canUseTopicIds) {
 
         @Override
         public AlterPartitionRequest build(short version) {
+            if (version < 3) {
+                data.topics().forEach(topicData -> {
+                    topicData.partitions().forEach(partitionData -> {
+                        List<Integer> newIsr = new LinkedList<>();

Review Comment:
   nit: Should we use an ArrayList initialized with the correct size?



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -1505,17 +1518,18 @@ public void testReassignPartitions(short version) 
throws Exception {
         log.info("running final alterPartition...");
         ControllerRequestContext requestContext =
             anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
-        ControllerResult<AlterPartitionResponseData> alterPartitionResult = 
replication.alterPartition(
-            requestContext,
-            new AlterPartitionRequestData().setBrokerId(3).setBrokerEpoch(103).
+        AlterPartitionRequestData alterPartitionRequestData = new 
AlterPartitionRequestData().setBrokerId(3).setBrokerEpoch(103).

Review Comment:
   nit: Could we put `setBrokerId` and `setBrokerEpoch` on new lines in oder to 
keep the format of the code consistent?



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -1608,6 +1622,75 @@ public void 
testAlterPartitionShouldRejectFencedBrokers(short version) throws Ex
             alterPartitionResult.response());
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionShouldRejectRebootBrokers(short version) 
throws Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+        Uuid fooId = ctx.createTestTopic(
+            "foo",
+            new int[][] {new int[] {1, 2, 3, 4}}
+        ).topicId();
+        ctx.alterPartition(new TopicIdPartition(fooId, 0), 1, 
generateIsrWithTestDefaultEpoch(asList(1, 2, 3)), 
LeaderRecoveryState.RECOVERED);
+
+        // First, the leader is constructing an AlterPartition request.
+        AlterPartitionRequestData alterIsrRequest = new 
AlterPartitionRequestData()
+            .setBrokerId(1)
+            .setBrokerEpoch(101)
+            .setTopics(asList(new TopicData()
+                .setTopicName(version <= 1 ? "foo" : "")
+                .setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
+                .setPartitions(asList(new PartitionData()
+                    .setPartitionIndex(0)
+                    .setPartitionEpoch(1)
+                    .setLeaderEpoch(1)
+                    
.setNewIsrWithEpochs(generateIsrWithTestDefaultEpoch(asList(1, 2, 3, 4)))))));
+
+        // The broker 4 has failed silently and now registers again.
+        long newEpoch = generateTestDefaultBrokerEpoch(4) + 1000;
+        RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
+            setBrokerEpoch(newEpoch).setBrokerId(4).setRack(null);
+        brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
+            setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
+            setPort((short) 9092 + 4).
+            setName("PLAINTEXT").
+            setHost("localhost"));
+        ctx.replay(Collections.singletonList(new 
ApiMessageAndVersion(brokerRecord, (short) 0)));
+
+        // Unfence the broker 4.
+        ControllerResult<BrokerHeartbeatReply> result = ctx.replicationControl.
+            processBrokerHeartbeat(new BrokerHeartbeatRequestData().
+                setBrokerId(4).setBrokerEpoch(newEpoch).
+                setCurrentMetadataOffset(1).
+                setWantFence(false).setWantShutDown(false), 0);
+        assertEquals(new BrokerHeartbeatReply(true, false, false, false),
+            result.response());
+        ctx.replay(result.records());
+
+        ControllerRequestContext requestContext =
+            anonymousContextFor(ApiKeys.ALTER_PARTITION, version);
+
+        ControllerResult<AlterPartitionResponseData> alterPartitionResult =
+            replication.alterPartition(requestContext, new 
AlterPartitionRequest.Builder(alterIsrRequest, version > 
1).build(version).data());
+
+        // The late arrived AlterPartition request should be rejected when 
version >= 3.
+        if (version >= 3) {
+            assertEquals(
+                new AlterPartitionResponseData()
+                    .setTopics(asList(new 
AlterPartitionResponseData.TopicData()
+                        .setTopicName(version <= 1 ? "foo" : "")
+                        .setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
+                        .setPartitions(asList(new 
AlterPartitionResponseData.PartitionData()
+                            .setPartitionIndex(0)
+                            .setErrorCode(INELIGIBLE_REPLICA.code()))))),
+                alterPartitionResult.response());
+        } else {
+            assertEquals((short) 0, 
alterPartitionResult.response().errorCode());

Review Comment:
   nit: Could we use `NONE.code()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to