dajac commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r894269579
##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2319,125 +2324,117 @@ class KafkaController(val config: KafkaConfig,
}
     val partitionResponses = mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
-    try {
-      // Determine which partitions we will accept the new ISR for
-      val adjustedIsrs = partitionsToAlter.flatMap { case (tp, newLeaderAndIsr) =>
-        controllerContext.partitionLeadershipInfo(tp) match {
-          case Some(leaderIsrAndControllerEpoch) =>
-            val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-            if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
-              partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
-              None
-            } else if (newLeaderAndIsr.partitionEpoch < currentLeaderAndIsr.partitionEpoch) {
-              partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
-              None
-            } else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
-              // If a partition is already in the desired state, just return it
-              partitionResponses(tp) = Right(currentLeaderAndIsr)
-              None
-            } else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
-              partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-              info(
-                s"Rejecting AlterPartition from node $brokerId for $tp because leader is recovering and ISR is greater than 1: " +
-                s"$newLeaderAndIsr"
-              )
-              None
-            } else if (currentLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERED &&
-              newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
-
-              partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
-              info(
-                s"Rejecting AlterPartition from node $brokerId for $tp because the leader recovery state cannot change from " +
-                s"RECOVERED to RECOVERING: $newLeaderAndIsr"
-              )
-              None
-            } else {
-              Some(tp -> newLeaderAndIsr)
-            }
-
-          case None =>
-            partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+    // Determine which partitions we will accept the new ISR for
+    val adjustedIsrs = partitionsToAlter.flatMap { case (tp, newLeaderAndIsr) =>
+      controllerContext.partitionLeadershipInfo(tp) match {
+        case Some(leaderIsrAndControllerEpoch) =>
+          val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+          if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
+            partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
             None
-        }
-      }
+          } else if (newLeaderAndIsr.partitionEpoch < currentLeaderAndIsr.partitionEpoch) {
+            partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
+            None
+          } else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
+            // If a partition is already in the desired state, just return it
+            partitionResponses(tp) = Right(currentLeaderAndIsr)
+            None
+          } else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
+            partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
+            info(
+              s"Rejecting AlterPartition from node $brokerId for $tp because leader is recovering and ISR is greater than 1: " +
+              s"$newLeaderAndIsr"
+            )
+            None
+          } else if (currentLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERED &&
+            newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING) {
-      // Do the updates in ZK
-      debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
-      val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
-        adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
-
-      val successfulUpdates = finishedUpdates.flatMap { case (partition, isrOrError) =>
-        isrOrError match {
-          case Right(updatedIsr) =>
-            debug(s"ISR for partition $partition updated to [${updatedIsr.isr.mkString(",")}] and zkVersion updated to [${updatedIsr.partitionEpoch}]")
-            partitionResponses(partition) = Right(updatedIsr)
-            Some(partition -> updatedIsr)
-          case Left(e) =>
-            error(s"Failed to update ISR for partition $partition", e)
-            partitionResponses(partition) = Left(Errors.forException(e))
+            partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
+            info(
+              s"Rejecting AlterPartition from node $brokerId for $tp because the leader recovery state cannot change from " +
+              s"RECOVERED to RECOVERING: $newLeaderAndIsr"
+            )
             None
-        }
+          } else {
+            Some(tp -> newLeaderAndIsr)
+          }
+
+        case None =>
+          partitionResponses(tp) = Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          None
       }
+    }
-      badVersionUpdates.foreach { partition =>
-        info(s"Failed to update ISR to ${adjustedIsrs(partition)} for partition $partition, bad ZK version.")
-        partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
+    // Do the updates in ZK
+    debug(s"Updating ISRs for partitions: ${adjustedIsrs.keySet}.")
+    val UpdateLeaderAndIsrResult(finishedUpdates, badVersionUpdates) = zkClient.updateLeaderAndIsr(
+      adjustedIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
+
+    val successfulUpdates = finishedUpdates.flatMap { case (partition, isrOrError) =>
+      isrOrError match {
+        case Right(updatedIsr) =>
+          debug(s"ISR for partition $partition updated to $updatedIsr.")
+          partitionResponses(partition) = Right(updatedIsr)
+          Some(partition -> updatedIsr)
+        case Left(e) =>
+          error(s"Failed to update ISR for partition $partition", e)
+          partitionResponses(partition) = Left(Errors.forException(e))
+          None
       }
+    }
-      // Update our cache and send out metadata updates
-      updateLeaderAndIsrCache(successfulUpdates.keys.toSeq)
-      sendUpdateMetadataRequest(
-        controllerContext.liveOrShuttingDownBrokerIds.toSeq,
-        partitionsToAlter.keySet
-      )
+    badVersionUpdates.foreach { partition =>
+      info(s"Failed to update ISR to ${adjustedIsrs(partition)} for partition $partition, bad ZK version.")
+      partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
+    }
-      partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName, partitionResponses) =>
-        // Add each topic part to the response
-        val topicResponse = if (useTopicsIds) {
-          new AlterPartitionResponseData.TopicData()
-            .setTopicId(controllerContext.topicIds.getOrElse(topicName, Uuid.ZERO_UUID))
-        } else {
-          new AlterPartitionResponseData.TopicData()
-            .setTopicName(topicName)
-        }
-        alterPartitionResponse.topics.add(topicResponse)
-
-        partitionResponses.forKeyValue { (tp, errorOrIsr) =>
-          // Add each partition part to the response (new ISR or error)
-          errorOrIsr match {
-            case Left(error) =>
-              topicResponse.partitions.add(
-                new AlterPartitionResponseData.PartitionData()
-                  .setPartitionIndex(tp.partition)
-                  .setErrorCode(error.code))
-            case Right(leaderAndIsr) =>
-              /* Setting the LeaderRecoveryState field is always safe because it will always be the same
-               * as the value set in the request. For version 0, that is always the default RECOVERED
-               * which is ignored when serializing to version 0. For any other version, the
-               * LeaderRecoveryState field is supported.
-               */
-              topicResponse.partitions.add(
-                new AlterPartitionResponseData.PartitionData()
-                  .setPartitionIndex(tp.partition)
-                  .setLeaderId(leaderAndIsr.leader)
-                  .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-                  .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
-                  .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
-                  .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-              )
-          }
+    // Update our cache and send out metadata updates
+    updateLeaderAndIsrCache(successfulUpdates.keys.toSeq)
+    sendUpdateMetadataRequest(
+      controllerContext.liveOrShuttingDownBrokerIds.toSeq,
+      partitionsToAlter.keySet
+    )
+
+    partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName, partitionResponses) =>
+      // Add each topic part to the response
+      val topicResponse = if (useTopicsIds) {
+        new AlterPartitionResponseData.TopicData()
+          .setTopicId(controllerContext.topicIds.getOrElse(topicName, Uuid.ZERO_UUID))
+      } else {
+        new AlterPartitionResponseData.TopicData()
+          .setTopicName(topicName)
+      }
+      alterPartitionResponse.topics.add(topicResponse)
+
+      partitionResponses.forKeyValue { (tp, errorOrIsr) =>
+        // Add each partition part to the response (new ISR or error)
+        errorOrIsr match {
+          case Left(error) =>
+            topicResponse.partitions.add(
+              new AlterPartitionResponseData.PartitionData()
+                .setPartitionIndex(tp.partition)
+                .setErrorCode(error.code))
+          case Right(leaderAndIsr) =>
+            /* Setting the LeaderRecoveryState field is always safe because it will always be the same
+             * as the value set in the request. For version 0, that is always the default RECOVERED
+             * which is ignored when serializing to version 0. For any other version, the
+             * LeaderRecoveryState field is supported.
+             */
+            topicResponse.partitions.add(
+              new AlterPartitionResponseData.PartitionData()
+                .setPartitionIndex(tp.partition)
+                .setLeaderId(leaderAndIsr.leader)
+                .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+                .setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
+                .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+            )
         }
       }
-
-      callback(alterPartitionResponse)
-    } catch {
-      case e: Throwable =>
-        error(s"Error when processing AlterPartition for partitions: ${partitionsToAlter.keys.toSeq}", e)
-        callback(new AlterPartitionResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code))
-        // We consider that all partitions have failed.
-        partitionResponses.clear()
     }
+    callback(alterPartitionResponse)
Review Comment:
It is hard to say because the `maybeCompleteReassignment` call stack is pretty
deep. I have reworked this part so that the code below runs outside of the
try...catch. This ensures that the callback is called only once, and only for
exceptions unrelated to the code below.
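
To illustrate the structure, here is a minimal, self-contained sketch of the
pattern. The names (`handleRequest`, `doWork`, `Ok`, `ServerError`) are
hypothetical stand-ins, not the actual controller types:

```scala
import scala.util.control.NonFatal

// Hypothetical stand-ins for the real request/response types.
sealed trait Response
final case class Ok(payload: String) extends Response
final case class ServerError(message: String) extends Response

// Only the fallible work runs inside the try...catch; building and delivering
// the response happen after it, so the callback fires exactly once and the
// catch block only sees exceptions thrown by the work itself.
def handleRequest(doWork: () => String, callback: Response => Unit): Unit = {
  val payload =
    try {
      doWork() // e.g. validation and ZK updates, which may throw
    } catch {
      case NonFatal(e) =>
        callback(ServerError(e.getMessage)) // single error callback, then stop
        return
    }
  // Outside the try...catch: an exception thrown here (including by the
  // callback itself) can no longer be caught above and trigger a second
  // callback invocation.
  callback(Ok(payload))
}
```

With the previous shape, a failure while building the response (or in the
callback itself) was caught by the same catch block, which then invoked the
callback a second time with `UNKNOWN_SERVER_ERROR`.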