hachikuji commented on a change in pull request #9807:
URL: https://github.com/apache/kafka/pull/9807#discussion_r552339407



##########
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##########
@@ -1250,38 +1252,36 @@ object ReassignPartitionsCommand extends Logging {
    * Calculate the global map of all partitions that are moving.
    *
    * @param currentReassignments    The currently active reassignments.
-   * @param proposedReassignments   The proposed reassignments (destinations replicas only).
+   * @param proposedParts           The proposed location of the partitions (destinations replicas only).
    * @param currentParts            The current location of the partitions that we are
    *                                proposing to move.
    * @return                        A map from topic name to partition map.
    *                                The partition map is keyed on partition index and contains
    *                                the movements for that partition.
    */
   def calculateProposedMoveMap(currentReassignments: Map[TopicPartition, PartitionReassignment],

Review comment:
   It might be just me, but the logic in this method is as clear as mud to me. At a high level, we are just trying to identify the sources and the destinations to fill a `PartitionMove`. The destinations are the adding replicas (AR) and the sources are the current replicas (CR) without the adding replicas. The current logic below first calls `calculateCurrentMoveMap`, which does the following:
   
   1. Add each replica to both sources and destinations
   2. Add all AR to destinations 
   3. Remove all removing replicas (RR) from destinations
   
   I think this computes the destinations correctly (even if it does so in an odd way), but sources will include all replicas, which is wrong. You have fixed this by removing AR from sources explicitly, but I think we can simplify a bit more. Maybe something like this:
   
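   ```scala
   // AR are the destinations; CR - AR are the sources (uses scala.jdk.CollectionConverters._).
   val addingReplicas = reassignment.addingReplicas.asScala.map(_.toInt)
   move.destinations ++= addingReplicas
   move.sources ++= reassignment.replicas.asScala.map(_.toInt).filterNot(addingReplicas.contains)
   ```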
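
   To make the difference concrete, here is a minimal standalone model of both computations for one partition. It is only a sketch: it assumes a hypothetical `Reassignment` case class in place of Kafka's `PartitionReassignment` (plain `Seq[Int]` fields instead of Java lists) and a hypothetical immutable `Move` in place of `PartitionMove`. The example partition is being moved from [1, 2] to [1, 3], so CR = [1, 2, 3], AR = [3] and RR = [2].

   ```scala
   object CurrentMoveSketch {
     // Hypothetical stand-ins for PartitionReassignment (CR, AR, RR) and PartitionMove.
     final case class Reassignment(replicas: Seq[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int])
     final case class Move(sources: Set[Int], destinations: Set[Int])

     // The three steps described above: every replica goes into both sets,
     // AR is added to the destinations, and RR is removed from the destinations.
     def currentLogic(r: Reassignment): Move = {
       val sources: Set[Int] = r.replicas.toSet
       val destinations: Set[Int] =
         (r.replicas ++ r.addingReplicas).filterNot(r.removingReplicas.contains).toSet
       Move(sources, destinations)
     }

     // Simplified version: destinations are AR, sources are CR - AR.
     def simplified(r: Reassignment): Move =
       Move(r.replicas.filterNot(r.addingReplicas.contains).toSet, r.addingReplicas.toSet)

     def main(args: Array[String]): Unit = {
       // Reassigning [1, 2] to [1, 3] while the move is still in progress.
       val r = Reassignment(replicas = Seq(1, 2, 3), addingReplicas = Seq(3), removingReplicas = Seq(2))
       println(s"current:    ${currentLogic(r)}")
       println(s"simplified: ${simplified(r)}")
     }
   }
   ```

   With the current logic the adding replica 3 stays in the sources (sources {1, 2, 3}); the simplified version gives sources {1, 2} and destinations {3}.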
   
   After computing the current move map, the current logic tries to account for the proposed reassignments. It does the following:
   
   1. Overwrite the current move with an empty move and set destinations to the target replicas (TR).
   2. Iterate through all moves and check whether sources is empty, which means we have overwritten the move in step 1. If so, add all of CR to sources.
   3. Now do one more pass over the moves and remove all of the destinations from sources.
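
   The three steps above can be modeled for a single proposed partition roughly as follows. This is a simplified sketch that only mirrors the steps as described here, not the command's exact code; `Move` is a hypothetical immutable stand-in for `PartitionMove`.

   ```scala
   object CurrentProposedSketch {
     // Hypothetical immutable stand-in for PartitionMove.
     final case class Move(sources: Set[Int], destinations: Set[Int])

     // Models the three steps described above for one proposed partition.
     def currentProposedLogic(currentReplicas: Seq[Int], targetReplicas: Seq[Int]): Move = {
       // Step 1: replace the move with an empty one whose destinations are TR.
       val destinations: Set[Int] = targetReplicas.toSet
       // Step 2: the sources are now empty, so they are filled with CR.
       val filledSources: Set[Int] = currentReplicas.toSet
       // Step 3: every destination is removed from the sources.
       Move(filledSources -- destinations, destinations)
     }

     def main(args: Array[String]): Unit = {
       // Reassigning [1, 2] to [1, 3]: replica 1 never moves, yet it is treated as a destination.
       println(currentProposedLogic(currentReplicas = Seq(1, 2), targetReplicas = Seq(1, 3)))
     }
   }
   ```

   For [1, 2] to [1, 3] this yields sources {2} and destinations {1, 3}, so replica 1 ends up as a destination rather than a source.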
   
   I have a hard time making sense of this logic. The main problem is that it assumes that TR are only destinations, but that is wrong. For example, if we reassign [1, 2] to [1, 3], then 1 should be a source, not a destination, even though it is present in TR. I think the correct logic should be the following:
   
   1. For each proposed assignment TR, check whether there is already a reassignment in progress.
   2. If there is a reassignment in progress, then we need to account for the current AR. We can compute sources as CR - AR, which is what we already did when calculating the current move map.
   3. If there is no reassignment in progress, then we set sources to CR.
   4. Regardless of whether there is a reassignment in progress, we set destinations to TR - sources.
   
   Something like this?
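   ```scala
   proposedParts.foreach {
     case (part, targetReplicas) =>
       val partMoves = moveMap.getOrElseUpdate(part.topic, new mutable.HashMap[Int, PartitionMove])
       // partMoves is keyed on the partition index, not on the TopicPartition.
       val sources = partMoves.get(part.partition) match {
         case Some(move) =>
           // Reassignment in progress: sources are CR - AR from the current move map.
           move.sources
         case None =>
           // No reassignment in progress: sources are CR.
           mutable.Set[Int]() ++ currentParts(part)
       }
       // Destinations are the target replicas that are not already sources.
       val destinations = mutable.Set[Int]() ++ targetReplicas.filterNot(sources.contains)
       partMoves.put(part.partition, PartitionMove(sources, destinations))
   }
   ```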
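
   For trying the proposed logic outside the command, here is a self-contained variant of the snippet above. It is a sketch of the four steps, not a drop-in replacement, and it assumes hypothetical stand-ins: a local `TopicPartition` case class instead of Kafka's class of the same name, an immutable `Move` instead of `PartitionMove`, and a `MoveMap` alias for the nested topic/partition map.

   ```scala
   import scala.collection.mutable

   object ProposedMoveSketch {
     // Hypothetical stand-ins for org.apache.kafka.common.TopicPartition and PartitionMove.
     final case class TopicPartition(topic: String, partition: Int)
     final case class Move(sources: Set[Int], destinations: Set[Int])

     // topic -> partition index -> move, mirroring the shape of the move map.
     type MoveMap = mutable.Map[String, mutable.Map[Int, Move]]

     def applyProposed(moveMap: MoveMap,
                       proposedParts: Map[TopicPartition, Seq[Int]],
                       currentParts: Map[TopicPartition, Seq[Int]]): MoveMap = {
       proposedParts.foreach { case (part, targetReplicas) =>
         val partMoves = moveMap.getOrElseUpdate(part.topic, mutable.HashMap[Int, Move]())
         // Reassignment in progress: keep its sources (CR - AR). Otherwise the sources are CR.
         val sources: Set[Int] = partMoves.get(part.partition) match {
           case Some(move) => move.sources
           case None => currentParts(part).toSet
         }
         // Destinations are the target replicas (TR) that are not already sources.
         partMoves.put(part.partition, Move(sources, targetReplicas.filterNot(sources.contains).toSet))
       }
       moveMap
     }

     def main(args: Array[String]): Unit = {
       // Partition 1 already has an in-progress move whose sources are {1, 2}.
       val moveMap: MoveMap = mutable.Map("t" -> mutable.Map[Int, Move](1 -> Move(Set(1, 2), Set(3))))
       val result = applyProposed(
         moveMap,
         proposedParts = Map(TopicPartition("t", 0) -> Seq(1, 3), TopicPartition("t", 1) -> Seq(2, 4)),
         currentParts = Map(TopicPartition("t", 0) -> Seq(1, 2), TopicPartition("t", 1) -> Seq(1, 2, 3)))
       result("t").toSeq.sortBy(_._1).foreach { case (p, move) => println(s"t-$p: $move") }
     }
   }
   ```

   In this example, partition 0 (no reassignment in progress, [1, 2] to [1, 3]) gets sources {1, 2} and destinations {3}, so replica 1 is correctly a source. Partition 1 keeps its in-progress sources {1, 2} and, for TR [2, 4], gets destinations {4}.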
   
   Check that over and see if it makes sense to you. Basically, I think the current logic is unnecessarily complicated and probably wrong in multiple ways.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

