hachikuji commented on a change in pull request #9807: URL: https://github.com/apache/kafka/pull/9807#discussion_r552339407
##########
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##########
@@ -1250,38 +1252,36 @@ object ReassignPartitionsCommand extends Logging {
   * Calculate the global map of all partitions that are moving.
   *
   * @param currentReassignments The currently active reassignments.
-  * @param proposedReassignments The proposed reassignments (destinations replicas only).
+  * @param proposedParts The proposed location of the partitions (destinations replicas only).
   * @param currentParts The current location of the partitions that we are
   * proposing to move.
   * @return A map from topic name to partition map.
   * The partition map is keyed on partition index and contains
   * the movements for that partition.
   */
  def calculateProposedMoveMap(currentReassignments: Map[TopicPartition, PartitionReassignment],

Review comment:

It might be just me, but the logic in this method is as clear as mud to me. At a high level, we are just trying to identify the sources and the destinations to fill a `PartitionMove`. The destinations are the adding replicas (AR) and the sources are the current replicas (CR) without the adding replicas.

The current logic below first calls `calculateCurrentMoveMap`, which does the following:

1. Add each replica to both sources and destinations
2. Add all AR to destinations
3. Remove all removing replicas (RR) from destinations

I think this computes the destinations correctly (even if it does so in an odd way), but sources will include all replicas, which is wrong. You have fixed this by removing AR from sources explicitly, but I think we can simplify a bit more. Maybe something like this:

```scala
// Shorthand: `-` stands for set difference here.
move.destinations = reassignment.addingReplicas
move.sources = reassignment.replicas - reassignment.addingReplicas
```

After computing the current move map, the current logic tries to account for the proposed reassignments. It does the following:

1. Overwrite the current move with an empty move and set destinations to the target replicas (TR)
2. Iterate through all moves and check if sources is empty, which means we have overwritten the move in step 1. If so, then we add all of CR to sources.
3. Do one more pass over the moves and remove all of the destinations from sources.

I have a hard time making sense of this logic. The main problem is that it assumes that TR are only destinations, but that is wrong. For example, if we reassign [1, 2] to [1, 3], then 1 should be a source, not a destination, even though it is present in TR.

I think the correct logic should be the following:

1. For each proposed assignment TR, check if there is already a reassignment in progress.
2. If there is a reassignment in progress, then we need to account for the current AR. We can compute sources as CR - AR, which is what we already did when calculating the current move map.
3. If there is no reassignment in progress, then we set sources to CR.
4. Regardless of whether there is a reassignment in progress, we set destinations to TR - sources.

Something like this?

```scala
proposedParts.foreach { case (part, targetReplicas) =>
  val partMoves = moveMap.getOrElseUpdate(part.topic, new mutable.HashMap[Int, PartitionMove])
  // partMoves is keyed on partition index, so look up part.partition.
  val sources = partMoves.get(part.partition) match {
    case Some(move) => move.sources   // reassignment in progress: CR - AR
    case None => currentParts(part)   // no reassignment in progress: CR
  }
  // Shorthand: `-` stands for set difference here.
  val destinations = targetReplicas - sources
  partMoves.put(part.partition, PartitionMove(sources, destinations))
}
```

Check that over and see if it makes sense to you. Basically, I think the current logic is unnecessarily complicated and probably wrong in multiple ways.
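
To make the proposed rule easier to check, here is a minimal, self-contained sketch of it. It is not the code in `ReassignPartitionsCommand`: the case classes are simplified, hypothetical stand-ins for the Kafka types, immutable `Set[Int]` is assumed for sources and destinations, the names `MoveMapSketch`, `currentMoveMap` and `proposedMoveMap` are made up for illustration, and throwing when a partition has no known current replicas is also an assumption.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for the Kafka types named in the review.
final case class TopicPartition(topic: String, partition: Int)
final case class PartitionReassignment(replicas: Seq[Int],
                                       addingReplicas: Seq[Int],
                                       removingReplicas: Seq[Int])
final case class PartitionMove(sources: Set[Int], destinations: Set[Int])

object MoveMapSketch {
  type MoveMap = mutable.Map[String, mutable.Map[Int, PartitionMove]]

  // Active reassignments: destinations = AR, sources = replicas - AR.
  def currentMoveMap(current: Map[TopicPartition, PartitionReassignment]): MoveMap = {
    val moveMap: MoveMap = mutable.Map.empty
    current.foreach { case (part, reassignment) =>
      val adding: Set[Int] = reassignment.addingReplicas.toSet
      val all: Set[Int] = reassignment.replicas.toSet
      moveMap.getOrElseUpdate(part.topic, mutable.Map.empty[Int, PartitionMove])
        .put(part.partition, PartitionMove(all -- adding, adding))
    }
    moveMap
  }

  // Proposed reassignments layered on top of the active ones.
  def proposedMoveMap(current: Map[TopicPartition, PartitionReassignment],
                      proposedParts: Map[TopicPartition, Seq[Int]],
                      currentParts: Map[TopicPartition, Seq[Int]]): MoveMap = {
    val moveMap = currentMoveMap(current)
    proposedParts.foreach { case (part, targetReplicas) =>
      val partMoves = moveMap.getOrElseUpdate(part.topic, mutable.Map.empty[Int, PartitionMove])
      val sources: Set[Int] = partMoves.get(part.partition) match {
        // Reassignment in progress: reuse CR - AR from the current move map.
        case Some(move) => move.sources
        // No reassignment in progress: sources are simply CR.
        case None =>
          val replicas: Seq[Int] = currentParts.getOrElse(part,
            throw new IllegalArgumentException(s"No current replicas known for $part"))
          replicas.toSet
      }
      // In both cases, destinations = TR - sources.
      val targets: Set[Int] = targetReplicas.toSet
      partMoves.put(part.partition, PartitionMove(sources, targets -- sources))
    }
    moveMap
  }
}
```

With no reassignment in progress, proposing [1, 3] for a partition currently on [1, 2] gives sources {1, 2} and destinations {3} under this sketch, so broker 1 stays a source, as argued in the comment.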