cmccabe commented on a change in pull request #10070: URL: https://github.com/apache/kafka/pull/10070#discussion_r577167941
########## File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.common.config.ConfigDef.ConfigKey; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigResource.Type; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.metadata.ConfigRecord; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; + +import static 
org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;

/**
 * Manages the configurations stored by the controller, keyed by ConfigResource.
 * State lives in a TimelineHashMap so it can be snapshotted and rolled back via
 * the SnapshotRegistry.
 */
public class ConfigurationControlManager {
    private final Logger log;
    // Registry used to snapshot / revert configData.
    private final SnapshotRegistry snapshotRegistry;
    // Schema for each resource type; consulted for validation and defaults.
    private final Map<ConfigResource.Type, ConfigDef> configDefs;
    // The current configuration of each resource.
    private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData;

    ConfigurationControlManager(LogContext logContext,
                                SnapshotRegistry snapshotRegistry,
                                Map<ConfigResource.Type, ConfigDef> configDefs) {
        this.log = logContext.logger(ConfigurationControlManager.class);
        this.snapshotRegistry = snapshotRegistry;
        this.configDefs = configDefs;
        // NOTE(review): the 0 is presumably the initial epoch/size argument of
        // TimelineHashMap -- confirm against its constructor.
        this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
    }

    /**
     * Determine the result of applying a batch of incremental configuration changes. Note
     * that this method does not change the contents of memory. It just generates a
     * result, that you can replay later if you wish using replay().
     *
     * Note that there can only be one result per ConfigResource. So if you try to modify
     * several keys and one modification fails, the whole ConfigKey fails and nothing gets
     * changed.
     *
     * @param configChanges Maps each resource to a map from config keys to
     *                      operation data.
     * @return The result.
+ */ + ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs( + Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges) { + List<ApiMessageAndVersion> outputRecords = new ArrayList<>(); + Map<ConfigResource, ApiError> outputResults = new HashMap<>(); + for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry : + configChanges.entrySet()) { + incrementalAlterConfigResource(resourceEntry.getKey(), + resourceEntry.getValue(), + outputRecords, + outputResults); + } + return new ControllerResult<>(outputRecords, outputResults); + } + + private void incrementalAlterConfigResource(ConfigResource configResource, + Map<String, Entry<OpType, String>> keysToOps, + List<ApiMessageAndVersion> outputRecords, + Map<ConfigResource, ApiError> outputResults) { + ApiError error = checkConfigResource(configResource); + if (error.isFailure()) { + outputResults.put(configResource, error); + return; + } + List<ApiMessageAndVersion> newRecords = new ArrayList<>(); + for (Entry<String, Entry<OpType, String>> keysToOpsEntry : keysToOps.entrySet()) { + String key = keysToOpsEntry.getKey(); + String currentValue = null; + TimelineHashMap<String, String> currentConfigs = configData.get(configResource); + if (currentConfigs != null) { + currentValue = currentConfigs.get(key); + } + String newValue = currentValue; + Entry<OpType, String> opTypeAndNewValue = keysToOpsEntry.getValue(); + OpType opType = opTypeAndNewValue.getKey(); + String opValue = opTypeAndNewValue.getValue(); + switch (opType) { + case SET: + newValue = opValue; + break; + case DELETE: + if (opValue != null) { + outputResults.put(configResource, new ApiError( + Errors.INVALID_REQUEST, "A DELETE op was given with a " + + "non-null value.")); + return; + } + newValue = null; + break; + case APPEND: + case SUBTRACT: + if (!isSplittable(configResource.type(), key)) { + outputResults.put(configResource, new ApiError( + Errors.INVALID_CONFIG, "Can't " + opType + " to " + + "key " + 
key + " because its type is not LIST.")); + return; + } + List<String> newValueParts = getParts(newValue, key, configResource); + if (opType == APPEND) { + if (!newValueParts.contains(opValue)) { + newValueParts.add(opValue); + } + newValue = String.join(",", newValueParts); + } else if (newValueParts.remove(opValue)) { + newValue = String.join(",", newValueParts); + } + break; + } + if (!Objects.equals(currentValue, newValue)) { + newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). + setResourceType(configResource.type().id()). + setResourceName(configResource.name()). + setName(key). + setValue(newValue), (short) 0)); + } + } + outputRecords.addAll(newRecords); + outputResults.put(configResource, ApiError.NONE); + } + + /** + * Determine the result of applying a batch of legacy configuration changes. Note + * that this method does not change the contents of memory. It just generates a + * result, that you can replay later if you wish using replay(). + * + * @param newConfigs The new configurations to install for each resource. + * All existing configurations will be overwritten. + * @return The result. 
+ */ + ControllerResult<Map<ConfigResource, ApiError>> legacyAlterConfigs( + Map<ConfigResource, Map<String, String>> newConfigs) { + List<ApiMessageAndVersion> outputRecords = new ArrayList<>(); + Map<ConfigResource, ApiError> outputResults = new HashMap<>(); + for (Entry<ConfigResource, Map<String, String>> resourceEntry : + newConfigs.entrySet()) { + legacyAlterConfigResource(resourceEntry.getKey(), + resourceEntry.getValue(), + outputRecords, + outputResults); + } + return new ControllerResult<>(outputRecords, outputResults); + } + + private void legacyAlterConfigResource(ConfigResource configResource, + Map<String, String> newConfigs, + List<ApiMessageAndVersion> outputRecords, + Map<ConfigResource, ApiError> outputResults) { + ApiError error = checkConfigResource(configResource); + if (error.isFailure()) { + outputResults.put(configResource, error); + return; + } + List<ApiMessageAndVersion> newRecords = new ArrayList<>(); + Map<String, String> currentConfigs = configData.get(configResource); + if (currentConfigs == null) { + currentConfigs = Collections.emptyMap(); + } + for (Entry<String, String> entry : newConfigs.entrySet()) { + String key = entry.getKey(); + String newValue = entry.getValue(); + String currentValue = currentConfigs.get(key); + if (!Objects.equals(newValue, currentValue)) { + newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). + setResourceType(configResource.type().id()). + setResourceName(configResource.name()). + setName(key). + setValue(newValue), (short) 0)); + } + } + for (String key : currentConfigs.keySet()) { + if (!newConfigs.containsKey(key)) { + newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). + setResourceType(configResource.type().id()). + setResourceName(configResource.name()). + setName(key). 
+ setValue(null), (short) 0)); + } + } + outputRecords.addAll(newRecords); + outputResults.put(configResource, ApiError.NONE); + } + + private List<String> getParts(String value, String key, ConfigResource configResource) { + if (value == null) { + value = getConfigValueDefault(configResource.type(), key); + } + List<String> parts = new ArrayList<>(); + if (value == null) { + return parts; + } + String[] splitValues = value.split(","); + for (String splitValue : splitValues) { + if (!splitValue.isEmpty()) { + parts.add(splitValue); + } + } + return parts; + } + + static ApiError checkConfigResource(ConfigResource configResource) { + switch (configResource.type()) { + case BROKER_LOGGER: + // We do not handle resources of type BROKER_LOGGER in Review comment: This has to be handled by the individual brokers. It's not persisted anywhere currently (currently it is not stored in ZK I believe) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org