[ https://issues.apache.org/jira/browse/KAFKA-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495724#comment-16495724 ]
ASF GitHub Bot commented on KAFKA-6886: --------------------------------------- ewencp closed pull request #5068: KAFKA-6886 Externalize secrets from Connect configs URL: https://github.com/apache/kafka/pull/5068 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index ba48c38cb28..5bf69b6b65f 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -83,7 +83,7 @@ files="(KafkaConfigBackingStore|RequestResponseTest|WorkerSinkTaskTest).java"/> <suppress checks="ParameterNumber" - files="WorkerSourceTask.java"/> + files="(WorkerSinkTask|WorkerSourceTask).java"/> <suppress checks="ParameterNumber" files="WorkerCoordinator.java"/> <suppress checks="ParameterNumber" diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigChangeCallback.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigChangeCallback.java new file mode 100644 index 00000000000..d4c9948bc93 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigChangeCallback.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config; + +/** + * A callback passed to {@link ConfigProvider} for subscribing to changes. + */ +public interface ConfigChangeCallback { + + /** + * Performs an action when configuration data changes. + * + * @param path the path at which the data resides + * @param data the configuration data + */ + void onChange(String path, ConfigData data); +} diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigData.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigData.java new file mode 100644 index 00000000000..2bd0ff6b06a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigData.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config; + +import java.util.Map; + +/** + * Configuration data from a {@link ConfigProvider}. + */ +public class ConfigData { + + private final Map<String, String> data; + private final Long ttl; + + /** + * Creates a new ConfigData with the given data and TTL (in milliseconds). + * + * @param data a Map of key-value pairs + * @param ttl the time-to-live of the data in milliseconds, or null if there is no TTL + */ + public ConfigData(Map<String, String> data, Long ttl) { + this.data = data; + this.ttl = ttl; + } + + /** + * Creates a new ConfigData with the given data. + * + * @param data a Map of key-value pairs + */ + public ConfigData(Map<String, String> data) { + this(data, null); + } + + /** + * Returns the data. + * + * @return data a Map of key-value pairs + */ + public Map<String, String> data() { + return data; + } + + /** + * Returns the TTL (in milliseconds). + * + * @return ttl the time-to-live (in milliseconds) of the data, or null if there is no TTL + */ + public Long ttl() { + return ttl; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigProvider.java new file mode 100644 index 00000000000..7133baaebd0 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigProvider.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config; + +import org.apache.kafka.common.Configurable; + +import java.io.Closeable; +import java.util.Set; + +/** + * A provider of configuration data, which may optionally support subscriptions to configuration changes. + */ +public interface ConfigProvider extends Configurable, Closeable { + + /** + * Retrieves the data at the given path. + * + * @param path the path where the data resides + * @return the configuration data + */ + ConfigData get(String path); + + /** + * Retrieves the data with the given keys at the given path. + * + * @param path the path where the data resides + * @param keys the keys whose values will be retrieved + * @return the configuration data + */ + ConfigData get(String path, Set<String> keys); + + /** + * Subscribes to changes for the given keys at the given path (optional operation). + * + * @param path the path where the data resides + * @param keys the keys whose values will be retrieved + * @param callback the callback to invoke upon change + * @throws {@link UnsupportedOperationException} if the subscribe operation is not supported + */ + default void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) { + throw new UnsupportedOperationException(); + } + + /** + * Unsubscribes to changes for the given keys at the given path (optional operation). + * + * @param path the path where the data resides + * @param keys the keys whose values will be retrieved + * @param callback the callback to be unsubscribed from changes + * @throws {@link UnsupportedOperationException} if the unsubscribe operation is not supported + */ + default void unsubscribe(String path, Set<String> keys, ConfigChangeCallback callback) { + throw new UnsupportedOperationException(); + } + + /** + * Clears all subscribers (optional operation). + * + * @throws {@link UnsupportedOperationException} if the unsubscribeAll operation is not supported + */ + default void unsubscribeAll() { + throw new UnsupportedOperationException(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java new file mode 100644 index 00000000000..7c3c516b073 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * This class wraps a set of {@link ConfigProvider} instances and uses them to perform + * transformations. + * + * <p>The default variable pattern is of the form <code>${provider:[path:]key}</code>, + * where the <code>provider</code> corresponds to a {@link ConfigProvider} instance, as passed to + * {@link ConfigTransformer#ConfigTransformer(Map)}. The pattern will extract a set + * of paths (which are optional) and keys and then pass them to {@link ConfigProvider#get(String, Set)} to obtain the + * values with which to replace the variables. + * + * <p>For example, if a Map consisting of an entry with a provider name "file" and provider instance + * {@link FileConfigProvider} is passed to the {@link ConfigTransformer#ConfigTransformer(Map)}, and a Properties + * file with contents + * <pre> + * fileKey=someValue + * </pre> + * resides at the path "/tmp/properties.txt", then when a configuration Map which has an entry with a key "someKey" and + * a value "${file:/tmp/properties.txt:fileKey}" is passed to the {@link #transform(Map)} method, then the transformed + * Map will have an entry with key "someKey" and a value "someValue". + * + * <p>This class only depends on {@link ConfigProvider#get(String, Set)} and does not depend on subscription support + * in a {@link ConfigProvider}, such as the {@link ConfigProvider#subscribe(String, Set, ConfigChangeCallback)} and + * {@link ConfigProvider#unsubscribe(String, Set, ConfigChangeCallback)} methods. + */ +public class ConfigTransformer { + private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}"); + private static final String EMPTY_PATH = ""; + + private final Map<String, ConfigProvider> configProviders; + + /** + * Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>. + * + * @param configProviders a Map of provider names and {@link ConfigProvider} instances. + */ + public ConfigTransformer(Map<String, ConfigProvider> configProviders) { + this.configProviders = configProviders; + } + + /** + * Transforms the given configuration data by using the {@link ConfigProvider} instances to + * look up values to replace the variables in the pattern. + * + * @param configs the configuration values to be transformed + * @return an instance of {@link ConfigTransformerResult} + */ + public ConfigTransformerResult transform(Map<String, String> configs) { + Map<String, Map<String, Set<String>>> keysByProvider = new HashMap<>(); + Map<String, Map<String, Map<String, String>>> lookupsByProvider = new HashMap<>(); + + // Collect the variables from the given configs that need transformation + for (Map.Entry<String, String> config : configs.entrySet()) { + List<ConfigVariable> vars = getVars(config.getKey(), config.getValue(), DEFAULT_PATTERN); + for (ConfigVariable var : vars) { + Map<String, Set<String>> keysByPath = keysByProvider.computeIfAbsent(var.providerName, k -> new HashMap<>()); + Set<String> keys = keysByPath.computeIfAbsent(var.path, k -> new HashSet<>()); + keys.add(var.variable); + } + } + + // Retrieve requested variables from the ConfigProviders + Map<String, Long> ttls = new HashMap<>(); + for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) { + String providerName = entry.getKey(); + ConfigProvider provider = configProviders.get(providerName); + Map<String, Set<String>> keysByPath = entry.getValue(); + if (provider != null && keysByPath != null) { + for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) { + String path = pathWithKeys.getKey(); + Set<String> keys = new HashSet<>(pathWithKeys.getValue()); + ConfigData configData = provider.get(path, keys); + Map<String, String> data = configData.data(); + Long ttl = configData.ttl(); + if (ttl != null && ttl >= 0) { + ttls.put(path, ttl); + } + Map<String, Map<String, String>> keyValuesByPath = + lookupsByProvider.computeIfAbsent(providerName, k -> new HashMap<>()); + keyValuesByPath.put(path, data); + } + } + } + + // Perform the transformations by performing variable replacements + Map<String, String> data = new HashMap<>(configs); + for (Map.Entry<String, String> config : configs.entrySet()) { + data.put(config.getKey(), replace(lookupsByProvider, config.getValue(), DEFAULT_PATTERN)); + } + return new ConfigTransformerResult(data, ttls); + } + + private static List<ConfigVariable> getVars(String key, String value, Pattern pattern) { + List<ConfigVariable> configVars = new ArrayList<>(); + Matcher matcher = pattern.matcher(value); + while (matcher.find()) { + configVars.add(new ConfigVariable(matcher)); + } + return configVars; + } + + private static String replace(Map<String, Map<String, Map<String, String>>> lookupsByProvider, + String value, + Pattern pattern) { + Matcher matcher = pattern.matcher(value); + StringBuilder builder = new StringBuilder(); + int i = 0; + while (matcher.find()) { + ConfigVariable configVar = new ConfigVariable(matcher); + Map<String, Map<String, String>> lookupsByPath = lookupsByProvider.get(configVar.providerName); + if (lookupsByPath != null) { + Map<String, String> keyValues = lookupsByPath.get(configVar.path); + String replacement = keyValues.get(configVar.variable); + builder.append(value, i, matcher.start()); + if (replacement == null) { + // No replacements will be performed; just return the original value + builder.append(matcher.group(0)); + } else { + builder.append(replacement); + } + i = matcher.end(); + } + } + builder.append(value, i, value.length()); + return builder.toString(); + } + + private static class ConfigVariable { + final String providerName; + final String path; + final String variable; + + ConfigVariable(Matcher matcher) { + this.providerName = matcher.group(1); + this.path = matcher.group(3) != null ? matcher.group(3) : EMPTY_PATH; + this.variable = matcher.group(4); + } + + public String toString() { + return "(" + providerName + ":" + (path != null ? path + ":" : "") + variable + ")"; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformerResult.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformerResult.java new file mode 100644 index 00000000000..df7bea62f37 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformerResult.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config; + +import java.util.Map; + +/** + * The result of a transformation from {@link ConfigTransformer}. + */ +public class ConfigTransformerResult { + + private Map<String, Long> ttls; + private Map<String, String> data; + + /** + * Creates a new ConfigTransformerResult with the given data and TTL values for a set of paths. + * + * @param data a Map of key-value pairs + * @param ttls a Map of path and TTL values (in milliseconds) + */ + public ConfigTransformerResult(Map<String, String> data, Map<String, Long> ttls) { + this.data = data; + this.ttls = ttls; + } + + /** + * Returns the transformed data, with variables replaced with corresponding values from the + * ConfigProvider instances if found. + * + * <p>Modifying the transformed data that is returned does not affect the {@link ConfigProvider} nor the + * original data that was used as the source of the transformation. + * + * @return data a Map of key-value pairs + */ + public Map<String, String> data() { + return data; + } + + /** + * Returns the TTL values (in milliseconds) returned from the ConfigProvider instances for a given set of paths. + * + * @return data a Map of path and TTL values + */ + public Map<String, Long> ttls() { + return ttls; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java new file mode 100644 index 00000000000..fefc93566f3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * An implementation of {@link ConfigProvider} that represents a Properties file. + * All property keys and values are stored as cleartext. + */ +public class FileConfigProvider implements ConfigProvider { + + public void configure(Map<String, ?> configs) { + } + + /** + * Retrieves the data at the given Properties file. + * + * @param path the file where the data resides + * @return the configuration data + */ + public ConfigData get(String path) { + Map<String, String> data = new HashMap<>(); + if (path == null || path.isEmpty()) { + return new ConfigData(data); + } + try (Reader reader = reader(path)) { + Properties properties = new Properties(); + properties.load(reader); + Enumeration<Object> keys = properties.keys(); + while (keys.hasMoreElements()) { + String key = keys.nextElement().toString(); + String value = properties.getProperty(key); + if (value != null) { + data.put(key, value); + } + } + return new ConfigData(data); + } catch (IOException e) { + throw new ConfigException("Could not read properties from file " + path); + } + } + + /** + * Retrieves the data with the given keys at the given Properties file. + * + * @param path the file where the data resides + * @param keys the keys whose values will be retrieved + * @return the configuration data + */ + public ConfigData get(String path, Set<String> keys) { + Map<String, String> data = new HashMap<>(); + if (path == null || path.isEmpty()) { + return new ConfigData(data); + } + try (Reader reader = reader(path)) { + Properties properties = new Properties(); + properties.load(reader); + for (String key : keys) { + String value = properties.getProperty(key); + if (value != null) { + data.put(key, value); + } + } + return new ConfigData(data); + } catch (IOException e) { + throw new ConfigException("Could not read properties from file " + path); + } + } + + // visible for testing + protected Reader reader(String path) throws IOException { + return new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8); + } + + public void close() { + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java new file mode 100644 index 00000000000..7bc74f36e9b --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ConfigTransformerTest { + + public static final String MY_KEY = "myKey"; + public static final String TEST_INDIRECTION = "testIndirection"; + public static final String TEST_KEY = "testKey"; + public static final String TEST_KEY_WITH_TTL = "testKeyWithTTL"; + public static final String TEST_PATH = "testPath"; + public static final String TEST_RESULT = "testResult"; + public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL"; + + private ConfigTransformer configTransformer; + + @Before + public void setup() { + configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider())); + } + + @Test + public void testReplaceVariable() throws Exception { + ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}")); + Map<String, String> data = result.data(); + Map<String, Long> ttls = result.ttls(); + assertEquals(TEST_RESULT, data.get(MY_KEY)); + assertTrue(ttls.isEmpty()); + } + + @Test + public void testReplaceVariableWithTTL() throws Exception { + ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}")); + Map<String, String> data = result.data(); + Map<String, Long> ttls = result.ttls(); + assertEquals(TEST_RESULT_WITH_TTL, data.get(MY_KEY)); + assertEquals(1L, ttls.get(TEST_PATH).longValue()); + } + + @Test + public void testReplaceMultipleVariablesInValue() throws Exception { + ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!")); + Map<String, String> data = result.data(); + assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", data.get(MY_KEY)); + } + + @Test + public void testNoReplacement() throws Exception { + ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:missingKey}")); + Map<String, String> data = result.data(); + assertEquals("${test:testPath:missingKey}", data.get(MY_KEY)); + } + + @Test + public void testSingleLevelOfIndirection() throws Exception { + ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testIndirection}")); + Map<String, String> data = result.data(); + assertEquals("${test:testPath:testResult}", data.get(MY_KEY)); + } + + public static class TestConfigProvider implements ConfigProvider { + + public void configure(Map<String, ?> configs) { + } + + public ConfigData get(String path) { + return null; + } + + public ConfigData get(String path, Set<String> keys) { + Map<String, String> data = new HashMap<>(); + Long ttl = null; + if (path.equals(TEST_PATH)) { + if (keys.contains(TEST_KEY)) { + data.put(TEST_KEY, TEST_RESULT); + } + if (keys.contains(TEST_KEY_WITH_TTL)) { + data.put(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL); + ttl = 1L; + } + if (keys.contains(TEST_INDIRECTION)) { + data.put(TEST_INDIRECTION, "${test:testPath:testResult}"); + } + } + return new ConfigData(data, ttl); + } + + public void close() { + } + } + +} diff --git a/clients/src/test/java/org/apache/kafka/common/config/FileConfigProviderTest.java b/clients/src/test/java/org/apache/kafka/common/config/FileConfigProviderTest.java new file mode 100644 index 00000000000..9157e380456 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/config/FileConfigProviderTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config; + +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class FileConfigProviderTest { + + private FileConfigProvider configProvider; + + @Before + public void setup() { + configProvider = new TestFileConfigProvider(); + } + + @Test + public void testGetAllKeysAtPath() throws Exception { + ConfigData configData = configProvider.get("dummy"); + Map<String, String> result = new HashMap<>(); + result.put("testKey", "testResult"); + result.put("testKey2", "testResult2"); + assertEquals(result, configData.data()); + assertEquals(null, configData.ttl()); + } + + @Test + public void testGetOneKeyAtPath() throws Exception { + ConfigData configData = configProvider.get("dummy", Collections.singleton("testKey")); + Map<String, String> result = new HashMap<>(); + result.put("testKey", "testResult"); + assertEquals(result, configData.data()); + assertEquals(null, configData.ttl()); + } + + @Test + public void testEmptyPath() throws Exception { + ConfigData configData = configProvider.get("", Collections.singleton("testKey")); + assertTrue(configData.data().isEmpty()); + assertEquals(null, configData.ttl()); + } + + @Test + public void testEmptyPathWithKey() throws Exception { + ConfigData configData = configProvider.get(""); + assertTrue(configData.data().isEmpty()); + assertEquals(null, configData.ttl()); + } + + @Test + public void testNullPath() throws Exception { + ConfigData configData = configProvider.get(null); + assertTrue(configData.data().isEmpty()); + assertEquals(null, configData.ttl()); + } + + @Test + public void testNullPathWithKey() throws Exception { + ConfigData configData = configProvider.get(null, Collections.singleton("testKey")); + assertTrue(configData.data().isEmpty()); + assertEquals(null, configData.ttl()); + } + + public static class TestFileConfigProvider extends FileConfigProvider { + + @Override + protected Reader reader(String path) throws IOException { + return new StringReader("testKey=testResult\ntestKey2=testResult2"); + } + } +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java index 1e214be4b8c..340ef804852 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java @@ -25,6 +25,16 @@ * Context passed to SinkTasks, allowing them to access utilities in the Kafka Connect runtime. */ public interface SinkTaskContext { + + /** + * Get the Task configuration. This is the latest configuration and may differ from that passed on startup. + * + * For example, this method can be used to obtain the latest configuration if an external secret has changed, + * and the configuration is using variable references such as those compatible with + * {@link org.apache.kafka.common.config.ConfigTransformer}. + */ + public Map<String, String> configs(); + /** * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java index 8eec1dfb138..2e87986648f 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java @@ -18,11 +18,22 @@ import org.apache.kafka.connect.storage.OffsetStorageReader; +import java.util.Map; + /** * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying * runtime. */ public interface SourceTaskContext { + /** + * Get the Task configuration. This is the latest configuration and may differ from that passed on startup. + * + * For example, this method can be used to obtain the latest configuration if an external secret has changed, + * and the configuration is using variable references such as those compatible with + * {@link org.apache.kafka.common.config.ConfigTransformer}. + */ + public Map<String, String> configs(); + /** * Get the OffsetStorageReader for this SourceTask. */ diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index 54854fe4b80..f8c15de8ef4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.Connect; import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.WorkerConfigTransformer; import org.apache.kafka.connect.runtime.WorkerInfo; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.distributed.DistributedHerder; @@ -85,12 +86,16 @@ public static void main(String[] args) throws Exception { offsetBackingStore.configure(config); Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore); + WorkerConfigTransformer configTransformer = worker.configTransformer(); Converter internalValueConverter = worker.getInternalValueConverter(); StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); statusBackingStore.configure(config); - ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config); + ConfigBackingStore configBackingStore = new KafkaConfigBackingStore( + internalValueConverter, + config, + configTransformer); DistributedHerder herder = new DistributedHerder(config, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index c31568664fc..b5e0ec2c07b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -91,6 +91,7 @@ public AbstractHerder(Worker worker, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore) { this.worker = worker; + this.worker.herder = this; this.workerId = workerId; this.kafkaClusterId = kafkaClusterId; this.statusBackingStore = statusBackingStore; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index a8dd49a4091..c54c160d5ab 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -38,6 +38,7 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars; +import static org.apache.kafka.common.config.ConfigDef.ValidString.in; /** * <p> @@ -91,6 +92,20 @@ private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to records."; private static final String TRANSFORMS_DISPLAY = "Transforms"; + public static final String CONFIG_RELOAD_ACTION_CONFIG = "config.action.reload"; + private static final String CONFIG_RELOAD_ACTION_DOC = + "The action that Connect should take on the connector when changes in external " + + "configuration providers result in a change in the connector's configuration properties. " + + "A value of 'none' indicates that Connect will do nothing. " + + "A value of 'restart' indicates that Connect should restart/reload the connector with the " + + "updated configuration properties." + + "The restart may actually be scheduled in the future if the external configuration provider " + + "indicates that a configuration value will expire in the future."; + + private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action"; + public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.toString(); + public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.toString(); + private final EnrichedConnectorConfig enrichedConfig; private static class EnrichedConnectorConfig extends AbstractConfig { EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> props) { @@ -120,7 +135,10 @@ public void ensureValid(String name, Object value) { throw new ConfigException(name, value, "Duplicate alias provided."); } } - }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY); + }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY) + .define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART, + in(CONFIG_RELOAD_ACTION_NONE, CONFIG_RELOAD_ACTION_RESTART), Importance.LOW, + CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY); } public ConnectorConfig(Plugins plugins) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index 855b08a8a7f..5c7cc1429aa 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -148,6 +148,12 @@ */ void restartTask(ConnectorTaskId id, Callback<Void> cb); + /** + * Get the configuration reload action. + * @param connName name of the connector + */ + ConfigReloadAction connectorConfigReloadAction(final String connName); + /** * Restart the connector. * @param connName name of the connector @@ -155,6 +161,15 @@ */ void restartConnector(String connName, Callback<Void> cb); + /** + * Restart the connector. + * @param delayMs delay before restart + * @param connName name of the connector + * @param cb callback to invoke upon completion + * @returns The id of the request + */ + HerderRequest restartConnector(long delayMs, String connName, Callback<Void> cb); + /** * Pause the connector. This call will asynchronously suspend processing by the connector and all * of its tasks. @@ -183,6 +198,11 @@ */ String kafkaClusterId(); + enum ConfigReloadAction { + NONE, + RESTART + } + class Created<T> { private final boolean created; private final T result; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderRequest.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderRequest.java new file mode 100644 index 00000000000..627da4df823 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderRequest.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +public interface HerderRequest { + void cancel(); +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index c58eddfb2f7..7a72a0e7b26 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.config.ConfigProvider; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Frequencies; import org.apache.kafka.common.metrics.stats.Total; @@ -30,6 +31,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.ErrorReporter; @@ -76,6 +78,7 @@ public class Worker { private static final Logger log = LoggerFactory.getLogger(Worker.class); + protected Herder herder; private final ExecutorService executor; private final Time time; private final String workerId; @@ -91,6 +94,7 @@ private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>(); private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>(); private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; + private WorkerConfigTransformer workerConfigTransformer; public Worker( String workerId, @@ -122,6 +126,8 @@ public Worker( this.offsetBackingStore = offsetBackingStore; this.offsetBackingStore.configure(config); + this.workerConfigTransformer = initConfigTransformer(); + producerProps = new HashMap<>(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); @@ -137,6 +143,28 @@ public Worker( producerProps.putAll(config.originalsWithPrefix("producer.")); } + private WorkerConfigTransformer initConfigTransformer() { + final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG); + Map<String, ConfigProvider> providerMap = new HashMap<>(); + for (String providerName : providerNames) { + ConfigProvider configProvider = plugins.newConfigProvider( + config, + WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName, + ClassLoaderUsage.PLUGINS + ); + providerMap.put(providerName, configProvider); + } + return new WorkerConfigTransformer(this, providerMap); + } + + public WorkerConfigTransformer configTransformer() { + return workerConfigTransformer; + } + + protected Herder herder() { + return herder; + } + /** * Start worker. */ @@ -359,6 +387,7 @@ public boolean isRunning(String connName) { */ public boolean startTask( ConnectorTaskId id, + ClusterConfigState configState, Map<String, String> connProps, Map<String, String> taskProps, TaskStatus.Listener statusListener, @@ -419,7 +448,7 @@ public boolean startTask( log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id); } - workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader); + workerTask = buildWorkerTask(configState, connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader); workerTask.initialize(taskConfig); Plugins.compareAndSwapLoaders(savedLoader); } catch (Throwable t) { @@ -444,7 +473,8 @@ public boolean startTask( return true; } - private WorkerTask buildWorkerTask(ConnectorConfig connConfig, + private WorkerTask buildWorkerTask(ClusterConfigState configState, + ConnectorConfig connConfig, ConnectorTaskId id, Task task, TaskStatus.Listener statusListener, @@ -469,13 +499,14 @@ private WorkerTask buildWorkerTask(ConnectorConfig connConfig, internalKeyConverter, internalValueConverter); KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + // Note we pass the configState as it performs dynamic transformations under the covers return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, - headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, + headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator); } else if (task instanceof SinkTask) { TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator); retryWithToleranceOperator.reporters(sinkTaskReporters(id, connConfig, errorHandlingMetrics)); - return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, metrics, keyConverter, + return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, valueConverter, headerConverter, transformationChain, loader, time, retryWithToleranceOperator); } else { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 3c76d0fa5f2..355cfbb615b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -190,6 +191,11 @@ + "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins," + "/opt/connectors"; + public static final String CONFIG_PROVIDERS_CONFIG = "config.providers"; + protected static final String CONFIG_PROVIDERS_DOC = "List of configuration providers. " + + "This is a comma-separated list of the fully-qualified names of the ConfigProvider implementations, " + + "in the order they will be created, configured, and used."; + public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes"; protected static final String REST_EXTENSION_CLASSES_DOC = "Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called " @@ -262,6 +268,9 @@ protected static ConfigDef baseConfigDef() { .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC) + .define(CONFIG_PROVIDERS_CONFIG, Type.LIST, + Collections.emptyList(), + Importance.LOW, CONFIG_PROVIDERS_DOC) .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, REST_EXTENSION_CLASSES_DOC); } @@ -334,4 +343,5 @@ public WorkerConfig(ConfigDef definition, Map<String, String> props) { super(definition, props); logInternalConverterDeprecationWarnings(props); } + } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java new file mode 100644 index 00000000000..d91411cb8e8 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigProvider; +import org.apache.kafka.common.config.ConfigTransformer; +import org.apache.kafka.common.config.ConfigTransformerResult; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * A wrapper class to perform configuration transformations and schedule reloads for any + * retrieved TTL values. + */ +public class WorkerConfigTransformer { + private final Worker worker; + private final ConfigTransformer configTransformer; + private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>(); + + public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) { + this.worker = worker; + this.configTransformer = new ConfigTransformer(configProviders); + } + + public Map<String, String> transform(String connectorName, Map<String, String> configs) { + ConfigTransformerResult result = configTransformer.transform(configs); + scheduleReload(connectorName, result.ttls()); + return result.data(); + } + + private void scheduleReload(String connectorName, Map<String, Long> ttls) { + for (Map.Entry<String, Long> entry : ttls.entrySet()) { + scheduleReload(connectorName, entry.getKey(), entry.getValue()); + } + } + + private void scheduleReload(String connectorName, String path, long ttl) { + Herder herder = worker.herder(); + if (herder.connectorConfigReloadAction(connectorName) == Herder.ConfigReloadAction.RESTART) { + Map<String, HerderRequest> connectorRequests = requests.get(connectorName); + if (connectorRequests == null) { + connectorRequests = new ConcurrentHashMap<>(); + requests.put(connectorName, connectorRequests); + } else { + HerderRequest previousRequest = connectorRequests.get(path); + if (previousRequest != null) { + // Delete previous request for ttl which is now stale + previousRequest.cancel(); + } + } + HerderRequest request = herder.restartConnector(ttl, connectorName, null); + connectorRequests.put(path, request); + } + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 3296007b364..47f8529e2d1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -40,6 +40,7 @@ import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; import org.apache.kafka.connect.sink.SinkRecord; @@ -70,6 +71,7 @@ private final WorkerConfig workerConfig; private final SinkTask task; + private final ClusterConfigState configState; private Map<String, String> taskConfig; private final Time time; private final Converter keyConverter; @@ -96,6 +98,7 @@ public WorkerSinkTask(ConnectorTaskId id, TaskStatus.Listener statusListener, TargetState initialState, WorkerConfig workerConfig, + ClusterConfigState configState, ConnectMetrics connectMetrics, Converter keyConverter, Converter valueConverter, @@ -108,6 +111,7 @@ public WorkerSinkTask(ConnectorTaskId id, this.workerConfig = workerConfig; this.task = task; + this.configState = configState; this.keyConverter = keyConverter; this.valueConverter = valueConverter; this.headerConverter = headerConverter; @@ -133,7 +137,7 @@ public void initialize(TaskConfig taskConfig) { try { this.taskConfig = taskConfig.originalsStrings(); this.consumer = createConsumer(); - this.context = new WorkerSinkTaskContext(consumer, this); + this.context = new WorkerSinkTaskContext(consumer, this, configState); } catch (Throwable t) { log.error("{} Task failed initialization and will not be started.", this, t); onFailure(t); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java index 386f992e82a..3a6b0d6d7b8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.IllegalWorkerStateException; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.sink.SinkTaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,17 +38,26 @@ private long timeoutMs; private KafkaConsumer<byte[], byte[]> consumer; private final WorkerSinkTask sinkTask; + private final ClusterConfigState configState; private final Set<TopicPartition> pausedPartitions; private boolean commitRequested; - public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, WorkerSinkTask sinkTask) { + public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, + WorkerSinkTask sinkTask, + ClusterConfigState configState) { this.offsets = new HashMap<>(); this.timeoutMs = -1L; this.consumer = consumer; this.sinkTask = sinkTask; + this.configState = configState; this.pausedPartitions = new HashSet<>(); } + @Override + public Map<String, String> configs() { + return configState.taskConfig(sinkTask.id()); + } + @Override public void offset(Map<TopicPartition, Long> offsets) { log.debug("{} Setting offsets for topic partitions {}", this, offsets); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index e7b92a4d403..70d0cf9d7ae 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -34,6 +34,7 @@ import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; import org.apache.kafka.connect.source.SourceRecord; @@ -66,6 +67,7 @@ private final WorkerConfig workerConfig; private final SourceTask task; + private final ClusterConfigState configState; private final Converter keyConverter; private final Converter valueConverter; private final HeaderConverter headerConverter; @@ -103,6 +105,7 @@ public WorkerSourceTask(ConnectorTaskId id, OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter, WorkerConfig workerConfig, + ClusterConfigState configState, ConnectMetrics connectMetrics, ClassLoader loader, Time time, @@ -112,6 +115,7 @@ public WorkerSourceTask(ConnectorTaskId id, this.workerConfig = workerConfig; this.task = task; + this.configState = configState; this.keyConverter = keyConverter; this.valueConverter = valueConverter; this.headerConverter = headerConverter; @@ -190,7 +194,7 @@ private synchronized void tryStop() { @Override public void execute() { try { - task.initialize(new WorkerSourceTaskContext(offsetReader)); + task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState)); task.start(taskConfig); log.info("{} Source task finished initialization and start", this); synchronized (this) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java index 8f60e57e005..fe1409b282a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java @@ -16,15 +16,29 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.storage.OffsetStorageReader; +import java.util.Map; + public class WorkerSourceTaskContext implements SourceTaskContext { private final OffsetStorageReader reader; + private final WorkerSourceTask task; + private final ClusterConfigState configState; - public WorkerSourceTaskContext(OffsetStorageReader reader) { + public WorkerSourceTaskContext(OffsetStorageReader reader, + WorkerSourceTask task, + ClusterConfigState configState) { this.reader = reader; + this.task = task; + this.configState = configState; + } + + @Override + public Map<String, String> configs() { + return configState.taskConfig(task.id()); } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java index cac71ddc73b..9507706840f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime.distributed; +import org.apache.kafka.connect.runtime.WorkerConfigTransformer; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -24,6 +25,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; @@ -46,6 +48,7 @@ private final Map<String, TargetState> connectorTargetStates; private final Map<ConnectorTaskId, Map<String, String>> taskConfigs; private final Set<String> inconsistentConnectors; + private final WorkerConfigTransformer configTransformer; public ClusterConfigState(long offset, Map<String, Integer> connectorTaskCounts, @@ -53,12 +56,29 @@ public ClusterConfigState(long offset, Map<String, TargetState> connectorTargetStates, Map<ConnectorTaskId, Map<String, String>> taskConfigs, Set<String> inconsistentConnectors) { + this(offset, + connectorTaskCounts, + connectorConfigs, + connectorTargetStates, + taskConfigs, + inconsistentConnectors, + null); + } + + public ClusterConfigState(long offset, + Map<String, Integer> connectorTaskCounts, + Map<String, Map<String, String>> connectorConfigs, + Map<String, TargetState> connectorTargetStates, + Map<ConnectorTaskId, Map<String, String>> taskConfigs, + Set<String> inconsistentConnectors, + WorkerConfigTransformer configTransformer) { this.offset = offset; this.connectorTaskCounts = connectorTaskCounts; this.connectorConfigs = connectorConfigs; this.connectorTargetStates = connectorTargetStates; this.taskConfigs = taskConfigs; this.inconsistentConnectors = inconsistentConnectors; + this.configTransformer = configTransformer; } /** @@ -87,12 +107,19 @@ public boolean contains(String connector) { } /** - * Get the configuration for a connector. + * Get the configuration for a connector. The configuration will have been transformed by + * {@link org.apache.kafka.common.config.ConfigTransformer} by having all variable + * references replaced with the current values from external instances of + * {@link org.apache.kafka.common.config.ConfigProvider}, and may include secrets. * @param connector name of the connector * @return a map containing configuration parameters */ public Map<String, String> connectorConfig(String connector) { - return connectorConfigs.get(connector); + Map<String, String> configs = connectorConfigs.get(connector); + if (configTransformer != null) { + configs = configTransformer.transform(connector, configs); + } + return configs; } /** @@ -105,12 +132,19 @@ public TargetState targetState(String connector) { } /** - * Get the configuration for a task. + * Get the configuration for a task. The configuration will have been transformed by + * {@link org.apache.kafka.common.config.ConfigTransformer} by having all variable + * references replaced with the current values from external instances of + * {@link org.apache.kafka.common.config.ConfigProvider}, and may include secrets. * @param task id of the task * @return a map containing configuration parameters */ public Map<String, String> taskConfig(ConnectorTaskId task) { - return taskConfigs.get(task); + Map<String, String> configs = taskConfigs.get(task); + if (configTransformer != null) { + configs = configTransformer.transform(task.connector(), configs); + } + return configs; } /** @@ -184,4 +218,30 @@ public String toString() { ", inconsistentConnectors=" + inconsistentConnectors + '}'; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterConfigState that = (ClusterConfigState) o; + return offset == that.offset && + Objects.equals(connectorTaskCounts, that.connectorTaskCounts) && + Objects.equals(connectorConfigs, that.connectorConfigs) && + Objects.equals(connectorTargetStates, that.connectorTargetStates) && + Objects.equals(taskConfigs, that.taskConfigs) && + Objects.equals(inconsistentConnectors, that.inconsistentConnectors) && + Objects.equals(configTransformer, that.configTransformer); + } + + @Override + public int hashCode() { + return Objects.hash( + offset, + connectorTaskCounts, + connectorConfigs, + connectorTargetStates, + taskConfigs, + inconsistentConnectors, + configTransformer); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 5e9707aa29e..5efb78a93e4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -38,6 +38,7 @@ import org.apache.kafka.connect.runtime.ConnectMetricsRegistry; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.HerderConnectorContext; +import org.apache.kafka.connect.runtime.HerderRequest; import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; @@ -60,6 +61,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.NavigableSet; import java.util.NoSuchElementException; @@ -139,7 +141,7 @@ // To handle most external requests, like creating or destroying a connector, we can use a generic request where // the caller specifies all the code that should be executed. - final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>(); + final NavigableSet<DistributedHerderRequest> requests = new ConcurrentSkipListSet<>(); // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits). private Set<String> connectorConfigUpdates = new HashSet<>(); @@ -255,7 +257,7 @@ public void tick() { final long now = time.milliseconds(); long nextRequestTimeoutMs = Long.MAX_VALUE; while (true) { - final HerderRequest next = peekWithoutException(); + final DistributedHerderRequest next = peekWithoutException(); if (next == null) { break; } else if (now >= next.at) { @@ -382,7 +384,7 @@ public void halt() { // Explicitly fail any outstanding requests so they actually get a response and get an // understandable reason for their failure. - HerderRequest request = requests.pollFirst(); + DistributedHerderRequest request = requests.pollFirst(); while (request != null) { request.callback().onCompletion(new ConnectException("Worker is shutting down"), null); request = requests.pollFirst(); @@ -640,9 +642,21 @@ else if (!configState.contains(connName)) ); } + @Override + public ConfigReloadAction connectorConfigReloadAction(final String connName) { + return ConfigReloadAction.valueOf( + configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG) + .toUpperCase(Locale.ROOT)); + } + @Override public void restartConnector(final String connName, final Callback<Void> callback) { - addRequest(new Callable<Void>() { + restartConnector(0, connName, callback); + } + + @Override + public HerderRequest restartConnector(final long delayMs, final String connName, final Callback<Void> callback) { + return addRequest(delayMs, new Callable<Void>() { @Override public Void call() throws Exception { if (checkRebalanceNeeded(callback)) @@ -858,6 +872,7 @@ private boolean startTask(ConnectorTaskId taskId) { log.info("Starting task {}", taskId); return worker.startTask( taskId, + configState, configState.connectorConfig(taskId.connector()), configState.taskConfig(taskId), this, @@ -945,7 +960,7 @@ private void reconfigureConnectorTasksWithRetry(final String connName) { public void onCompletion(Throwable error, Void result) { // If we encountered an error, we don't have much choice but to just retry. If we don't, we could get // stuck with a connector that thinks it has generated tasks, but wasn't actually successful and therefore - // never makes progress. The retry has to run through a HerderRequest since this callback could be happening + // never makes progress. The retry has to run through a DistributedHerderRequest since this callback could be happening // from the HTTP request forwarding thread. if (error != null) { log.error("Failed to reconfigure connector's tasks, retrying after backoff:", error); @@ -1041,19 +1056,19 @@ private boolean checkRebalanceNeeded(Callback<?> callback) { return false; } - HerderRequest addRequest(Callable<Void> action, Callback<Void> callback) { + DistributedHerderRequest addRequest(Callable<Void> action, Callback<Void> callback) { return addRequest(0, action, callback); } - HerderRequest addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) { - HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, requestSeqNum.incrementAndGet(), action, callback); + DistributedHerderRequest addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) { + DistributedHerderRequest req = new DistributedHerderRequest(time.milliseconds() + delayMs, requestSeqNum.incrementAndGet(), action, callback); requests.add(req); if (peekWithoutException() == req) member.wakeup(); return req; } - private HerderRequest peekWithoutException() { + private DistributedHerderRequest peekWithoutException() { try { return requests.isEmpty() ? null : requests.first(); } catch (NoSuchElementException e) { @@ -1117,13 +1132,13 @@ public void onConnectorTargetStateChange(String connector) { } } - static class HerderRequest implements Comparable<HerderRequest> { + class DistributedHerderRequest implements HerderRequest, Comparable<DistributedHerderRequest> { private final long at; private final long seq; private final Callable<Void> action; private final Callback<Void> callback; - public HerderRequest(long at, long seq, Callable<Void> action, Callback<Void> callback) { + public DistributedHerderRequest(long at, long seq, Callable<Void> action, Callback<Void> callback) { this.at = at; this.seq = seq; this.action = action; @@ -1139,7 +1154,12 @@ public HerderRequest(long at, long seq, Callable<Void> action, Callback<Void> ca } @Override - public int compareTo(HerderRequest o) { + public void cancel() { + DistributedHerder.this.requests.remove(this); + } + + @Override + public int compareTo(DistributedHerderRequest o) { final int cmp = Long.compare(at, o.at); return cmp == 0 ? Long.compare(seq, o.seq) : cmp; } @@ -1147,9 +1167,9 @@ public int compareTo(HerderRequest o) { @Override public boolean equals(Object o) { if (this == o) return true; - if (!(o instanceof HerderRequest)) + if (!(o instanceof DistributedHerderRequest)) return false; - HerderRequest other = (HerderRequest) o; + DistributedHerderRequest other = (DistributedHerderRequest) o; return compareTo(other) == 0; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index b56bd1a7d91..1e598517507 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime.isolation; +import org.apache.kafka.common.config.ConfigProvider; import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.rest.ConnectRestExtension; @@ -66,6 +67,7 @@ private final SortedSet<PluginDesc<Converter>> converters; private final SortedSet<PluginDesc<HeaderConverter>> headerConverters; private final SortedSet<PluginDesc<Transformation>> transformations; + private final SortedSet<PluginDesc<ConfigProvider>> configProviders; private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions; private final List<String> pluginPaths; private final Map<Path, PluginClassLoader> activePaths; @@ -80,6 +82,7 @@ public DelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) { this.converters = new TreeSet<>(); this.headerConverters = new TreeSet<>(); this.transformations = new TreeSet<>(); + this.configProviders = new TreeSet<>(); this.restExtensions = new TreeSet<>(); } @@ -103,6 +106,10 @@ public DelegatingClassLoader(List<String> pluginPaths) { return transformations; } + public Set<PluginDesc<ConfigProvider>> configProviders() { + return configProviders; + } + public Set<PluginDesc<ConnectRestExtension>> restExtensions() { return restExtensions; } @@ -236,6 +243,8 @@ private void scanUrlsAndAddPlugins( headerConverters.addAll(plugins.headerConverters()); addPlugins(plugins.transformations(), loader); transformations.addAll(plugins.transformations()); + addPlugins(plugins.configProviders(), loader); + configProviders.addAll(plugins.configProviders()); addPlugins(plugins.restExtensions(), loader); restExtensions.addAll(plugins.restExtensions()); } @@ -292,6 +301,7 @@ private PluginScanResult scanPluginPath( getPluginDesc(reflections, Converter.class, loader), getPluginDesc(reflections, HeaderConverter.class, loader), getPluginDesc(reflections, Transformation.class, loader), + getPluginDesc(reflections, ConfigProvider.class, loader), getServiceLoaderPluginDesc(ConnectRestExtension.class, loader) ); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java index 6f48e5694bd..87b0b70c503 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime.isolation; +import org.apache.kafka.common.config.ConfigProvider; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.storage.Converter; @@ -29,6 +30,7 @@ private final Collection<PluginDesc<Converter>> converters; private final Collection<PluginDesc<HeaderConverter>> headerConverters; private final Collection<PluginDesc<Transformation>> transformations; + private final Collection<PluginDesc<ConfigProvider>> configProviders; private final Collection<PluginDesc<ConnectRestExtension>> restExtensions; public PluginScanResult( @@ -36,12 +38,14 @@ public PluginScanResult( Collection<PluginDesc<Converter>> converters, Collection<PluginDesc<HeaderConverter>> headerConverters, Collection<PluginDesc<Transformation>> transformations, + Collection<PluginDesc<ConfigProvider>> configProviders, Collection<PluginDesc<ConnectRestExtension>> restExtensions ) { this.connectors = connectors; this.converters = converters; this.headerConverters = headerConverters; this.transformations = transformations; + this.configProviders = configProviders; this.restExtensions = restExtensions; } @@ -61,6 +65,10 @@ public PluginScanResult( return transformations; } + public Collection<PluginDesc<ConfigProvider>> configProviders() { + return configProviders; + } + public Collection<PluginDesc<ConnectRestExtension>> restExtensions() { return restExtensions; } @@ -70,6 +78,7 @@ public boolean isEmpty() { && converters().isEmpty() && headerConverters().isEmpty() && transformations().isEmpty() + && configProviders().isEmpty() && restExtensions().isEmpty(); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java index 918f9d73f56..906b85f7003 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime.isolation; +import org.apache.kafka.common.config.ConfigProvider; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.sink.SinkConnector; @@ -31,6 +32,7 @@ CONNECTOR(Connector.class), CONVERTER(Converter.class), TRANSFORMATION(Transformation.class), + CONFIGPROVIDER(ConfigProvider.class), REST_EXTENSION(ConnectRestExtension.class), UNKNOWN(Object.class); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 96074106de0..c89accd3805 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.Configurable; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigProvider; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; @@ -147,6 +148,10 @@ public DelegatingClassLoader delegatingLoader() { return delegatingLoader.transformations(); } + public Set<PluginDesc<ConfigProvider>> configProviders() { + return delegatingLoader.configProviders(); + } + public Connector newConnector(String connectorClassOrAlias) { Class<? extends Connector> klass; try { @@ -318,6 +323,45 @@ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPro return plugin; } + public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) { + String classPropertyName = providerPrefix + ".class"; + Map<String, String> originalConfig = config.originalsStrings(); + if (!originalConfig.containsKey(classPropertyName)) { + // This configuration does not define the config provider via the specified property name + return null; + } + ConfigProvider plugin = null; + switch (classLoaderUsage) { + case CURRENT_CLASSLOADER: + // Attempt to load first with the current classloader, and plugins as a fallback. + plugin = getInstance(config, classPropertyName, ConfigProvider.class); + break; + case PLUGINS: + // Attempt to load with the plugin class loader, which uses the current classloader as a fallback + String configProviderClassOrAlias = originalConfig.get(classPropertyName); + Class<? extends ConfigProvider> klass; + try { + klass = pluginClass(delegatingLoader, configProviderClassOrAlias, ConfigProvider.class); + } catch (ClassNotFoundException e) { + throw new ConnectException( + "Failed to find any class that implements ConfigProvider and which name matches " + + configProviderClassOrAlias + ", available ConfigProviders are: " + + pluginNames(delegatingLoader.configProviders()) + ); + } + plugin = newPlugin(klass); + break; + } + if (plugin == null) { + throw new ConnectException("Unable to instantiate the ConfigProvider specified in '" + classPropertyName + "'"); + } + + // Configure the ConfigProvider + String configPrefix = providerPrefix + ".param."; + Map<String, Object> configProviderConfig = config.originalsWithPrefix(configPrefix); + plugin.configure(configProviderConfig); + return plugin; + } /** * If the given class names are available in the classloader, return a list of new configured diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 96f8e8767bb..20c6a24d384 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.runtime.AbstractHerder; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.HerderConnectorContext; +import org.apache.kafka.connect.runtime.HerderRequest; import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; @@ -41,7 +42,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** @@ -50,10 +58,17 @@ public class StandaloneHerder extends AbstractHerder { private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class); + private final AtomicLong requestSeqNum = new AtomicLong(); + private final ScheduledExecutorService requestExecutorService; + private ClusterConfigState configState; public StandaloneHerder(Worker worker, String kafkaClusterId) { - this(worker, worker.workerId(), kafkaClusterId, new MemoryStatusBackingStore(), new MemoryConfigBackingStore()); + this(worker, + worker.workerId(), + kafkaClusterId, + new MemoryStatusBackingStore(), + new MemoryConfigBackingStore(worker.configTransformer())); } // visible for testing @@ -64,6 +79,7 @@ public StandaloneHerder(Worker worker, String kafkaClusterId) { MemoryConfigBackingStore configBackingStore) { super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore); this.configState = ClusterConfigState.EMPTY; + this.requestExecutorService = Executors.newSingleThreadScheduledExecutor(); configBackingStore.setUpdateListener(new ConfigUpdateListener()); } @@ -77,6 +93,13 @@ public synchronized void start() { @Override public synchronized void stop() { log.info("Herder stopping"); + requestExecutorService.shutdown(); + try { + if (!requestExecutorService.awaitTermination(30, TimeUnit.SECONDS)) + requestExecutorService.shutdownNow(); + } catch (InterruptedException e) { + // ignore + } // There's no coordination/hand-off to do here since this is all standalone. Instead, we // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all @@ -229,12 +252,19 @@ public synchronized void restartTask(ConnectorTaskId taskId, Callback<Void> cb) TargetState targetState = configState.targetState(taskId.connector()); worker.stopAndAwaitTask(taskId); - if (worker.startTask(taskId, connConfigProps, taskConfigProps, this, targetState)) + if (worker.startTask(taskId, configState, connConfigProps, taskConfigProps, this, targetState)) cb.onCompletion(null, null); else cb.onCompletion(new ConnectException("Failed to start task: " + taskId), null); } + @Override + public ConfigReloadAction connectorConfigReloadAction(final String connName) { + return ConfigReloadAction.valueOf( + configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG) + .toUpperCase(Locale.ROOT)); + } + @Override public synchronized void restartConnector(String connName, Callback<Void> cb) { if (!configState.contains(connName)) @@ -248,11 +278,24 @@ public synchronized void restartConnector(String connName, Callback<Void> cb) { cb.onCompletion(new ConnectException("Failed to start connector: " + connName), null); } + @Override + public synchronized HerderRequest restartConnector(long delayMs, final String connName, final Callback<Void> cb) { + ScheduledFuture<?> future = requestExecutorService.schedule(new Runnable() { + @Override + public void run() { + restartConnector(connName, cb); + } + }, delayMs, TimeUnit.MILLISECONDS); + + return new StandaloneHerderRequest(requestSeqNum.incrementAndGet(), future); + } + private boolean startConnector(Map<String, String> connectorProps) { String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG); configBackingStore.putConnectorConfig(connName, connectorProps); + Map<String, String> connConfigs = configState.connectorConfig(connName); TargetState targetState = configState.targetState(connName); - return worker.startConnector(connName, connectorProps, new HerderConnectorContext(this, connName), this, targetState); + return worker.startConnector(connName, connConfigs, new HerderConnectorContext(this, connName), this, targetState); } private List<Map<String, String>> recomputeTaskConfigs(String connName) { @@ -270,7 +313,7 @@ private void createConnectorTasks(String connName, TargetState initialState) { for (ConnectorTaskId taskId : configState.tasks(connName)) { Map<String, String> taskConfigMap = configState.taskConfig(taskId); - worker.startTask(taskId, connConfigs, taskConfigMap, this, initialState); + worker.startTask(taskId, configState, connConfigs, taskConfigMap, this, initialState); } } @@ -342,4 +385,32 @@ public void onConnectorTargetStateChange(String connector) { } } + static class StandaloneHerderRequest implements HerderRequest { + private final long seq; + private final ScheduledFuture<?> future; + + public StandaloneHerderRequest(long seq, ScheduledFuture<?> future) { + this.seq = seq; + this.future = future; + } + + @Override + public void cancel() { + future.cancel(false); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof StandaloneHerderRequest)) + return false; + StandaloneHerderRequest other = (StandaloneHerderRequest) o; + return seq == other.seq; + } + + @Override + public int hashCode() { + return Objects.hash(seq); + } + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index e51b365cec6..ea196650c5e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -35,6 +35,7 @@ import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.WorkerConfigTransformer; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; @@ -221,7 +222,9 @@ public static String COMMIT_TASKS_KEY(String connectorName) { private final Map<String, TargetState> connectorTargetStates = new HashMap<>(); - public KafkaConfigBackingStore(Converter converter, WorkerConfig config) { + private final WorkerConfigTransformer configTransformer; + + public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) { this.lock = new Object(); this.started = false; this.converter = converter; @@ -232,6 +235,7 @@ public KafkaConfigBackingStore(Converter converter, WorkerConfig config) { throw new ConfigException("Must specify topic for connector configuration."); configLog = setupAndCreateKafkaBasedLog(this.topic, config); + this.configTransformer = configTransformer; } @Override @@ -270,7 +274,8 @@ public ClusterConfigState snapshot() { new HashMap<>(connectorConfigs), new HashMap<>(connectorTargetStates), new HashMap<>(taskConfigs), - new HashSet<>(inconsistent) + new HashSet<>(inconsistent), + configTransformer ); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java index 25891f52354..7e7d62ba3c9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.storage; import org.apache.kafka.connect.runtime.TargetState; +import org.apache.kafka.connect.runtime.WorkerConfigTransformer; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -32,6 +33,14 @@ private Map<String, ConnectorState> connectors = new HashMap<>(); private UpdateListener updateListener; + private WorkerConfigTransformer configTransformer; + + public MemoryConfigBackingStore() { + } + + public MemoryConfigBackingStore(WorkerConfigTransformer configTransformer) { + this.configTransformer = configTransformer; + } @Override public synchronized void start() { @@ -63,7 +72,8 @@ public synchronized ClusterConfigState snapshot() { connectorConfigs, connectorTargetStates, taskConfigs, - Collections.<String>emptySet()); + Collections.<String>emptySet(), + configTransformer); } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 0718eb17653..da017e851b7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -172,14 +172,14 @@ public void testConfigValidationMissingName() { assertEquals(TestSourceConnector.class.getName(), result.name()); assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP), result.groups()); assertEquals(2, result.errorCount()); - // Base connector config has 7 fields, connector's configs add 2 - assertEquals(9, result.values().size()); + // Base connector config has 8 fields, connector's configs add 2 + assertEquals(10, result.values().size()); // Missing name should generate an error assertEquals(ConnectorConfig.NAME_CONFIG, result.values().get(0).configValue().name()); assertEquals(1, result.values().get(0).configValue().errors().size()); // "required" config from connector should generate an error - assertEquals("required", result.values().get(7).configValue().name()); - assertEquals(1, result.values().get(7).configValue().errors().size()); + assertEquals("required", result.values().get(8).configValue().name()); + assertEquals(1, result.values().get(8).configValue().errors().size()); verifyAll(); } @@ -233,15 +233,15 @@ public void testConfigValidationTransformsExtendResults() { ); assertEquals(expectedGroups, result.groups()); assertEquals(2, result.errorCount()); - // Base connector config has 7 fields, connector's configs add 2, 2 type fields from the transforms, and + // Base connector config has 8 fields, connector's configs add 2, 2 type fields from the transforms, and // 1 from the valid transformation's config - assertEquals(12, result.values().size()); + assertEquals(13, result.values().size()); // Should get 2 type fields from the transforms, first adds its own config since it has a valid class - assertEquals("transforms.xformA.type", result.values().get(7).configValue().name()); - assertTrue(result.values().get(7).configValue().errors().isEmpty()); - assertEquals("transforms.xformA.subconfig", result.values().get(8).configValue().name()); - assertEquals("transforms.xformB.type", result.values().get(9).configValue().name()); - assertFalse(result.values().get(9).configValue().errors().isEmpty()); + assertEquals("transforms.xformA.type", result.values().get(8).configValue().name()); + assertTrue(result.values().get(8).configValue().errors().isEmpty()); + assertEquals("transforms.xformA.subconfig", result.values().get(9).configValue().name()); + assertEquals("transforms.xformB.type", result.values().get(10).configValue().name()); + assertFalse(result.values().get(10).configValue().errors().isEmpty()); verifyAll(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 5a8bcc5a6cc..b50e7ff0956 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.LogReporter; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; @@ -369,7 +370,8 @@ private void createSinkTask(TargetState initialState, RetryWithToleranceOperator workerSinkTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer"}, - taskId, sinkTask, statusListener, initialState, workerConfig, metrics, converter, converter, + taskId, sinkTask, statusListener, initialState, workerConfig, + ClusterConfigState.EMPTY, metrics, converter, converter, headerConverter, sinkTransforms, pluginLoader, time, retryWithToleranceOperator); } @@ -398,7 +400,8 @@ private void createSourceTask(TargetState initialState, RetryWithToleranceOperat workerSourceTask = PowerMock.createPartialMock( WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"}, taskId, sourceTask, statusListener, initialState, converter, converter, headerConverter, sourceTransforms, - producer, offsetReader, offsetWriter, workerConfig, metrics, pluginLoader, time, retryWithToleranceOperator); + producer, offsetReader, offsetWriter, workerConfig, + ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator); } private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> record) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java new file mode 100644 index 00000000000..89bba09b0da --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigChangeCallback; +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigProvider; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.powermock.api.easymock.PowerMock.replayAll; + +@RunWith(PowerMockRunner.class) +public class WorkerConfigTransformerTest { + + public static final String MY_KEY = "myKey"; + public static final String MY_CONNECTOR = "myConnector"; + public static final String TEST_KEY = "testKey"; + public static final String TEST_PATH = "testPath"; + public static final String TEST_KEY_WITH_TTL = "testKeyWithTTL"; + public static final String TEST_KEY_WITH_LONGER_TTL = "testKeyWithLongerTTL"; + public static final String TEST_RESULT = "testResult"; + public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL"; + public static final String TEST_RESULT_WITH_LONGER_TTL = "testResultWithLongerTTL"; + + @Mock private Herder herder; + @Mock private Worker worker; + @Mock private HerderRequest requestId; + private WorkerConfigTransformer configTransformer; + + @Before + public void setup() { + worker = PowerMock.createMock(Worker.class); + herder = PowerMock.createMock(Herder.class); + configTransformer = new WorkerConfigTransformer(worker, Collections.singletonMap("test", new TestConfigProvider())); + } + + @Test + public void testReplaceVariable() throws Exception { + Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKey}")); + assertEquals(TEST_RESULT, result.get(MY_KEY)); + } + + @Test + public void testReplaceVariableWithTTL() throws Exception { + EasyMock.expect(worker.herder()).andReturn(herder); + EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.NONE); + + replayAll(); + + Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}")); + assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY)); + } + + @Test + public void testReplaceVariableWithTTLAndScheduleRestart() throws Exception { + EasyMock.expect(worker.herder()).andReturn(herder); + EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART); + EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId); + + replayAll(); + + Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}")); + assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY)); + } + + @Test + public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() throws Exception { + EasyMock.expect(worker.herder()).andReturn(herder); + EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART); + EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId); + + EasyMock.expect(worker.herder()).andReturn(herder); + EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART); + EasyMock.expectLastCall(); + requestId.cancel(); + EasyMock.expectLastCall(); + EasyMock.expect(herder.restartConnector(10L, MY_CONNECTOR, null)).andReturn(requestId); + + replayAll(); + + Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}")); + assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY)); + + result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithLongerTTL}")); + assertEquals(TEST_RESULT_WITH_LONGER_TTL, result.get(MY_KEY)); + } + + public static class TestConfigProvider implements ConfigProvider { + + public void configure(Map<String, ?> configs) { + } + + public ConfigData get(String path) { + return null; + } + + public ConfigData get(String path, Set<String> keys) { + if (path.equals(TEST_PATH)) { + if (keys.contains(TEST_KEY)) { + return new ConfigData(Collections.singletonMap(TEST_KEY, TEST_RESULT)); + } else if (keys.contains(TEST_KEY_WITH_TTL)) { + return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L); + } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) { + return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L); + } + } + return new ConfigData(Collections.emptyMap()); + } + + public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) { + throw new UnsupportedOperationException(); + } + + public void unsubscribe(String path, Set<String> keys) { + throw new UnsupportedOperationException(); + } + + public void close() { + } + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index ff8507c0945..d23adbf3d69 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; @@ -162,7 +163,7 @@ public void setUp() { private void createTask(TargetState initialState) { workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer"}, - taskId, sinkTask, statusListener, initialState, workerConfig, metrics, + taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter, valueConverter, headerConverter, transformationChain, pluginLoader, time, RetryWithToleranceOperator.NOOP_OPERATOR); @@ -1463,5 +1464,4 @@ private void assertMetrics(int minimumPollCountExpected) { private abstract static class TestSinkTask extends SinkTask { } - } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 61d8778d11f..800301e05dc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; @@ -137,7 +138,7 @@ public void setup() { workerConfig = new StandaloneConfig(workerProps); workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer"}, - taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter, + taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter, valueConverter, headerConverter, new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR), pluginLoader, time, RetryWithToleranceOperator.NOOP_OPERATOR); @@ -700,5 +701,4 @@ public Object answer() throws Throwable { private static abstract class TestSinkTask extends SinkTask { } - } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 77f4ad93bad..1482d75b513 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; @@ -104,6 +105,7 @@ @Mock private KafkaProducer<byte[], byte[]> producer; @Mock private OffsetStorageReader offsetReader; @Mock private OffsetStorageWriter offsetWriter; + @Mock private ClusterConfigState clusterConfigState; private WorkerSourceTask workerTask; @Mock private Future<RecordMetadata> sendFuture; @MockStrict private TaskStatus.Listener statusListener; @@ -148,7 +150,7 @@ private void createWorkerTask() { private void createWorkerTask(TargetState initialState) { workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter, - transformationChain, producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM, + transformationChain, producer, offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM, RetryWithToleranceOperator.NOOP_OPERATOR); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index d29eef5ed69..6fa7ed11b9e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; @@ -491,6 +492,7 @@ public void testAddRemoveTask() throws Exception { anyObject(OffsetStorageReader.class), anyObject(OffsetStorageWriter.class), EasyMock.eq(config), + anyObject(ClusterConfigState.class), anyObject(ConnectMetrics.class), anyObject(ClassLoader.class), anyObject(Time.class), @@ -547,7 +549,7 @@ public void testAddRemoveTask() throws Exception { assertStatistics(worker, 0, 0); assertStartupStatistics(worker, 0, 0, 0, 0); assertEquals(Collections.emptySet(), worker.taskIds()); - worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); + worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); assertStatistics(worker, 0, 1); assertStartupStatistics(worker, 0, 0, 1, 0); assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds()); @@ -598,7 +600,7 @@ public void testStartTaskFailure() throws Exception { assertStatistics(worker, 0, 0); assertStartupStatistics(worker, 0, 0, 0, 0); - assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED)); + assertFalse(worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED)); assertStartupStatistics(worker, 0, 0, 1, 1); assertStatistics(worker, 0, 0); @@ -629,6 +631,7 @@ public void testCleanupTasksOnStop() throws Exception { anyObject(OffsetStorageReader.class), anyObject(OffsetStorageWriter.class), anyObject(WorkerConfig.class), + anyObject(ClusterConfigState.class), anyObject(ConnectMetrics.class), EasyMock.eq(pluginLoader), anyObject(Time.class), @@ -688,7 +691,7 @@ public void testCleanupTasksOnStop() throws Exception { worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); worker.start(); assertStatistics(worker, 0, 0); - worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); + worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); assertStatistics(worker, 0, 1); worker.stop(); assertStatistics(worker, 0, 0); @@ -721,6 +724,7 @@ public void testConverterOverrides() throws Exception { anyObject(OffsetStorageReader.class), anyObject(OffsetStorageWriter.class), anyObject(WorkerConfig.class), + anyObject(ClusterConfigState.class), anyObject(ConnectMetrics.class), EasyMock.eq(pluginLoader), anyObject(Time.class), @@ -784,7 +788,7 @@ public void testConverterOverrides() throws Exception { connProps.put("key.converter.extra.config", "foo"); connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConfigurableConverter.class.getName()); connProps.put("value.converter.extra.config", "bar"); - worker.startTask(TASK_ID, connProps, origProps, taskStatusListener, TargetState.STARTED); + worker.startTask(TASK_ID, ClusterConfigState.EMPTY, connProps, origProps, taskStatusListener, TargetState.STARTED); assertStatistics(worker, 0, 1); assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds()); worker.stopAndAwaitTask(TASK_ID); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index d7a7d87cb4f..911afe7ec2f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -214,7 +214,7 @@ public void testJoinAssignment() throws Exception { EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); - worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + worker.startTask(EasyMock.eq(TASK1), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); @@ -241,7 +241,7 @@ public void testRebalance() throws Exception { PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); - worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + worker.startTask(EasyMock.eq(TASK1), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); @@ -288,7 +288,7 @@ public void testRebalanceFailedConnector() throws Exception { PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); - worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + worker.startTask(EasyMock.eq(TASK1), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); @@ -756,7 +756,7 @@ public void testRestartTask() throws Exception { expectPostRebalanceCatchup(SNAPSHOT); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); @@ -770,7 +770,7 @@ public void testRestartTask() throws Exception { worker.stopAndAwaitTask(TASK0); PowerMock.expectLastCall(); - worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); @@ -820,10 +820,10 @@ public void testRestartUnknownTask() throws Exception { @Test public void testRequestProcessingOrder() throws Exception { - final DistributedHerder.HerderRequest req1 = herder.addRequest(100, null, null); - final DistributedHerder.HerderRequest req2 = herder.addRequest(10, null, null); - final DistributedHerder.HerderRequest req3 = herder.addRequest(200, null, null); - final DistributedHerder.HerderRequest req4 = herder.addRequest(200, null, null); + final DistributedHerder.DistributedHerderRequest req1 = herder.addRequest(100, null, null); + final DistributedHerder.DistributedHerderRequest req2 = herder.addRequest(10, null, null); + final DistributedHerder.DistributedHerderRequest req3 = herder.addRequest(200, null, null); + final DistributedHerder.DistributedHerderRequest req4 = herder.addRequest(200, null, null); assertEquals(req2, herder.requests.pollFirst()); // lowest delay assertEquals(req1, herder.requests.pollFirst()); // next lowest delay @@ -1080,7 +1080,7 @@ public void testUnknownConnectorPaused() throws Exception { // join expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0)); expectPostRebalanceCatchup(SNAPSHOT); - worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); @@ -1117,7 +1117,7 @@ public void testConnectorPausedRunningTaskOnly() throws Exception { // join expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0)); expectPostRebalanceCatchup(SNAPSHOT); - worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); @@ -1157,7 +1157,7 @@ public void testConnectorResumedRunningTaskOnly() throws Exception { // join expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0)); expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1); - worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED)); PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); @@ -1210,7 +1210,7 @@ public void testTaskConfigAdded() { expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(), Arrays.asList(TASK0)); - worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); member.poll(EasyMock.anyInt()); @@ -1250,7 +1250,7 @@ public void testJoinLeaderCatchUpFails() throws Exception { PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); - worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), + worker.startTask(EasyMock.eq(TASK1), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index fd330f28009..5372a3a27a5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.connect.runtime.TaskStatus; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConnector; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -68,6 +69,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -325,6 +327,7 @@ public void testRestartConnectorFailureOnStart() throws Exception { PowerMock.verifyAll(); } + @Test public void testRestartTask() throws Exception { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); @@ -337,7 +340,14 @@ public void testRestartTask() throws Exception { worker.stopAndAwaitTask(taskId); EasyMock.expectLastCall(); - worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); + ClusterConfigState configState = new ClusterConfigState( + -1, + Collections.singletonMap(CONNECTOR_NAME, 1), + Collections.singletonMap(CONNECTOR_NAME, connectorConfig), + Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), + Collections.singletonMap(taskId, taskConfig(SourceSink.SOURCE)), + new HashSet<>()); + worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); EasyMock.expectLastCall().andReturn(true); PowerMock.replayAll(); @@ -363,7 +373,14 @@ public void testRestartTaskFailureOnStart() throws Exception { worker.stopAndAwaitTask(taskId); EasyMock.expectLastCall(); - worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); + ClusterConfigState configState = new ClusterConfigState( + -1, + Collections.singletonMap(CONNECTOR_NAME, 1), + Collections.singletonMap(CONNECTOR_NAME, connectorConfig), + Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), + Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)), + new HashSet<>()); + worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); EasyMock.expectLastCall().andReturn(false); PowerMock.replayAll(); @@ -597,7 +614,14 @@ private void expectAdd(SourceSink sourceSink) throws Exception { EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig)) .andReturn(singletonList(generatedTaskProps)); - worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED); + ClusterConfigState configState = new ClusterConfigState( + -1, + Collections.singletonMap(CONNECTOR_NAME, 1), + Collections.singletonMap(CONNECTOR_NAME, connectorConfig(sourceSink)), + Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), + Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), generatedTaskProps), + new HashSet<>()); + worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED); EasyMock.expectLastCall().andReturn(true); EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName())) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index aac1b78c918..ed62d9b6f01 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -146,7 +146,7 @@ @Before public void setUp() { - configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter, DEFAULT_DISTRIBUTED_CONFIG); + configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter, DEFAULT_DISTRIBUTED_CONFIG, null); Whitebox.setInternalState(configStorage, "configLog", storeLog); configStorage.setUpdateListener(configUpdateListener); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Externalize Secrets for Kafka Connect Configurations > ---------------------------------------------------- > > Key: KAFKA-6886 > URL: https://issues.apache.org/jira/browse/KAFKA-6886 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect > Reporter: Robert Yokota > Assignee: Robert Yokota > Priority: Major > Fix For: 2.0.0 > > > Kafka Connect's connector configurations have plaintext passwords, and > Connect stores these in cleartext either on the filesystem (for standalone > mode) or in internal topics (for distributed mode). > Connect should not store or transmit cleartext passwords in connector > configurations. Secrets in stored connector configurations should be allowed > to be replaced with references to values stored in external secret management > systems. Connect should provide an extension point for adding customized > integrations, as well as provide a file-based extension as an example. > Second, a Connect runtime should be allowed to be configured to use one or > more of these extensions, and allow connector configurations to use > placeholders that will be resolved by the runtime before passing the complete > connector configurations to connectors. This will allow existing connectors > to not see any difference in the configurations that Connect provides to them > at startup. And third, Connect's API should be changed to allow a connector > to obtain the latest connector configuration at any time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)