dajac commented on code in PR #12240: URL: https://github.com/apache/kafka/pull/12240#discussion_r889876056
########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ########## @@ -682,25 +683,35 @@ private ApiError createTopic(CreatableTopic topic, short replicationFactor = topic.replicationFactor() == -1 ? defaultReplicationFactor : topic.replicationFactor(); try { - List<List<Integer>> replicas = clusterControl.replicaPlacer().place(new PlacementSpec( + List<List<Integer>> partitions = clusterControl.replicaPlacer().place(new PlacementSpec( 0, numPartitions, replicationFactor ), clusterDescriber); - for (int partitionId = 0; partitionId < replicas.size(); partitionId++) { - int[] r = Replicas.toArray(replicas.get(partitionId)); + for (int partitionId = 0; partitionId < partitions.size(); partitionId++) { + List<Integer> replicas = partitions.get(partitionId); + List<Integer> isr = replicas.stream(). + filter(clusterControl::active).collect(Collectors.toList()); + // We need to have at least one replica in the ISR. + if (isr.isEmpty()) isr.add(replicas.get(0)); Review Comment: That's a good point. Thinking more about this, I think that we should just fail the creation in this case. At the moment, the placer fails the assignment if all the brokers are fenced so extending this to include in-controlled-shutdown brokers seems appropriate. I still handle the case here. However, I think that we should update the placer to directly handle this. I would like to do this as a follow-up if you don't mind. That's a big change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org