junrao commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r613366570



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ElectionStrategizer.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+
+class ElectionStrategizer {

Review comment:
       Could we add a comment on what the class does?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ElectionStrategizer.java
##########
@@ -0,0 +1,110 @@
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+
+class ElectionStrategizer {
+    private static final Logger log = LoggerFactory.getLogger(ElectionStrategizer.class);
+
+    private final int nodeId;
+    private Boolean nodeUncleanConfig = null;
+    private Boolean clusterUncleanConfig = null;
+    private Function<String, String> topicUncleanConfigAccessor = __ -> "false";
+    private Map<String, String> topicUncleanOverrides = new HashMap<>();
+
+    ElectionStrategizer(int nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    ElectionStrategizer setNodeUncleanConfig(String nodeUncleanConfig) {
+        this.nodeUncleanConfig = parseBoolean("node", nodeUncleanConfig);
+        return this;
+    }
+
+    ElectionStrategizer setClusterUncleanConfig(String clusterUncleanConfig) {
+        this.clusterUncleanConfig = parseBoolean("cluster", clusterUncleanConfig);
+        return this;
+    }
+
+    ElectionStrategizer setTopicUncleanConfigAccessor(
+            Function<String, String> topicUncleanConfigAccessor) {
+        this.topicUncleanConfigAccessor = topicUncleanConfigAccessor;
+        return this;
+    }
+
+    ElectionStrategizer setTopicUncleanOverride(String topicName, String value) {
+        this.topicUncleanOverrides.put(topicName, value);
+        return this;
+    }
+
+    boolean shouldBeUnclean(String topicName) {
+        Boolean topicConfig = (topicUncleanOverrides.containsKey(topicName)) ?
+            parseBoolean("topic", topicUncleanOverrides.get(topicName)) :
+            parseBoolean("topic", topicUncleanConfigAccessor.apply(topicName));
+        if (topicConfig != null) return topicConfig.booleanValue();
+        if (nodeUncleanConfig != null) return nodeUncleanConfig.booleanValue();
+        if (clusterUncleanConfig != null) return clusterUncleanConfig.booleanValue();
+        return false;
+    }
+
+    // VisibleForTesting
+    Boolean parseBoolean(String what, String value) {

Review comment:
       Would it be better to return Optional to handle null more explicitly?
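
A minimal sketch of what the Optional-based variant could look like. The class name `OptionalParseSketch` and the `resolve` helper are hypothetical and not part of the PR. The sketch assumes that invalid values are treated as unset, since the excerpt does not show what the original does after logging the warning.

```java
import java.util.Optional;

// Hypothetical sketch, not the PR's code: the same parsing rules as
// parseBoolean, with "no value" expressed as Optional.empty() instead of null.
final class OptionalParseSketch {
    static Optional<Boolean> parseBoolean(String value) {
        if (value == null) return Optional.empty();
        if (value.equalsIgnoreCase("true")) return Optional.of(true);
        if (value.equalsIgnoreCase("false")) return Optional.of(false);
        // Assumption: blank and invalid values both count as "unset" here.
        return Optional.empty();
    }

    // Same precedence as shouldBeUnclean: topic, then node, then cluster,
    // then a default of false.
    static boolean resolve(Optional<Boolean> topic,
                           Optional<Boolean> node,
                           Optional<Boolean> cluster) {
        if (topic.isPresent()) return topic.get();
        if (node.isPresent()) return node.get();
        return cluster.orElse(false);
    }
}
```

With this shape, callers cannot forget the "unset" case, because they have to unwrap the Optional before they can read the value.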

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -783,7 +788,8 @@ void handleNodeDeactivated(int brokerId, List<ApiMessageAndVersion> records) {
                 record.setIsr(Replicas.toList(newIsr));
                 if (partition.leader == brokerId) {
                     // The fenced node will no longer be the leader.
-                    int newLeader = bestLeader(partition.replicas, newIsr, false);
+                    int newLeader = bestLeader(partition.replicas, newIsr,

Review comment:
       Hmm, how do we prevent the deactivated broker from being selected as the new leader? It seems that at this point, clusterControl hasn't reflected the fenceBrokerRecord yet.
   
   Also, in the caller handleBrokerFenced(), we add the partitionChangeRecord before the fenceBrokerRecord. Ordering-wise, it seems more natural to add the fenceBrokerRecord first. Then it's clear that the partitionChangeRecord is the result of the fenceBrokerRecord.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ElectionStrategizer.java
##########
@@ -0,0 +1,110 @@
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+
+class ElectionStrategizer {
+    private static final Logger log = LoggerFactory.getLogger(ElectionStrategizer.class);
+
+    private final int nodeId;
+    private Boolean nodeUncleanConfig = null;
+    private Boolean clusterUncleanConfig = null;
+    private Function<String, String> topicUncleanConfigAccessor = __ -> "false";
+    private Map<String, String> topicUncleanOverrides = new HashMap<>();
+
+    ElectionStrategizer(int nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    ElectionStrategizer setNodeUncleanConfig(String nodeUncleanConfig) {
+        this.nodeUncleanConfig = parseBoolean("node", nodeUncleanConfig);
+        return this;
+    }
+
+    ElectionStrategizer setClusterUncleanConfig(String clusterUncleanConfig) {
+        this.clusterUncleanConfig = parseBoolean("cluster", clusterUncleanConfig);
+        return this;
+    }
+
+    ElectionStrategizer setTopicUncleanConfigAccessor(
+            Function<String, String> topicUncleanConfigAccessor) {
+        this.topicUncleanConfigAccessor = topicUncleanConfigAccessor;
+        return this;
+    }
+
+    ElectionStrategizer setTopicUncleanOverride(String topicName, String value) {
+        this.topicUncleanOverrides.put(topicName, value);
+        return this;
+    }
+
+    boolean shouldBeUnclean(String topicName) {
+        Boolean topicConfig = (topicUncleanOverrides.containsKey(topicName)) ?
+            parseBoolean("topic", topicUncleanOverrides.get(topicName)) :
+            parseBoolean("topic", topicUncleanConfigAccessor.apply(topicName));
+        if (topicConfig != null) return topicConfig.booleanValue();
+        if (nodeUncleanConfig != null) return nodeUncleanConfig.booleanValue();
+        if (clusterUncleanConfig != null) return clusterUncleanConfig.booleanValue();
+        return false;
+    }
+
+    // VisibleForTesting
+    Boolean parseBoolean(String what, String value) {
+        if (value == null) return null;
+        if (value.equalsIgnoreCase("true")) return true;
+        if (value.equalsIgnoreCase("false")) return false;
+        if (value.trim().isEmpty()) return null;
+        log.warn("Invalid value for {} config {} on node {}: '{}'. Expected true or false.",

Review comment:
       If the boolean is set to an invalid value, it seems that we should send an error to the caller?
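
As a rough illustration of the stricter behaviour, the sketch below throws instead of logging a warning. `StrictParseSketch` is a hypothetical name. It uses a plain `IllegalArgumentException` only to stay self-contained; which exception type the controller should actually surface to the caller is not decided here.

```java
// Hypothetical sketch, not the PR's code: reject invalid values instead of
// logging a warning and continuing.
final class StrictParseSketch {
    static Boolean parseBoolean(String what, String value) {
        // Null and blank values still mean "not configured at this level".
        if (value == null || value.trim().isEmpty()) return null;
        if (value.equalsIgnoreCase("true")) return Boolean.TRUE;
        if (value.equalsIgnoreCase("false")) return Boolean.FALSE;
        // Assumption: a standard unchecked exception stands in for whatever
        // error the real API would return to the caller.
        throw new IllegalArgumentException("Invalid value for " + what +
            " config: '" + value + "'. Expected true or false.");
    }
}
```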

##########
File path: metadata/src/main/java/org/apache/kafka/controller/Replicas.java
##########
@@ -43,6 +43,18 @@
         return list;
     }
 
+    /**
+     * Convert an array of integers to a list of ints and append a final element.
+     *
+     * @param array         The input array.
+     * @return              The output list.
+     */
+    public static List<Integer> toList(int[] array, int last) {

Review comment:
       It seems that this method is now only used in tests and can be removed?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -832,13 +839,35 @@ void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
                 throw new RuntimeException("Partition " + topicIdPartition +
                     " existed in isrMembers, but not in the partitions map.");
             }
-            // TODO: if this partition is configured for unclean leader election,
-            // check the replica set rather than the ISR.
-            if (Replicas.contains(partition.isr, brokerId)) {
-                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
-                    setPartitionId(topicIdPartition.partitionId()).
-                    setTopicId(topic.id).
-                    setLeader(brokerId), (short) 0));
+            if (strategizer.shouldBeUnclean(topic.name)) {
+                if (Replicas.contains(partition.replicas, brokerId)) {
+                    // Perform an unclean leader election.  The only entry in the new ISR
+                    // will be the current broker, since the data in the existing ISR is
+                    // not guaranteed to match with the new leader.
+                    log.info("The newly active node {} will be the leader for the " +
+                            "previously offline partition {}, after an UNCLEAN leader election.",
+                        brokerId, topicIdPartition);
+                    PartitionChangeRecord record = new PartitionChangeRecord().
+                        setPartitionId(topicIdPartition.partitionId()).
+                        setTopicId(topic.id).
+                        setLeader(brokerId).
+                        setIsr(Collections.singletonList(brokerId));
+                    records.add(new ApiMessageAndVersion(record, (short) 0));
+                }
+            } else {
+                if (Replicas.contains(partition.isr, brokerId)) {
+                    // Perform a clean leader election.
+                    if (log.isDebugEnabled()) {
+                        log.debug("The newly active node {} will be the leader for the " +
+                                "previously offline partition {}.",
+                            brokerId, topicIdPartition);
+                    }
+                    PartitionChangeRecord record = new PartitionChangeRecord().
+                        setPartitionId(topicIdPartition.partitionId()).
+                        setTopicId(topic.id).
+                        setLeader(brokerId);

Review comment:
       Should we set the ISR for this partition too?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -783,7 +788,8 @@ void handleNodeDeactivated(int brokerId, List<ApiMessageAndVersion> records) {
                 record.setIsr(Replicas.toList(newIsr));
                 if (partition.leader == brokerId) {
                     // The fenced node will no longer be the leader.
-                    int newLeader = bestLeader(partition.replicas, newIsr, false);
+                    int newLeader = bestLeader(partition.replicas, newIsr,
+                        strategizer.shouldBeUnclean(topic.name));

Review comment:
       If there is an unclean leader election, we need to reset the ISR to just the leader. Since this needs to be done in multiple places, perhaps we could change bestLeader() to return both the new leader and the ISR.
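
One possible shape for such a return value, as a sketch only. `ElectionResultSketch`, the `NO_LEADER` sentinel, the `isActive` predicate, and the in-order walk over the replicas are all assumptions made for illustration; the real bestLeader() signature and selection rules are not shown in this excerpt.

```java
import java.util.function.IntPredicate;

// Hypothetical sketch, not the PR's code: a result type that carries both the
// chosen leader and the ISR that should be written alongside it.
final class ElectionResultSketch {
    static final int NO_LEADER = -1; // assumed sentinel for "no leader available"

    final int leader;
    final int[] isr;
    final boolean unclean;

    private ElectionResultSketch(int leader, int[] isr, boolean unclean) {
        this.leader = leader;
        this.isr = isr;
        this.unclean = unclean;
    }

    static ElectionResultSketch elect(int[] replicas, int[] isr,
                                      boolean uncleanOk, IntPredicate isActive) {
        // Clean election first: the first active replica that is in the ISR.
        for (int replica : replicas) {
            if (isActive.test(replica) && contains(isr, replica)) {
                return new ElectionResultSketch(replica, isr, false);
            }
        }
        // Unclean election: any active replica, with the ISR reset to just it.
        if (uncleanOk) {
            for (int replica : replicas) {
                if (isActive.test(replica)) {
                    return new ElectionResultSketch(replica, new int[] {replica}, true);
                }
            }
        }
        return new ElectionResultSketch(NO_LEADER, isr, false);
    }

    private static boolean contains(int[] values, int target) {
        for (int value : values) {
            if (value == target) return true;
        }
        return false;
    }
}
```

Returning the ISR together with the leader keeps the "reset the ISR on unclean election" rule in one place, so each call site only has to copy both fields into its record.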

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -374,6 +389,20 @@ void deleteTopicConfigs(String name) {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
 
+    ConfigResource currentNodeResource() {

Review comment:
       Could this be private?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -832,13 +839,35 @@ void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
                 throw new RuntimeException("Partition " + topicIdPartition +
                     " existed in isrMembers, but not in the partitions map.");
             }
-            // TODO: if this partition is configured for unclean leader election,
-            // check the replica set rather than the ISR.
-            if (Replicas.contains(partition.isr, brokerId)) {
-                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
-                    setPartitionId(topicIdPartition.partitionId()).
-                    setTopicId(topic.id).
-                    setLeader(brokerId), (short) 0));
+            if (strategizer.shouldBeUnclean(topic.name)) {
+                if (Replicas.contains(partition.replicas, brokerId)) {
+                    // Perform an unclean leader election.  The only entry in the new ISR
+                    // will be the current broker, since the data in the existing ISR is
+                    // not guaranteed to match with the new leader.
+                    log.info("The newly active node {} will be the leader for the " +
+                            "previously offline partition {}, after an UNCLEAN leader election.",
+                        brokerId, topicIdPartition);
+                    PartitionChangeRecord record = new PartitionChangeRecord().
+                        setPartitionId(topicIdPartition.partitionId()).
+                        setTopicId(topic.id).
+                        setLeader(brokerId).
+                        setIsr(Collections.singletonList(brokerId));
+                    records.add(new ApiMessageAndVersion(record, (short) 0));
+                }
+            } else {
+                if (Replicas.contains(partition.isr, brokerId)) {

Review comment:
       We should prefer the clean leader election even if shouldBeUnclean is 
true.
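
A small sketch of that ordering for the newly activated broker: check ISR membership first, and fall back to an unclean election only when the broker is outside the ISR. `ActivationElectionSketch` and its `Kind` enum are hypothetical names used only for illustration.

```java
// Hypothetical sketch, not the PR's code: decide which kind of election a
// newly activated broker qualifies for on a previously offline partition.
final class ActivationElectionSketch {
    enum Kind { CLEAN, UNCLEAN, NONE }

    static Kind choose(int[] replicas, int[] isr, int brokerId, boolean uncleanOk) {
        // A broker in the ISR always gets a clean election, even when
        // unclean election is enabled for the topic.
        if (contains(isr, brokerId)) return Kind.CLEAN;
        // Only outside the ISR does the unclean setting matter.
        if (uncleanOk && contains(replicas, brokerId)) return Kind.UNCLEAN;
        return Kind.NONE;
    }

    private static boolean contains(int[] values, int target) {
        for (int value : values) {
            if (value == target) return true;
        }
        return false;
    }
}
```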

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -42,23 +42,38 @@
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.function.Function;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
+import static org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
 
 
 public class ConfigurationControlManager {
+    static final ConfigResource DEFAULT_NODE_RESOURCE = new ConfigResource(Type.BROKER, "");
+
     private final Logger log;
+    private final int nodeId;
+    private final ConfigResource currentNodeResource;
     private final SnapshotRegistry snapshotRegistry;
     private final Map<ConfigResource.Type, ConfigDef> configDefs;
+    private final TimelineHashMap<String, String> emptyMap;

Review comment:
       Could this be a static final field?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1148,113 @@ void validateManualPartitionAssignment(List<Integer> assignment,
         }
     }
 
+    /**
+     * Handle legacy configuration alterations.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> legacyAlterConfigs(
+            Map<ConfigResource, Map<String, String>> newConfigs) {
+        ControllerResult<Map<ConfigResource, ApiError>> result =
+            configurationControl.legacyAlterConfigs(newConfigs);
+        return alterConfigs(result);
+    }
+
+    /**
+     * Handle incremental configuration alterations.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+            Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges) {
+        ControllerResult<Map<ConfigResource, ApiError>> result =
+            configurationControl.incrementalAlterConfigs(configChanges);
+        return alterConfigs(result);
+    }
+
+    /**
+     * If important controller configurations were changed, generate records which will
+     * apply the changes.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> alterConfigs(
+            ControllerResult<Map<ConfigResource, ApiError>> result) {
+        ElectionStrategizer strategizer = examineConfigAlterations(result.records());
+        boolean isAtomic = true;
+        List<ApiMessageAndVersion> records = result.records();
+        if (strategizer != null) {
+            records.addAll(handleLeaderElectionConfigChanges(strategizer));
+            isAtomic = false;

Review comment:
       The PartitionChangeRecords from an unclean leader election don't need to be applied atomically. However, it seems the config records still need to be applied atomically?





