junrao commented on a change in pull request #10463: URL: https://github.com/apache/kafka/pull/10463#discussion_r612769595
##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -832,13 +833,33 @@ void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
                 throw new RuntimeException("Partition " + topicIdPartition +
                     " existed in isrMembers, but not in the partitions map.");
             }
-            // TODO: if this partition is configured for unclean leader election,
-            // check the replica set rather than the ISR.
-            if (Replicas.contains(partition.isr, brokerId)) {
-                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+            boolean brokerInIsr = Replicas.contains(partition.isr, brokerId);
+            boolean shouldBecomeLeader;
+            if (configurationControl.shouldUseUncleanLeaderElection(topic.name)) {
+                shouldBecomeLeader = Replicas.contains(partition.replicas, brokerId);
+            } else {
+                shouldBecomeLeader = brokerInIsr;
+            }
+            if (shouldBecomeLeader) {
+                if (brokerInIsr) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("The newly active node {} will be the leader for the " +
+                            "previously offline partition {}.",
+                            brokerId, topicIdPartition);
+                    }
+                } else {
+                    log.info("The newly active node {} will be the leader for the " +
+                        "previously offline partition {}, after an UNCLEAN leader election.",
+                        brokerId, topicIdPartition);
+                }
+                PartitionChangeRecord record = new PartitionChangeRecord().
                     setPartitionId(topicIdPartition.partitionId()).
                     setTopicId(topic.id).
-                    setLeader(brokerId), (short) 0));
+                    setLeader(brokerId);
+                if (!brokerInIsr) {
+                    record.setIsr(Replicas.toList(partition.isr, brokerId));

Review comment:
Hmm, if we perform an unclean leader election, the ISR should contain only the new leader, since the data on the existing ISR members is not guaranteed to match the new leader's.
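To make the point about the ISR concrete, here is a minimal, self-contained sketch of the election decision. It does not use the Kafka classes from the diff: `UncleanElectionSketch`, `Election`, and `electOnActivation` are hypothetical names introduced only for illustration, and replica sets are plain `int[]` arrays. The one behavior it is meant to show is that an unclean election records an ISR containing just the new leader, while a clean election leaves the ISR as it was.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the controller logic; not the Kafka API.
final class UncleanElectionSketch {
    /** Outcome of an election: the new leader, the ISR to record, and whether it was unclean. */
    static final class Election {
        final int leader;
        final List<Integer> isr;
        final boolean unclean;

        Election(int leader, List<Integer> isr, boolean unclean) {
            this.leader = leader;
            this.isr = isr;
            this.unclean = unclean;
        }
    }

    static boolean contains(int[] ids, int id) {
        for (int candidate : ids) {
            if (candidate == id) return true;
        }
        return false;
    }

    static List<Integer> toList(int[] ids) {
        List<Integer> out = new ArrayList<>();
        for (int id : ids) out.add(id);
        return out;
    }

    /**
     * Decide what happens to an offline partition when brokerId becomes active.
     * Returns empty if the broker may not become leader.
     */
    static Optional<Election> electOnActivation(
            int brokerId, int[] replicas, int[] isr, boolean uncleanAllowed) {
        if (contains(isr, brokerId)) {
            // Clean election: the broker was in sync, so the ISR is unchanged.
            return Optional.of(new Election(brokerId, toList(isr), false));
        }
        if (uncleanAllowed && contains(replicas, brokerId)) {
            // Unclean election: the old ISR members may not match the new
            // leader's log, so the ISR shrinks to the new leader alone.
            return Optional.of(
                new Election(brokerId, Collections.singletonList(brokerId), true));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Optional<Election> result =
            electOnActivation(3, new int[] {1, 2, 3}, new int[] {1, 2}, true);
        result.ifPresent(e -> System.out.println(
            "leader=" + e.leader + " isr=" + e.isr + " unclean=" + e.unclean));
    }
}
```

In this sketch the old ISR members are dropped rather than merged with the new leader, which is the opposite of what `Replicas.toList(partition.isr, brokerId)` appears to do in the diff.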
##########
File path: metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
##########
@@ -245,4 +246,32 @@ public void testLegacyAlterConfigs() {
                 manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))))
         );
     }
+
+    @Test
+    public void testShouldUseUncleanLeaderElection() throws Exception {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        ConfigurationControlManager manager =
+            new ConfigurationControlManager(new LogContext(), 0, snapshotRegistry, CONFIGS);
+        ControllerResult<Map<ConfigResource, ApiError>> result = manager.incrementalAlterConfigs(
+            toMap(entry(BROKER0, toMap(
+                entry(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, entry(SET, "true")))),
+            entry(MYTOPIC, toMap(
+                entry(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, entry(SET, "false"))))));
+        Map<ConfigResource, ApiError> expectedResponse = new HashMap<>();
+        expectedResponse.put(BROKER0, ApiError.NONE);
+        expectedResponse.put(MYTOPIC, ApiError.NONE);
+        assertEquals(expectedResponse, result.response());
+        ControllerTestUtils.replayAll(manager, result.records());
+        assertTrue(manager.shouldUseUncleanLeaderElection(MYTOPIC.name()));

Review comment:
I think in this case, unclean leader election should be false for MYTOPIC since the topic level config takes precedence.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
+    private boolean getBoolean(ConfigResource configResource, String key) {
+        TimelineHashMap<String, String> map = configData.get(configResource);
+        if (map == null) return false;
+        String value = map.getOrDefault(key, "false");
+        return value.equalsIgnoreCase("true");
+    }
+
+    /**
+     * Check if the given topic should use an unclean leader election.
+     *
+     * @param topicName The topic name.
+     * @return True if the controller or topic was configured to use unclean
+     * leader election.
+     */
+    boolean shouldUseUncleanLeaderElection(String topicName) {
+        // Check the node config, cluster config, and topic config.
+        return getBoolean(currentNodeResource, UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) ||

Review comment:
Hmm, the priority of a config is topic, node, node default. So, if the config is set at the topic level, we should just use the topic level setting and ignore the node level setting. If the config is not set at the topic level explicitly, then we move to the node level. If the node level is not set, then we move to the node default.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
+    private boolean getBoolean(ConfigResource configResource, String key) {
+        TimelineHashMap<String, String> map = configData.get(configResource);
+        if (map == null) return false;
+        String value = map.getOrDefault(key, "false");
+        return value.equalsIgnoreCase("true");
+    }
+
+    /**
+     * Check if the given topic should use an unclean leader election.
+     *
+     * @param topicName The topic name.
+     * @return True if the controller or topic was configured to use unclean
+     * leader election.
+     */
+    boolean shouldUseUncleanLeaderElection(String topicName) {

Review comment:
If unclean leader election is enabled through a config change, we need to trigger a leader election on all relevant partitions without a current leader.
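
The priority order described in the comment on `shouldUseUncleanLeaderElection` (topic, then node, then node default) could look roughly like the sketch below. It is not the `ConfigurationControlManager` code: `ConfigPrecedenceSketch` and its three plain `HashMap` fields are hypothetical stand-ins for `configData`, and the final fallback to `false` is an assumption that mirrors the `"false"` default in `getBoolean`. The idea is that the first level with an explicit value wins, instead of OR-ing the levels together.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the controller's config lookup; not the Kafka API.
final class ConfigPrecedenceSketch {
    static final String UNCLEAN_KEY = "unclean.leader.election.enable";

    // Explicitly set configs per topic name.
    final Map<String, Map<String, String>> topicConfigs = new HashMap<>();
    // Explicitly set configs for the current node.
    final Map<String, String> nodeConfig = new HashMap<>();
    // Explicitly set node-default (cluster-wide) configs.
    final Map<String, String> nodeDefaultConfig = new HashMap<>();

    boolean shouldUseUncleanLeaderElection(String topicName) {
        String value = null;
        Map<String, String> topic = topicConfigs.get(topicName);
        if (topic != null) {
            // A topic-level setting, if present, overrides everything else.
            value = topic.get(UNCLEAN_KEY);
        }
        if (value == null) {
            // Not set on the topic: fall back to the node-level setting.
            value = nodeConfig.get(UNCLEAN_KEY);
        }
        if (value == null) {
            // Not set on the node either: fall back to the node default.
            value = nodeDefaultConfig.get(UNCLEAN_KEY);
        }
        // Assumed fallback: unset at every level means disabled.
        return "true".equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        ConfigPrecedenceSketch configs = new ConfigPrecedenceSketch();
        configs.nodeConfig.put(UNCLEAN_KEY, "true");
        Map<String, String> topic = new HashMap<>();
        topic.put(UNCLEAN_KEY, "false");
        configs.topicConfigs.put("mytopic", topic);
        System.out.println(configs.shouldUseUncleanLeaderElection("mytopic"));
        System.out.println(configs.shouldUseUncleanLeaderElection("othertopic"));
    }
}
```

With this lookup, the scenario in `testShouldUseUncleanLeaderElection` (node set to "true", topic set to "false") would resolve to false for the topic, which matches the expectation in the earlier comment on that test.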