dajac commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r892157446


##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2225,194 +2223,210 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  def alterPartitions(alterPartitionRequest: AlterPartitionRequestData, 
callback: AlterPartitionResponseData => Unit): Unit = {
-    val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
-
-    alterPartitionRequest.topics.forEach { topicReq =>
-      topicReq.partitions.forEach { partitionReq =>
-        partitionsToAlter.put(
-          new TopicPartition(topicReq.name, partitionReq.partitionIndex),
-          LeaderAndIsr(
-            alterPartitionRequest.brokerId,
-            partitionReq.leaderEpoch,
-            partitionReq.newIsr().asScala.toList.map(_.toInt),
-            LeaderRecoveryState.of(partitionReq.leaderRecoveryState),
-            partitionReq.partitionEpoch
-          )
-        )
-      }
-    }
-
-    def responseCallback(results: Either[Map[TopicPartition, Either[Errors, 
LeaderAndIsr]], Errors]): Unit = {
-      val resp = new AlterPartitionResponseData()
-      results match {
-        case Right(error) =>
-          resp.setErrorCode(error.code)
-        case Left(partitionResults) =>
-          resp.setTopics(new util.ArrayList())
-          partitionResults
-            .groupBy { case (tp, _) => tp.topic }   // Group by topic
-            .foreach { case (topic, partitions) =>
-              // Add each topic part to the response
-              val topicResp = new AlterPartitionResponseData.TopicData()
-                .setName(topic)
-                .setPartitions(new util.ArrayList())
-              resp.topics.add(topicResp)
-              partitions.foreach { case (tp, errorOrIsr) =>
-                // Add each partition part to the response (new ISR or error)
-                errorOrIsr match {
-                  case Left(error) => topicResp.partitions.add(
-                    new AlterPartitionResponseData.PartitionData()
-                      .setPartitionIndex(tp.partition)
-                      .setErrorCode(error.code))
-                  case Right(leaderAndIsr) =>
-                    /* Setting the LeaderRecoveryState field is always safe 
because it will always be the same
-                     * as the value set in the request. For version 0, that is 
always the default RECOVERED
-                     * which is ignored when serializing to version 0. For any 
other version, the
-                     * LeaderRecoveryState field is supported.
-                     */
-                    topicResp.partitions.add(
-                      new AlterPartitionResponseData.PartitionData()
-                        .setPartitionIndex(tp.partition)
-                        .setLeaderId(leaderAndIsr.leader)
-                        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-                        .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                        
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-                        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-                    )
-                }
-            }
-          }
-      }
-      callback.apply(resp)
-    }
-
-    eventManager.put(
-      AlterPartitionReceived(alterPartitionRequest.brokerId, 
alterPartitionRequest.brokerEpoch, partitionsToAlter, responseCallback)
-    )
+  def alterPartitions(

Review Comment:
   That's right. Do you think we need it?
   
   I suppose that we could have a similar race condition, especially if the 
shutting-down replica is not in the ISR at the time of shutting down. In this 
case, we don't bump the leader epoch, so the replica could make it back into 
the ISR before receiving the stop replica request. We could prevent 
shutting-down replicas from joining the ISR. One issue is that the leaders will 
never learn about this state, so they don't have a way to prevent unnecessary 
retries. This is similar to the discussion that we had for KRaft.
   
   Given that we explicitly stop replicas, I tend to believe that this race 
condition is less likely in ZK mode. I wonder whether it is worth fixing. What 
do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to