dajac commented on code in PR #13408: URL: https://github.com/apache/kafka/pull/13408#discussion_r1150489001
########## core/src/main/scala/kafka/controller/KafkaController.scala: ########## @@ -2324,12 +2324,18 @@ class KafkaController(val config: KafkaConfig, case Some(topicName) => topicReq.partitions.forEach { partitionReq => + var isr = List[Int]() Review Comment: nit: Using a `var` is not required here. How about using `val isr = if (....) {`? ########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ########## @@ -1108,19 +1116,23 @@ private Errors validateAlterPartitionData( return INVALID_UPDATE_VERSION; } - int[] newIsr = Replicas.toArray(partitionData.newIsr()); + + int[] newIsr = partitionData.newIsrWithEpochs().stream() + .map(brokerState -> brokerState.brokerId()).collect(Collectors.toList()) + .stream().mapToInt(Integer::intValue).toArray(); Review Comment: Do we really need the intermediate `.collect(Collectors.toList())`? If it is needed, it may be better to add another method to `Replicas`: `Replicas.toArray(partitionData.newIsrWithEpochs())`. There we could initialize an array with the right size and set the values. ########## metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java: ########## @@ -1608,6 +1622,75 @@ public void testAlterPartitionShouldRejectFencedBrokers(short version) throws Ex alterPartitionResult.response()); } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) + public void testAlterPartitionShouldRejectRebootBrokers(short version) throws Exception { Review Comment: nit: `...ShouldRejectBrokersWithStaleEpoch`? 
########## clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java: ########## @@ -77,6 +82,18 @@ public Builder(AlterPartitionRequestData data, boolean canUseTopicIds) { @Override public AlterPartitionRequest build(short version) { + if (version < 3) { + data.topics().forEach(topicData -> { + topicData.partitions().forEach(partitionData -> { + List<Integer> newIsr = new LinkedList<>(); Review Comment: nit: Should we use an ArrayList initialized with the correct size? ########## metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java: ########## @@ -1505,17 +1518,18 @@ public void testReassignPartitions(short version) throws Exception { log.info("running final alterPartition..."); ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.ALTER_PARTITION, version); - ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition( - requestContext, - new AlterPartitionRequestData().setBrokerId(3).setBrokerEpoch(103). + AlterPartitionRequestData alterPartitionRequestData = new AlterPartitionRequestData().setBrokerId(3).setBrokerEpoch(103). Review Comment: nit: Could we put `setBrokerId` and `setBrokerEpoch` on new lines in order to keep the format of the code consistent? 
########## metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java: ########## @@ -1608,6 +1622,75 @@ public void testAlterPartitionShouldRejectFencedBrokers(short version) throws Ex alterPartitionResult.response()); } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) + public void testAlterPartitionShouldRejectRebootBrokers(short version) throws Exception { + ReplicationControlTestContext ctx = new ReplicationControlTestContext(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2, 3, 4); + ctx.unfenceBrokers(0, 1, 2, 3, 4); + Uuid fooId = ctx.createTestTopic( + "foo", + new int[][] {new int[] {1, 2, 3, 4}} + ).topicId(); + ctx.alterPartition(new TopicIdPartition(fooId, 0), 1, generateIsrWithTestDefaultEpoch(asList(1, 2, 3)), LeaderRecoveryState.RECOVERED); + + // First, the leader is constructing an AlterPartition request. + AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData() + .setBrokerId(1) + .setBrokerEpoch(101) + .setTopics(asList(new TopicData() + .setTopicName(version <= 1 ? "foo" : "") + .setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID) + .setPartitions(asList(new PartitionData() + .setPartitionIndex(0) + .setPartitionEpoch(1) + .setLeaderEpoch(1) + .setNewIsrWithEpochs(generateIsrWithTestDefaultEpoch(asList(1, 2, 3, 4))))))); + + // The broker 4 has failed silently and now registers again. + long newEpoch = generateTestDefaultBrokerEpoch(4) + 1000; + RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord(). + setBrokerEpoch(newEpoch).setBrokerId(4).setRack(null); + brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint(). + setSecurityProtocol(SecurityProtocol.PLAINTEXT.id). + setPort((short) 9092 + 4). + setName("PLAINTEXT"). + setHost("localhost")); + ctx.replay(Collections.singletonList(new ApiMessageAndVersion(brokerRecord, (short) 0))); + + // Unfence the broker 4. 
+ ControllerResult<BrokerHeartbeatReply> result = ctx.replicationControl. + processBrokerHeartbeat(new BrokerHeartbeatRequestData(). + setBrokerId(4).setBrokerEpoch(newEpoch). + setCurrentMetadataOffset(1). + setWantFence(false).setWantShutDown(false), 0); + assertEquals(new BrokerHeartbeatReply(true, false, false, false), + result.response()); + ctx.replay(result.records()); + + ControllerRequestContext requestContext = + anonymousContextFor(ApiKeys.ALTER_PARTITION, version); + + ControllerResult<AlterPartitionResponseData> alterPartitionResult = + replication.alterPartition(requestContext, new AlterPartitionRequest.Builder(alterIsrRequest, version > 1).build(version).data()); + + // The late arrived AlterPartition request should be rejected when version >= 3. + if (version >= 3) { + assertEquals( + new AlterPartitionResponseData() + .setTopics(asList(new AlterPartitionResponseData.TopicData() + .setTopicName(version <= 1 ? "foo" : "") + .setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID) + .setPartitions(asList(new AlterPartitionResponseData.PartitionData() + .setPartitionIndex(0) + .setErrorCode(INELIGIBLE_REPLICA.code()))))), + alterPartitionResult.response()); + } else { + assertEquals((short) 0, alterPartitionResult.response().errorCode()); Review Comment: nit: Could we use `NONE.code()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org