cmccabe commented on a change in pull request #10070:
URL: https://github.com/apache/kafka/pull/10070#discussion_r577167941



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.config.ConfigDef.ConfigKey;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource.Type;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
+
+public class ConfigurationControlManager {
+    private final Logger log;
+    private final SnapshotRegistry snapshotRegistry;
+    private final Map<ConfigResource.Type, ConfigDef> configDefs;
+    private final TimelineHashMap<ConfigResource, TimelineHashMap<String, 
String>> configData;
+
+    ConfigurationControlManager(LogContext logContext,
+                                SnapshotRegistry snapshotRegistry,
+                                Map<ConfigResource.Type, ConfigDef> 
configDefs) {
+        this.log = logContext.logger(ConfigurationControlManager.class);
+        this.snapshotRegistry = snapshotRegistry;
+        this.configDefs = configDefs;
+        this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
+    }
+
+    /**
+     * Determine the result of applying a batch of incremental configuration 
changes.  Note
+     * that this method does not change the contents of memory.  It just 
generates a
+     * result, that you can replay later if you wish using replay().
+     *
+     * Note that there can only be one result per ConfigResource.  So if you 
try to modify
+     * several keys and one modification fails, the whole ConfigKey fails and 
nothing gets
+     * changed.
+     *
+     * @param configChanges     Maps each resource to a map from config keys to
+     *                          operation data.
+     * @return                  The result.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+            Map<ConfigResource, Map<String, Entry<OpType, String>>> 
configChanges) {
+        List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
+        Map<ConfigResource, ApiError> outputResults = new HashMap<>();
+        for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> 
resourceEntry :
+                configChanges.entrySet()) {
+            incrementalAlterConfigResource(resourceEntry.getKey(),
+                resourceEntry.getValue(),
+                outputRecords,
+                outputResults);
+        }
+        return new ControllerResult<>(outputRecords, outputResults);
+    }
+
+    private void incrementalAlterConfigResource(ConfigResource configResource,
+                                                Map<String, Entry<OpType, 
String>> keysToOps,
+                                                List<ApiMessageAndVersion> 
outputRecords,
+                                                Map<ConfigResource, ApiError> 
outputResults) {
+        ApiError error = checkConfigResource(configResource);
+        if (error.isFailure()) {
+            outputResults.put(configResource, error);
+            return;
+        }
+        List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+        for (Entry<String, Entry<OpType, String>> keysToOpsEntry : 
keysToOps.entrySet()) {
+            String key = keysToOpsEntry.getKey();
+            String currentValue = null;
+            TimelineHashMap<String, String> currentConfigs = 
configData.get(configResource);
+            if (currentConfigs != null) {
+                currentValue = currentConfigs.get(key);
+            }
+            String newValue = currentValue;
+            Entry<OpType, String> opTypeAndNewValue = 
keysToOpsEntry.getValue();
+            OpType opType = opTypeAndNewValue.getKey();
+            String opValue = opTypeAndNewValue.getValue();
+            switch (opType) {
+                case SET:
+                    newValue = opValue;
+                    break;
+                case DELETE:
+                    if (opValue != null) {
+                        outputResults.put(configResource, new ApiError(
+                            Errors.INVALID_REQUEST, "A DELETE op was given 
with a " +
+                            "non-null value."));
+                        return;
+                    }
+                    newValue = null;
+                    break;
+                case APPEND:
+                case SUBTRACT:
+                    if (!isSplittable(configResource.type(), key)) {
+                        outputResults.put(configResource, new ApiError(
+                            Errors.INVALID_CONFIG, "Can't " + opType + " to " +
+                            "key " + key + " because its type is not LIST."));
+                        return;
+                    }
+                    List<String> newValueParts = getParts(newValue, key, 
configResource);
+                    if (opType == APPEND) {
+                        if (!newValueParts.contains(opValue)) {
+                            newValueParts.add(opValue);
+                        }
+                        newValue = String.join(",", newValueParts);
+                    } else if (newValueParts.remove(opValue)) {
+                        newValue = String.join(",", newValueParts);
+                    }
+                    break;
+            }
+            if (!Objects.equals(currentValue, newValue)) {
+                newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
+                    setResourceType(configResource.type().id()).
+                    setResourceName(configResource.name()).
+                    setName(key).
+                    setValue(newValue), (short) 0));
+            }
+        }
+        outputRecords.addAll(newRecords);
+        outputResults.put(configResource, ApiError.NONE);
+    }
+
+    /**
+     * Determine the result of applying a batch of legacy configuration 
changes.  Note
+     * that this method does not change the contents of memory.  It just 
generates a
+     * result, that you can replay later if you wish using replay().
+     *
+     * @param newConfigs        The new configurations to install for each 
resource.
+     *                          All existing configurations will be 
overwritten.
+     * @return                  The result.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> legacyAlterConfigs(
+        Map<ConfigResource, Map<String, String>> newConfigs) {
+        List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
+        Map<ConfigResource, ApiError> outputResults = new HashMap<>();
+        for (Entry<ConfigResource, Map<String, String>> resourceEntry :
+            newConfigs.entrySet()) {
+            legacyAlterConfigResource(resourceEntry.getKey(),
+                resourceEntry.getValue(),
+                outputRecords,
+                outputResults);
+        }
+        return new ControllerResult<>(outputRecords, outputResults);
+    }
+
+    private void legacyAlterConfigResource(ConfigResource configResource,
+                                           Map<String, String> newConfigs,
+                                           List<ApiMessageAndVersion> 
outputRecords,
+                                           Map<ConfigResource, ApiError> 
outputResults) {
+        ApiError error = checkConfigResource(configResource);
+        if (error.isFailure()) {
+            outputResults.put(configResource, error);
+            return;
+        }
+        List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+        Map<String, String> currentConfigs = configData.get(configResource);
+        if (currentConfigs == null) {
+            currentConfigs = Collections.emptyMap();
+        }
+        for (Entry<String, String> entry : newConfigs.entrySet()) {
+            String key = entry.getKey();
+            String newValue = entry.getValue();
+            String currentValue = currentConfigs.get(key);
+            if (!Objects.equals(newValue, currentValue)) {
+                newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
+                    setResourceType(configResource.type().id()).
+                    setResourceName(configResource.name()).
+                    setName(key).
+                    setValue(newValue), (short) 0));
+            }
+        }
+        for (String key : currentConfigs.keySet()) {
+            if (!newConfigs.containsKey(key)) {
+                newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
+                    setResourceType(configResource.type().id()).
+                    setResourceName(configResource.name()).
+                    setName(key).
+                    setValue(null), (short) 0));
+            }
+        }
+        outputRecords.addAll(newRecords);
+        outputResults.put(configResource, ApiError.NONE);
+    }
+
+    private List<String> getParts(String value, String key, ConfigResource 
configResource) {
+        if (value == null) {
+            value = getConfigValueDefault(configResource.type(), key);
+        }
+        List<String> parts = new ArrayList<>();
+        if (value == null) {
+            return parts;
+        }
+        String[] splitValues = value.split(",");
+        for (String splitValue : splitValues) {
+            if (!splitValue.isEmpty()) {
+                parts.add(splitValue);
+            }
+        }
+        return parts;
+    }
+
+    static ApiError checkConfigResource(ConfigResource configResource) {
+        switch (configResource.type()) {
+            case BROKER_LOGGER:
+                // We do not handle resources of type BROKER_LOGGER in

Review comment:
       This has to be handled by the individual brokers.  It's not persisted 
anywhere currently (currently it is not stored in ZK I believe)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to