dajac commented on code in PR #12240:
URL: https://github.com/apache/kafka/pull/12240#discussion_r889876056


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -682,25 +683,35 @@ private ApiError createTopic(CreatableTopic topic,
             short replicationFactor = topic.replicationFactor() == -1 ?
                 defaultReplicationFactor : topic.replicationFactor();
             try {
-                List<List<Integer>> replicas = 
clusterControl.replicaPlacer().place(new PlacementSpec(
+                List<List<Integer>> partitions = 
clusterControl.replicaPlacer().place(new PlacementSpec(
                     0,
                     numPartitions,
                     replicationFactor
                 ), clusterDescriber);
-                for (int partitionId = 0; partitionId < replicas.size(); 
partitionId++) {
-                    int[] r = Replicas.toArray(replicas.get(partitionId));
+                for (int partitionId = 0; partitionId < partitions.size(); 
partitionId++) {
+                    List<Integer> replicas = partitions.get(partitionId);
+                    List<Integer> isr = replicas.stream().
+                        
filter(clusterControl::active).collect(Collectors.toList());
+                    // We need to have at least one replica in the ISR.
+                    if (isr.isEmpty()) isr.add(replicas.get(0));

Review Comment:
   That's a good point. Thinking more about this, I think that we should just 
fail the creation in this case. At the moment, the placer fails the assignment 
if all the brokers are fenced so extending this to include 
in-controlled-shutdown brokers seems appropriate.
   
   I still handle the case here. However, I think that we should update the 
placer to directly handle this. I would like to do this as a follow-up if you 
don't mind. That's a big change. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to