[ https://issues.apache.org/jira/browse/KAFKA-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495724#comment-16495724 ]

ASF GitHub Bot commented on KAFKA-6886:
---------------------------------------

ewencp closed pull request #5068: KAFKA-6886 Externalize secrets from Connect configs
URL: https://github.com/apache/kafka/pull/5068

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ba48c38cb28..5bf69b6b65f 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -83,7 +83,7 @@
               files="(KafkaConfigBackingStore|RequestResponseTest|WorkerSinkTaskTest).java"/>
 
     <suppress checks="ParameterNumber"
-              files="WorkerSourceTask.java"/>
+              files="(WorkerSinkTask|WorkerSourceTask).java"/>
     <suppress checks="ParameterNumber"
               files="WorkerCoordinator.java"/>
     <suppress checks="ParameterNumber"
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigChangeCallback.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigChangeCallback.java
new file mode 100644
index 00000000000..d4c9948bc93
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigChangeCallback.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+/**
+ * A callback passed to {@link ConfigProvider} for subscribing to changes.
+ */
+public interface ConfigChangeCallback {
+
+    /**
+     * Performs an action when configuration data changes.
+     *
+     * @param path the path at which the data resides
+     * @param data the configuration data
+     */
+    void onChange(String path, ConfigData data);
+}
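
For orientation, a minimal implementation of this callback might simply log
changes as they arrive; the class below is an illustrative sketch, not part of
this PR:

    import org.apache.kafka.common.config.ConfigChangeCallback;
    import org.apache.kafka.common.config.ConfigData;

    public class LoggingChangeCallback implements ConfigChangeCallback {
        @Override
        public void onChange(String path, ConfigData data) {
            // React to updated values, e.g. refresh a cached configuration
            System.out.println("Config changed at " + path + ": " + data.data().keySet());
        }
    }
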
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigData.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigData.java
new file mode 100644
index 00000000000..2bd0ff6b06a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigData.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import java.util.Map;
+
+/**
+ * Configuration data from a {@link ConfigProvider}.
+ */
+public class ConfigData {
+
+    private final Map<String, String> data;
+    private final Long ttl;
+
+    /**
+     * Creates a new ConfigData with the given data and TTL (in milliseconds).
+     *
+     * @param data a Map of key-value pairs
+     * @param ttl the time-to-live of the data in milliseconds, or null if there is no TTL
+     */
+    public ConfigData(Map<String, String> data, Long ttl) {
+        this.data = data;
+        this.ttl = ttl;
+    }
+
+    /**
+     * Creates a new ConfigData with the given data.
+     *
+     * @param data a Map of key-value pairs
+     */
+    public ConfigData(Map<String, String> data) {
+        this(data, null);
+    }
+
+    /**
+     * Returns the data.
+     *
+     * @return data a Map of key-value pairs
+     */
+    public Map<String, String> data() {
+        return data;
+    }
+
+    /**
+     * Returns the TTL (in milliseconds).
+     *
+     * @return ttl the time-to-live (in milliseconds) of the data, or null if there is no TTL
+     */
+    public Long ttl() {
+        return ttl;
+    }
+}
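
As a quick sketch of how ConfigData is constructed and read (the key, value,
and TTL below are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.config.ConfigData;

    public class ConfigDataExample {
        public static void main(String[] args) {
            Map<String, String> values = new HashMap<>();
            values.put("dbPassword", "secret");
            ConfigData withTtl = new ConfigData(values, 60000L); // expires in 60 seconds
            ConfigData noTtl = new ConfigData(values);           // ttl() returns null
            System.out.println(withTtl.ttl() + " / " + noTtl.data());
        }
    }
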
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigProvider.java
new file mode 100644
index 00000000000..7133baaebd0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import org.apache.kafka.common.Configurable;
+
+import java.io.Closeable;
+import java.util.Set;
+
+/**
+ * A provider of configuration data, which may optionally support subscriptions to configuration changes.
+ */
+public interface ConfigProvider extends Configurable, Closeable {
+
+    /**
+     * Retrieves the data at the given path.
+     *
+     * @param path the path where the data resides
+     * @return the configuration data
+     */
+    ConfigData get(String path);
+
+    /**
+     * Retrieves the data with the given keys at the given path.
+     *
+     * @param path the path where the data resides
+     * @param keys the keys whose values will be retrieved
+     * @return the configuration data
+     */
+    ConfigData get(String path, Set<String> keys);
+
+    /**
+     * Subscribes to changes for the given keys at the given path (optional operation).
+     *
+     * @param path the path where the data resides
+     * @param keys the keys to subscribe to changes for
+     * @param callback the callback to invoke upon change
+     * @throws UnsupportedOperationException if the subscribe operation is not supported
+     */
+    default void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Unsubscribes from changes for the given keys at the given path (optional operation).
+     *
+     * @param path the path where the data resides
+     * @param keys the keys to unsubscribe from
+     * @param callback the callback to be unsubscribed from changes
+     * @throws UnsupportedOperationException if the unsubscribe operation is not supported
+     */
+    default void unsubscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Clears all subscribers (optional operation).
+     *
+     * @throws UnsupportedOperationException if the unsubscribeAll operation is not supported
+     */
+    default void unsubscribeAll() {
+        throw new UnsupportedOperationException();
+    }
+}
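
To make the contract concrete, here is a hedged sketch of a trivial in-memory
provider (the class is hypothetical, not part of this PR); it leans on the
default subscribe/unsubscribe methods, which throw UnsupportedOperationException:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.config.ConfigData;
    import org.apache.kafka.common.config.ConfigProvider;

    public class InMemoryConfigProvider implements ConfigProvider {
        // path -> (key -> value)
        private final Map<String, Map<String, String>> store = new HashMap<>();

        @Override
        public void configure(Map<String, ?> configs) {
            // No settings needed for this sketch
        }

        @Override
        public ConfigData get(String path) {
            return new ConfigData(new HashMap<>(store.getOrDefault(path, Collections.emptyMap())));
        }

        @Override
        public ConfigData get(String path, Set<String> keys) {
            Map<String, String> atPath = store.getOrDefault(path, Collections.emptyMap());
            Map<String, String> data = new HashMap<>();
            for (String key : keys) {
                String value = atPath.get(key);
                if (value != null) {
                    data.put(key, value);
                }
            }
            return new ConfigData(data);
        }

        @Override
        public void close() {
        }
    }
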
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
new file mode 100644
index 00000000000..7c3c516b073
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class wraps a set of {@link ConfigProvider} instances and uses them to perform
+ * transformations.
+ *
+ * <p>The default variable pattern is of the form <code>${provider:[path:]key}</code>,
+ * where the <code>provider</code> corresponds to a {@link ConfigProvider} instance, as passed to
+ * {@link ConfigTransformer#ConfigTransformer(Map)}.  The pattern will extract a set
+ * of paths (which are optional) and keys and then pass them to {@link ConfigProvider#get(String, Set)} to obtain the
+ * values with which to replace the variables.
+ *
+ * <p>For example, if a Map consisting of an entry with a provider name "file" and provider instance
+ * {@link FileConfigProvider} is passed to the {@link ConfigTransformer#ConfigTransformer(Map)}, and a Properties
+ * file with contents
+ * <pre>
+ * fileKey=someValue
+ * </pre>
+ * resides at the path "/tmp/properties.txt", then when a configuration Map which has an entry with a key "someKey" and
+ * a value "${file:/tmp/properties.txt:fileKey}" is passed to the {@link #transform(Map)} method, the transformed
+ * Map will have an entry with key "someKey" and a value "someValue".
+ *
+ * <p>This class only depends on {@link ConfigProvider#get(String, Set)} and does not depend on subscription support
+ * in a {@link ConfigProvider}, such as the {@link ConfigProvider#subscribe(String, Set, ConfigChangeCallback)} and
+ * {@link ConfigProvider#unsubscribe(String, Set, ConfigChangeCallback)} methods.
+ */
+public class ConfigTransformer {
+    private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");
+    private static final String EMPTY_PATH = "";
+
+    private final Map<String, ConfigProvider> configProviders;
+
+    /**
+     * Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>.
+     *
+     * @param configProviders a Map of provider names and {@link ConfigProvider} instances.
+     */
+    public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
+        this.configProviders = configProviders;
+    }
+
+    /**
+     * Transforms the given configuration data by using the {@link ConfigProvider} instances to
+     * look up values to replace the variables in the pattern.
+     *
+     * @param configs the configuration values to be transformed
+     * @return an instance of {@link ConfigTransformerResult}
+     */
+    public ConfigTransformerResult transform(Map<String, String> configs) {
+        Map<String, Map<String, Set<String>>> keysByProvider = new HashMap<>();
+        Map<String, Map<String, Map<String, String>>> lookupsByProvider = new HashMap<>();
+
+        // Collect the variables from the given configs that need transformation
+        for (Map.Entry<String, String> config : configs.entrySet()) {
+            List<ConfigVariable> vars = getVars(config.getKey(), config.getValue(), DEFAULT_PATTERN);
+            for (ConfigVariable var : vars) {
+                Map<String, Set<String>> keysByPath = keysByProvider.computeIfAbsent(var.providerName, k -> new HashMap<>());
+                Set<String> keys = keysByPath.computeIfAbsent(var.path, k -> new HashSet<>());
+                keys.add(var.variable);
+            }
+        }
+
+        // Retrieve requested variables from the ConfigProviders
+        Map<String, Long> ttls = new HashMap<>();
+        for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
+            String providerName = entry.getKey();
+            ConfigProvider provider = configProviders.get(providerName);
+            Map<String, Set<String>> keysByPath = entry.getValue();
+            if (provider != null && keysByPath != null) {
+                for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
+                    String path = pathWithKeys.getKey();
+                    Set<String> keys = new HashSet<>(pathWithKeys.getValue());
+                    ConfigData configData = provider.get(path, keys);
+                    Map<String, String> data = configData.data();
+                    Long ttl = configData.ttl();
+                    if (ttl != null && ttl >= 0) {
+                        ttls.put(path, ttl);
+                    }
+                    Map<String, Map<String, String>> keyValuesByPath =
+                            lookupsByProvider.computeIfAbsent(providerName, k -> new HashMap<>());
+                    keyValuesByPath.put(path, data);
+                }
+            }
+        }
+
+        // Perform the transformations by performing variable replacements
+        Map<String, String> data = new HashMap<>(configs);
+        for (Map.Entry<String, String> config : configs.entrySet()) {
+            data.put(config.getKey(), replace(lookupsByProvider, config.getValue(), DEFAULT_PATTERN));
+        }
+        return new ConfigTransformerResult(data, ttls);
+    }
+
+    private static List<ConfigVariable> getVars(String key, String value, Pattern pattern) {
+        List<ConfigVariable> configVars = new ArrayList<>();
+        Matcher matcher = pattern.matcher(value);
+        while (matcher.find()) {
+            configVars.add(new ConfigVariable(matcher));
+        }
+        return configVars;
+    }
+
+    private static String replace(Map<String, Map<String, Map<String, String>>> lookupsByProvider,
+                                  String value,
+                                  Pattern pattern) {
+        Matcher matcher = pattern.matcher(value);
+        StringBuilder builder = new StringBuilder();
+        int i = 0;
+        while (matcher.find()) {
+            ConfigVariable configVar = new ConfigVariable(matcher);
+            Map<String, Map<String, String>> lookupsByPath = lookupsByProvider.get(configVar.providerName);
+            if (lookupsByPath != null) {
+                Map<String, String> keyValues = lookupsByPath.get(configVar.path);
+                String replacement = keyValues.get(configVar.variable);
+                builder.append(value, i, matcher.start());
+                if (replacement == null) {
+                    // No replacements will be performed; just return the original value
+                    builder.append(matcher.group(0));
+                } else {
+                    builder.append(replacement);
+                }
+                i = matcher.end();
+            }
+        }
+        builder.append(value, i, value.length());
+        return builder.toString();
+    }
+
+    private static class ConfigVariable {
+        final String providerName;
+        final String path;
+        final String variable;
+
+        ConfigVariable(Matcher matcher) {
+            this.providerName = matcher.group(1);
+            this.path = matcher.group(3) != null ? matcher.group(3) : EMPTY_PATH;
+            this.variable = matcher.group(4);
+        }
+
+        public String toString() {
+            return "(" + providerName + ":" + (path != null ? path + ":" : "") + variable + ")";
+        }
+    }
+}
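
The javadoc example above can be exercised end to end; a short sketch, assuming
a properties file at /tmp/properties.txt containing fileKey=someValue:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.common.config.ConfigProvider;
    import org.apache.kafka.common.config.ConfigTransformer;
    import org.apache.kafka.common.config.ConfigTransformerResult;
    import org.apache.kafka.common.config.FileConfigProvider;

    public class TransformExample {
        public static void main(String[] args) {
            Map<String, ConfigProvider> providers =
                    Collections.singletonMap("file", new FileConfigProvider());
            ConfigTransformer transformer = new ConfigTransformer(providers);
            ConfigTransformerResult result = transformer.transform(
                    Collections.singletonMap("someKey", "${file:/tmp/properties.txt:fileKey}"));
            System.out.println(result.data().get("someKey")); // prints "someValue"
        }
    }
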
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformerResult.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformerResult.java
new file mode 100644
index 00000000000..df7bea62f37
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformerResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import java.util.Map;
+
+/**
+ * The result of a transformation from {@link ConfigTransformer}.
+ */
+public class ConfigTransformerResult {
+
+    private Map<String, Long> ttls;
+    private Map<String, String> data;
+
+    /**
+     * Creates a new ConfigTransformerResult with the given data and TTL values for a set of paths.
+     *
+     * @param data a Map of key-value pairs
+     * @param ttls a Map of path and TTL values (in milliseconds)
+     */
+    public ConfigTransformerResult(Map<String, String> data, Map<String, Long> ttls) {
+        this.data = data;
+        this.ttls = ttls;
+    }
+
+    /**
+     * Returns the transformed data, with variables replaced with corresponding values from the
+     * ConfigProvider instances if found.
+     *
+     * <p>Modifying the transformed data that is returned does not affect the {@link ConfigProvider} nor the
+     * original data that was used as the source of the transformation.
+     *
+     * @return data a Map of key-value pairs
+     */
+    public Map<String, String> data() {
+        return data;
+    }
+
+    /**
+     * Returns the TTL values (in milliseconds) returned from the ConfigProvider instances for a given set of paths.
+     *
+     * @return ttls a Map of path and TTL values
+     */
+    public Map<String, Long> ttls() {
+        return ttls;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java
new file mode 100644
index 00000000000..fefc93566f3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/FileConfigProvider.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * An implementation of {@link ConfigProvider} that represents a Properties file.
+ * All property keys and values are stored as cleartext.
+ */
+public class FileConfigProvider implements ConfigProvider {
+
+    public void configure(Map<String, ?> configs) {
+    }
+
+    /**
+     * Retrieves the data from the given Properties file.
+     *
+     * @param path the file where the data resides
+     * @return the configuration data
+     */
+    public ConfigData get(String path) {
+        Map<String, String> data = new HashMap<>();
+        if (path == null || path.isEmpty()) {
+            return new ConfigData(data);
+        }
+        try (Reader reader = reader(path)) {
+            Properties properties = new Properties();
+            properties.load(reader);
+            Enumeration<Object> keys = properties.keys();
+            while (keys.hasMoreElements()) {
+                String key = keys.nextElement().toString();
+                String value = properties.getProperty(key);
+                if (value != null) {
+                    data.put(key, value);
+                }
+            }
+            return new ConfigData(data);
+        } catch (IOException e) {
+            throw new ConfigException("Could not read properties from file " + path);
+        }
+    }
+
+    /**
+     * Retrieves the data with the given keys from the given Properties file.
+     *
+     * @param path the file where the data resides
+     * @param keys the keys whose values will be retrieved
+     * @return the configuration data
+     */
+    public ConfigData get(String path, Set<String> keys) {
+        Map<String, String> data = new HashMap<>();
+        if (path == null || path.isEmpty()) {
+            return new ConfigData(data);
+        }
+        try (Reader reader = reader(path)) {
+            Properties properties = new Properties();
+            properties.load(reader);
+            for (String key : keys) {
+                String value = properties.getProperty(key);
+                if (value != null) {
+                    data.put(key, value);
+                }
+            }
+            return new ConfigData(data);
+        } catch (IOException e) {
+            throw new ConfigException("Could not read properties from file " + path);
+        }
+    }
+
+    // visible for testing
+    protected Reader reader(String path) throws IOException {
+        return new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8);
+    }
+
+    public void close() {
+    }
+}
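
Direct use of the provider is straightforward; a sketch (the path and key are
illustrative):

    import java.util.Collections;
    import org.apache.kafka.common.config.ConfigData;
    import org.apache.kafka.common.config.FileConfigProvider;

    public class FileProviderExample {
        public static void main(String[] args) {
            FileConfigProvider provider = new FileConfigProvider();
            provider.configure(Collections.emptyMap());
            // Fetch every key in the file, or just a selected subset
            ConfigData all = provider.get("/tmp/properties.txt");
            ConfigData one = provider.get("/tmp/properties.txt", Collections.singleton("fileKey"));
            System.out.println(all.data() + " / " + one.data());
            provider.close();
        }
    }
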
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
new file mode 100644
index 00000000000..7bc74f36e9b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ConfigTransformerTest {
+
+    public static final String MY_KEY = "myKey";
+    public static final String TEST_INDIRECTION = "testIndirection";
+    public static final String TEST_KEY = "testKey";
+    public static final String TEST_KEY_WITH_TTL = "testKeyWithTTL";
+    public static final String TEST_PATH = "testPath";
+    public static final String TEST_RESULT = "testResult";
+    public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL";
+
+    private ConfigTransformer configTransformer;
+
+    @Before
+    public void setup() {
+        configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
+    }
+
+    @Test
+    public void testReplaceVariable() throws Exception {
+        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+        Map<String, String> data = result.data();
+        Map<String, Long> ttls = result.ttls();
+        assertEquals(TEST_RESULT, data.get(MY_KEY));
+        assertTrue(ttls.isEmpty());
+    }
+
+    @Test
+    public void testReplaceVariableWithTTL() throws Exception {
+        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        Map<String, String> data = result.data();
+        Map<String, Long> ttls = result.ttls();
+        assertEquals(TEST_RESULT_WITH_TTL, data.get(MY_KEY));
+        assertEquals(1L, ttls.get(TEST_PATH).longValue());
+    }
+
+    @Test
+    public void testReplaceMultipleVariablesInValue() throws Exception {
+        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
+        Map<String, String> data = result.data();
+        assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", data.get(MY_KEY));
+    }
+
+    @Test
+    public void testNoReplacement() throws Exception {
+        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:missingKey}"));
+        Map<String, String> data = result.data();
+        assertEquals("${test:testPath:missingKey}", data.get(MY_KEY));
+    }
+
+    @Test
+    public void testSingleLevelOfIndirection() throws Exception {
+        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testIndirection}"));
+        Map<String, String> data = result.data();
+        assertEquals("${test:testPath:testResult}", data.get(MY_KEY));
+    }
+
+    public static class TestConfigProvider implements ConfigProvider {
+
+        public void configure(Map<String, ?> configs) {
+        }
+
+        public ConfigData get(String path) {
+            return null;
+        }
+
+        public ConfigData get(String path, Set<String> keys) {
+            Map<String, String> data = new HashMap<>();
+            Long ttl = null;
+            if (path.equals(TEST_PATH)) {
+                if (keys.contains(TEST_KEY)) {
+                    data.put(TEST_KEY, TEST_RESULT);
+                }
+                if (keys.contains(TEST_KEY_WITH_TTL)) {
+                    data.put(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL);
+                    ttl = 1L;
+                }
+                if (keys.contains(TEST_INDIRECTION)) {
+                    data.put(TEST_INDIRECTION, "${test:testPath:testResult}");
+                }
+            }
+            return new ConfigData(data, ttl);
+        }
+
+        public void close() {
+        }
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/FileConfigProviderTest.java b/clients/src/test/java/org/apache/kafka/common/config/FileConfigProviderTest.java
new file mode 100644
index 00000000000..9157e380456
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/FileConfigProviderTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class FileConfigProviderTest {
+
+    private FileConfigProvider configProvider;
+
+    @Before
+    public void setup() {
+        configProvider = new TestFileConfigProvider();
+    }
+
+    @Test
+    public void testGetAllKeysAtPath() throws Exception {
+        ConfigData configData = configProvider.get("dummy");
+        Map<String, String> result = new HashMap<>();
+        result.put("testKey", "testResult");
+        result.put("testKey2", "testResult2");
+        assertEquals(result, configData.data());
+        assertEquals(null, configData.ttl());
+    }
+
+    @Test
+    public void testGetOneKeyAtPath() throws Exception {
+        ConfigData configData = configProvider.get("dummy", Collections.singleton("testKey"));
+        Map<String, String> result = new HashMap<>();
+        result.put("testKey", "testResult");
+        assertEquals(result, configData.data());
+        assertEquals(null, configData.ttl());
+    }
+
+    @Test
+    public void testEmptyPath() throws Exception {
+        ConfigData configData = configProvider.get("", Collections.singleton("testKey"));
+        assertTrue(configData.data().isEmpty());
+        assertEquals(null, configData.ttl());
+    }
+
+    @Test
+    public void testEmptyPathWithKey() throws Exception {
+        ConfigData configData = configProvider.get("");
+        assertTrue(configData.data().isEmpty());
+        assertEquals(null, configData.ttl());
+    }
+
+    @Test
+    public void testNullPath() throws Exception {
+        ConfigData configData = configProvider.get(null);
+        assertTrue(configData.data().isEmpty());
+        assertEquals(null, configData.ttl());
+    }
+
+    @Test
+    public void testNullPathWithKey() throws Exception {
+        ConfigData configData = configProvider.get(null, Collections.singleton("testKey"));
+        assertTrue(configData.data().isEmpty());
+        assertEquals(null, configData.ttl());
+    }
+
+    public static class TestFileConfigProvider extends FileConfigProvider {
+
+        @Override
+        protected Reader reader(String path) throws IOException {
+            return new StringReader("testKey=testResult\ntestKey2=testResult2");
+        }
+    }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
index 1e214be4b8c..340ef804852 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
@@ -25,6 +25,16 @@
  * Context passed to SinkTasks, allowing them to access utilities in the Kafka Connect runtime.
  */
 public interface SinkTaskContext {
+
+    /**
+     * Get the Task configuration.  This is the latest configuration and may differ from that passed on startup.
+     *
+     * For example, this method can be used to obtain the latest configuration if an external secret has changed,
+     * and the configuration is using variable references such as those compatible with
+     * {@link org.apache.kafka.common.config.ConfigTransformer}.
+     */
+    public Map<String, String> configs();
+
     /**
      * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets
      * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
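
A task can then re-read its (re-transformed) configuration at runtime; a hedged
sketch, in which the subclass and the "db.password" key are purely illustrative:

    import java.util.Map;
    import org.apache.kafka.connect.sink.SinkTask;

    public abstract class RefreshingSinkTask extends SinkTask {
        // `context` is the SinkTaskContext field that SinkTask provides
        protected String latestPassword() {
            Map<String, String> latest = context.configs();
            return latest.get("db.password");
        }
    }
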
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
index 8eec1dfb138..2e87986648f 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
@@ -18,11 +18,22 @@
 
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 
+import java.util.Map;
+
 /**
  * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying
  * runtime.
  */
 public interface SourceTaskContext {
+    /**
+     * Get the Task configuration.  This is the latest configuration and may differ from that passed on startup.
+     *
+     * For example, this method can be used to obtain the latest configuration if an external secret has changed,
+     * and the configuration is using variable references such as those compatible with
+     * {@link org.apache.kafka.common.config.ConfigTransformer}.
+     */
+    public Map<String, String> configs();
+
     /**
      * Get the OffsetStorageReader for this SourceTask.
      */
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 54854fe4b80..f8c15de8ef4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
@@ -85,12 +86,16 @@ public static void main(String[] args) throws Exception {
             offsetBackingStore.configure(config);
 
             Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
+            WorkerConfigTransformer configTransformer = worker.configTransformer();
 
             Converter internalValueConverter = worker.getInternalValueConverter();
             StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
             statusBackingStore.configure(config);
 
-            ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config);
+            ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
+                    internalValueConverter,
+                    config,
+                    configTransformer);
 
             DistributedHerder herder = new DistributedHerder(config, time, worker,
                     kafkaClusterId, statusBackingStore, configBackingStore,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index c31568664fc..b5e0ec2c07b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -91,6 +91,7 @@ public AbstractHerder(Worker worker,
                           StatusBackingStore statusBackingStore,
                           ConfigBackingStore configBackingStore) {
         this.worker = worker;
+        this.worker.herder = this;
         this.workerId = workerId;
         this.kafkaClusterId = kafkaClusterId;
         this.statusBackingStore = statusBackingStore;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index a8dd49a4091..c54c160d5ab 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -38,6 +38,7 @@
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 /**
  * <p>
@@ -91,6 +92,20 @@
     private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to records.";
     private static final String TRANSFORMS_DISPLAY = "Transforms";
 
+    public static final String CONFIG_RELOAD_ACTION_CONFIG = "config.action.reload";
+    private static final String CONFIG_RELOAD_ACTION_DOC =
+            "The action that Connect should take on the connector when changes in external " +
+            "configuration providers result in a change in the connector's configuration properties. " +
+            "A value of 'none' indicates that Connect will do nothing. " +
+            "A value of 'restart' indicates that Connect should restart/reload the connector with the " +
+            "updated configuration properties. " +
+            "The restart may actually be scheduled in the future if the external configuration provider " +
+            "indicates that a configuration value will expire in the future.";
+
+    private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action";
+    public static final String CONFIG_RELOAD_ACTION_NONE = Herder.ConfigReloadAction.NONE.toString();
+    public static final String CONFIG_RELOAD_ACTION_RESTART = Herder.ConfigReloadAction.RESTART.toString();
+
     private final EnrichedConnectorConfig enrichedConfig;
     private static class EnrichedConnectorConfig extends AbstractConfig {
         EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> props) {
@@ -120,7 +135,10 @@ public void ensureValid(String name, Object value) {
                            throw new ConfigException(name, value, "Duplicate alias provided.");
                        }
                    }
-                }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY);
+                }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
+                .define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART,
+                        in(CONFIG_RELOAD_ACTION_NONE, CONFIG_RELOAD_ACTION_RESTART), Importance.LOW,
+                        CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY);
     }
 
     public ConnectorConfig(Plugins plugins) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 855b08a8a7f..5c7cc1429aa 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -148,6 +148,12 @@
      */
     void restartTask(ConnectorTaskId id, Callback<Void> cb);
 
+    /**
+     * Get the configuration reload action.
+     * @param connName name of the connector
+     */
+    ConfigReloadAction connectorConfigReloadAction(final String connName);
+
     /**
      * Restart the connector.
      * @param connName name of the connector
@@ -155,6 +161,15 @@
      */
     void restartConnector(String connName, Callback<Void> cb);
 
+    /**
+     * Restart the connector after the given delay.
+     * @param delayMs delay before restart
+     * @param connName name of the connector
+     * @param cb callback to invoke upon completion
+     * @return the id of the request
+     */
+    HerderRequest restartConnector(long delayMs, String connName, Callback<Void> cb);
+
     /**
      * Pause the connector. This call will asynchronously suspend processing by the connector and all
      * of its tasks.
@@ -183,6 +198,11 @@
      */
     String kafkaClusterId();
 
+    enum ConfigReloadAction {
+        NONE,
+        RESTART
+    }
+
     class Created<T> {
         private final boolean created;
         private final T result;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderRequest.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderRequest.java
new file mode 100644
index 00000000000..627da4df823
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderRequest.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+public interface HerderRequest {
+    void cancel();
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index c58eddfb2f7..7a72a0e7b26 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.ConfigProvider;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Frequencies;
 import org.apache.kafka.common.metrics.stats.Total;
@@ -30,6 +31,7 @@
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.ErrorReporter;
@@ -76,6 +78,7 @@
 public class Worker {
     private static final Logger log = LoggerFactory.getLogger(Worker.class);
 
+    protected Herder herder;
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
@@ -91,6 +94,7 @@
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
+    private WorkerConfigTransformer workerConfigTransformer;
 
     public Worker(
             String workerId,
@@ -122,6 +126,8 @@ public Worker(
         this.offsetBackingStore = offsetBackingStore;
         this.offsetBackingStore.configure(config);
 
+        this.workerConfigTransformer = initConfigTransformer();
+
         producerProps = new HashMap<>();
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
@@ -137,6 +143,28 @@ public Worker(
         producerProps.putAll(config.originalsWithPrefix("producer."));
     }
 
+    private WorkerConfigTransformer initConfigTransformer() {
+        final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
+        Map<String, ConfigProvider> providerMap = new HashMap<>();
+        for (String providerName : providerNames) {
+            ConfigProvider configProvider = plugins.newConfigProvider(
+                    config,
+                    WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
+                    ClassLoaderUsage.PLUGINS
+            );
+            providerMap.put(providerName, configProvider);
+        }
+        return new WorkerConfigTransformer(this, providerMap);
+    }
+
+    public WorkerConfigTransformer configTransformer() {
+        return workerConfigTransformer;
+    }
+
+    protected Herder herder() {
+        return herder;
+    }
+
     /**
      * Start worker.
      */
@@ -359,6 +387,7 @@ public boolean isRunning(String connName) {
      */
     public boolean startTask(
             ConnectorTaskId id,
+            ClusterConfigState configState,
             Map<String, String> connProps,
             Map<String, String> taskProps,
             TaskStatus.Listener statusListener,
@@ -419,7 +448,7 @@ public boolean startTask(
                 log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
             }
 
-            workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
+            workerTask = buildWorkerTask(configState, connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
             workerTask.initialize(taskConfig);
             Plugins.compareAndSwapLoaders(savedLoader);
         } catch (Throwable t) {
@@ -444,7 +473,8 @@ public boolean startTask(
         return true;
     }
 
-    private WorkerTask buildWorkerTask(ConnectorConfig connConfig,
+    private WorkerTask buildWorkerTask(ClusterConfigState configState,
+                                       ConnectorConfig connConfig,
                                        ConnectorTaskId id,
                                        Task task,
                                        TaskStatus.Listener statusListener,
@@ -469,13 +499,14 @@ private WorkerTask buildWorkerTask(ConnectorConfig connConfig,
                     internalKeyConverter, internalValueConverter);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
 
+            // Note we pass the configState as it performs dynamic transformations under the covers
+            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
-                    headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader,
+                    headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, configState, metrics, loader,
                     time, retryWithToleranceOperator);
         } else if (task instanceof SinkTask) {
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, connConfig, errorHandlingMetrics));
-            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, metrics, keyConverter,
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
                     valueConverter, headerConverter, transformationChain, loader, time,
                     retryWithToleranceOperator);
         } else {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 3c76d0fa5f2..355cfbb615b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -32,6 +32,7 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -190,6 +191,11 @@
             + "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,"
             + "/opt/connectors";
 
+    public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
+    protected static final String CONFIG_PROVIDERS_DOC = "List of configuration providers. "
+            + "This is a comma-separated list of the fully-qualified names of the ConfigProvider implementations, "
+            + "in the order they will be created, configured, and used.";
+
     public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes";
     protected static final String REST_EXTENSION_CLASSES_DOC =
             "Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called 
@@ -262,6 +268,9 @@ protected static ConfigDef baseConfigDef() {
                 .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
                         HEADER_CONVERTER_CLASS_DEFAULT,
                         Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
+                .define(CONFIG_PROVIDERS_CONFIG, Type.LIST,
+                        Collections.emptyList(),
+                        Importance.LOW, CONFIG_PROVIDERS_DOC)
                 .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
                         Importance.LOW, REST_EXTENSION_CLASSES_DOC);
     }
@@ -334,4 +343,5 @@ public WorkerConfig(ConfigDef definition, Map<String, String> props) {
         super(definition, props);
         logInternalConverterDeprecationWarnings(props);
     }
+
 }
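
Based on the "config.providers." prefix used in Worker.initConfigTransformer(),
a worker might be configured along these lines; the per-provider ".class" syntax
is an assumption here, not something this diff confirms:

    # Hypothetical worker.properties excerpt
    config.providers=file
    config.providers.file.class=org.apache.kafka.common.config.FileConfigProvider
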
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
new file mode 100644
index 00000000000..d91411cb8e8
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigProvider;
+import org.apache.kafka.common.config.ConfigTransformer;
+import org.apache.kafka.common.config.ConfigTransformerResult;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A wrapper class to perform configuration transformations and schedule reloads for any
+ * retrieved TTL values.
+ */
+public class WorkerConfigTransformer {
+    private final Worker worker;
+    private final ConfigTransformer configTransformer;
+    private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
+
+    public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
+        this.worker = worker;
+        this.configTransformer = new ConfigTransformer(configProviders);
+    }
+
+    public Map<String, String> transform(String connectorName, Map<String, String> configs) {
+        ConfigTransformerResult result = configTransformer.transform(configs);
+        scheduleReload(connectorName, result.ttls());
+        return result.data();
+    }
+
+    private void scheduleReload(String connectorName, Map<String, Long> ttls) {
+        for (Map.Entry<String, Long> entry : ttls.entrySet()) {
+            scheduleReload(connectorName, entry.getKey(), entry.getValue());
+        }
+    }
+
+    private void scheduleReload(String connectorName, String path, long ttl) {
+        Herder herder = worker.herder();
+        if (herder.connectorConfigReloadAction(connectorName) == Herder.ConfigReloadAction.RESTART) {
+            Map<String, HerderRequest> connectorRequests = requests.get(connectorName);
+            if (connectorRequests == null) {
+                connectorRequests = new ConcurrentHashMap<>();
+                requests.put(connectorName, connectorRequests);
+            } else {
+                HerderRequest previousRequest = connectorRequests.get(path);
+                if (previousRequest != null) {
+                    // Delete previous request for ttl which is now stale
+                    previousRequest.cancel();
+                }
+            }
+            HerderRequest request = herder.restartConnector(ttl, connectorName, null);
+            connectorRequests.put(path, request);
+        }
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 3296007b364..47f8529e2d1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -40,6 +40,7 @@
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -70,6 +71,7 @@
 
     private final WorkerConfig workerConfig;
     private final SinkTask task;
+    private final ClusterConfigState configState;
     private Map<String, String> taskConfig;
     private final Time time;
     private final Converter keyConverter;
@@ -96,6 +98,7 @@ public WorkerSinkTask(ConnectorTaskId id,
                           TaskStatus.Listener statusListener,
                           TargetState initialState,
                           WorkerConfig workerConfig,
+                          ClusterConfigState configState,
                           ConnectMetrics connectMetrics,
                           Converter keyConverter,
                           Converter valueConverter,
@@ -108,6 +111,7 @@ public WorkerSinkTask(ConnectorTaskId id,
 
         this.workerConfig = workerConfig;
         this.task = task;
+        this.configState = configState;
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
         this.headerConverter = headerConverter;
@@ -133,7 +137,7 @@ public void initialize(TaskConfig taskConfig) {
         try {
             this.taskConfig = taskConfig.originalsStrings();
             this.consumer = createConsumer();
-            this.context = new WorkerSinkTaskContext(consumer, this);
+            this.context = new WorkerSinkTaskContext(consumer, this, configState);
         } catch (Throwable t) {
             log.error("{} Task failed initialization and will not be 
started.", this, t);
             onFailure(t);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 386f992e82a..3a6b0d6d7b8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.sink.SinkTaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,17 +38,26 @@
     private long timeoutMs;
     private KafkaConsumer<byte[], byte[]> consumer;
     private final WorkerSinkTask sinkTask;
+    private final ClusterConfigState configState;
     private final Set<TopicPartition> pausedPartitions;
     private boolean commitRequested;
 
-    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, WorkerSinkTask sinkTask) {
+    public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer,
+                                 WorkerSinkTask sinkTask,
+                                 ClusterConfigState configState) {
         this.offsets = new HashMap<>();
         this.timeoutMs = -1L;
         this.consumer = consumer;
         this.sinkTask = sinkTask;
+        this.configState = configState;
         this.pausedPartitions = new HashSet<>();
     }
 
+    @Override
+    public Map<String, String> configs() {
+        return configState.taskConfig(sinkTask.id());
+    }
+
     @Override
     public void offset(Map<TopicPartition, Long> offsets) {
         log.debug("{} Setting offsets for topic partitions {}", this, offsets);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index e7b92a4d403..70d0cf9d7ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -34,6 +34,7 @@
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -66,6 +67,7 @@
 
     private final WorkerConfig workerConfig;
     private final SourceTask task;
+    private final ClusterConfigState configState;
     private final Converter keyConverter;
     private final Converter valueConverter;
     private final HeaderConverter headerConverter;
@@ -103,6 +105,7 @@ public WorkerSourceTask(ConnectorTaskId id,
                             OffsetStorageReader offsetReader,
                             OffsetStorageWriter offsetWriter,
                             WorkerConfig workerConfig,
+                            ClusterConfigState configState,
                             ConnectMetrics connectMetrics,
                             ClassLoader loader,
                             Time time,
@@ -112,6 +115,7 @@ public WorkerSourceTask(ConnectorTaskId id,
 
         this.workerConfig = workerConfig;
         this.task = task;
+        this.configState = configState;
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
         this.headerConverter = headerConverter;
@@ -190,7 +194,7 @@ private synchronized void tryStop() {
     @Override
     public void execute() {
         try {
-            task.initialize(new WorkerSourceTaskContext(offsetReader));
+            task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
             task.start(taskConfig);
             log.info("{} Source task finished initialization and start", this);
             synchronized (this) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
index 8f60e57e005..fe1409b282a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTaskContext.java
@@ -16,15 +16,29 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.source.SourceTaskContext;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 
+import java.util.Map;
+
 public class WorkerSourceTaskContext implements SourceTaskContext {
 
     private final OffsetStorageReader reader;
+    private final WorkerSourceTask task;
+    private final ClusterConfigState configState;
 
-    public WorkerSourceTaskContext(OffsetStorageReader reader) {
+    public WorkerSourceTaskContext(OffsetStorageReader reader,
+                                   WorkerSourceTask task,
+                                   ClusterConfigState configState) {
         this.reader = reader;
+        this.task = task;
+        this.configState = configState;
+    }
+
+    @Override
+    public Map<String, String> configs() {
+        return configState.taskConfig(task.id());
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
index cac71ddc73b..9507706840f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.distributed;
 
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
@@ -24,6 +25,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -46,6 +48,7 @@
     private final Map<String, TargetState> connectorTargetStates;
     private final Map<ConnectorTaskId, Map<String, String>> taskConfigs;
     private final Set<String> inconsistentConnectors;
+    private final WorkerConfigTransformer configTransformer;
 
     public ClusterConfigState(long offset,
                               Map<String, Integer> connectorTaskCounts,
@@ -53,12 +56,29 @@ public ClusterConfigState(long offset,
                               Map<String, TargetState> connectorTargetStates,
                               Map<ConnectorTaskId, Map<String, String>> taskConfigs,
                               Set<String> inconsistentConnectors) {
+        this(offset,
+                connectorTaskCounts,
+                connectorConfigs,
+                connectorTargetStates,
+                taskConfigs,
+                inconsistentConnectors,
+                null);
+    }
+
+    public ClusterConfigState(long offset,
+                              Map<String, Integer> connectorTaskCounts,
+                              Map<String, Map<String, String>> connectorConfigs,
+                              Map<String, TargetState> connectorTargetStates,
+                              Map<ConnectorTaskId, Map<String, String>> taskConfigs,
+                              Set<String> inconsistentConnectors,
+                              WorkerConfigTransformer configTransformer) {
         this.offset = offset;
         this.connectorTaskCounts = connectorTaskCounts;
         this.connectorConfigs = connectorConfigs;
         this.connectorTargetStates = connectorTargetStates;
         this.taskConfigs = taskConfigs;
         this.inconsistentConnectors = inconsistentConnectors;
+        this.configTransformer = configTransformer;
     }
 
     /**
@@ -87,12 +107,19 @@ public boolean contains(String connector) {
     }
 
     /**
-     * Get the configuration for a connector.
+     * Get the configuration for a connector.  The configuration will have been transformed by
+     * {@link org.apache.kafka.common.config.ConfigTransformer} by having all variable
+     * references replaced with the current values from external instances of
+     * {@link org.apache.kafka.common.config.ConfigProvider}, and may include secrets.
      * @param connector name of the connector
      * @return a map containing configuration parameters
      */
     public Map<String, String> connectorConfig(String connector) {
-        return connectorConfigs.get(connector);
+        Map<String, String> configs = connectorConfigs.get(connector);
+        if (configTransformer != null) {
+            configs = configTransformer.transform(connector, configs);
+        }
+        return configs;
     }
 
     /**
@@ -105,12 +132,19 @@ public TargetState targetState(String connector) {
     }
 
     /**
-     * Get the configuration for a task.
+     * Get the configuration for a task.  The configuration will have been transformed by
+     * {@link org.apache.kafka.common.config.ConfigTransformer} by having all variable
+     * references replaced with the current values from external instances of
+     * {@link org.apache.kafka.common.config.ConfigProvider}, and may include secrets.
      * @param task id of the task
      * @return a map containing configuration parameters
      */
     public Map<String, String> taskConfig(ConnectorTaskId task) {
-        return taskConfigs.get(task);
+        Map<String, String> configs = taskConfigs.get(task);
+        if (configTransformer != null) {
+            configs = configTransformer.transform(task.connector(), configs);
+        }
+        return configs;
     }
 
     /**
@@ -184,4 +218,30 @@ public String toString() {
                 ", inconsistentConnectors=" + inconsistentConnectors +
                 '}';
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ClusterConfigState that = (ClusterConfigState) o;
+        return offset == that.offset &&
+                Objects.equals(connectorTaskCounts, that.connectorTaskCounts) &&
+                Objects.equals(connectorConfigs, that.connectorConfigs) &&
+                Objects.equals(connectorTargetStates, that.connectorTargetStates) &&
+                Objects.equals(taskConfigs, that.taskConfigs) &&
+                Objects.equals(inconsistentConnectors, that.inconsistentConnectors) &&
+                Objects.equals(configTransformer, that.configTransformer);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                offset,
+                connectorTaskCounts,
+                connectorConfigs,
+                connectorTargetStates,
+                taskConfigs,
+                inconsistentConnectors,
+                configTransformer);
+    }
 }
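Note that the transformation happens on every read rather than at write time, so the backing store only ever holds the raw variable references. A sketch of the effect (the connector name and stored key are hypothetical):

    ClusterConfigState state = configBackingStore.snapshot();
    // Stored form:   {"connection.password": "${vault:prod/db:password}"}
    // Returned form: the reference resolved through the ConfigProvider, so the
    // plain-text secret never has to be written to the config storage itself.
    Map<String, String> resolved = state.connectorConfig("my-connector");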
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 5e9707aa29e..5efb78a93e4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -38,6 +38,7 @@
 import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.HerderRequest;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
@@ -60,6 +61,7 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
@@ -139,7 +141,7 @@
 
     // To handle most external requests, like creating or destroying a connector, we can use a generic request where
     // the caller specifies all the code that should be executed.
-    final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>();
+    final NavigableSet<DistributedHerderRequest> requests = new ConcurrentSkipListSet<>();
     // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
     // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
     private Set<String> connectorConfigUpdates = new HashSet<>();
@@ -255,7 +257,7 @@ public void tick() {
         final long now = time.milliseconds();
         long nextRequestTimeoutMs = Long.MAX_VALUE;
         while (true) {
-            final HerderRequest next = peekWithoutException();
+            final DistributedHerderRequest next = peekWithoutException();
             if (next == null) {
                 break;
             } else if (now >= next.at) {
@@ -382,7 +384,7 @@ public void halt() {
 
             // Explicitly fail any outstanding requests so they actually get a response and get an
             // understandable reason for their failure.
-            HerderRequest request = requests.pollFirst();
+            DistributedHerderRequest request = requests.pollFirst();
             while (request != null) {
                 request.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
                 request = requests.pollFirst();
@@ -640,9 +642,21 @@ else if (!configState.contains(connName))
         );
     }
 
+    @Override
+    public ConfigReloadAction connectorConfigReloadAction(final String connName) {
+        return ConfigReloadAction.valueOf(
+                configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG)
+                        .toUpperCase(Locale.ROOT));
+    }
+
     @Override
     public void restartConnector(final String connName, final Callback<Void> callback) {
-        addRequest(new Callable<Void>() {
+        restartConnector(0, connName, callback);
+    }
+
+    @Override
+    public HerderRequest restartConnector(final long delayMs, final String connName, final Callback<Void> callback) {
+        return addRequest(delayMs, new Callable<Void>() {
             @Override
             public Void call() throws Exception {
                 if (checkRebalanceNeeded(callback))
@@ -858,6 +872,7 @@ private boolean startTask(ConnectorTaskId taskId) {
         log.info("Starting task {}", taskId);
         return worker.startTask(
                 taskId,
+                configState,
                 configState.connectorConfig(taskId.connector()),
                 configState.taskConfig(taskId),
                 this,
@@ -945,7 +960,7 @@ private void reconfigureConnectorTasksWithRetry(final String connName) {
             public void onCompletion(Throwable error, Void result) {
                 // If we encountered an error, we don't have much choice but to just retry. If we don't, we could get
                 // stuck with a connector that thinks it has generated tasks, but wasn't actually successful and therefore
-                // never makes progress. The retry has to run through a HerderRequest since this callback could be happening
+                // never makes progress. The retry has to run through a DistributedHerderRequest since this callback could be happening
                 // from the HTTP request forwarding thread.
                 if (error != null) {
                     log.error("Failed to reconfigure connector's tasks, retrying after backoff:", error);
@@ -1041,19 +1056,19 @@ private boolean checkRebalanceNeeded(Callback<?> callback) {
         return false;
     }
 
-    HerderRequest addRequest(Callable<Void> action, Callback<Void> callback) {
+    DistributedHerderRequest addRequest(Callable<Void> action, Callback<Void> callback) {
         return addRequest(0, action, callback);
     }
 
-    HerderRequest addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
-        HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, requestSeqNum.incrementAndGet(), action, callback);
+    DistributedHerderRequest addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
+        DistributedHerderRequest req = new DistributedHerderRequest(time.milliseconds() + delayMs, requestSeqNum.incrementAndGet(), action, callback);
         requests.add(req);
         if (peekWithoutException() == req)
             member.wakeup();
         return req;
     }
 
-    private HerderRequest peekWithoutException() {
+    private DistributedHerderRequest peekWithoutException() {
         try {
             return requests.isEmpty() ? null : requests.first();
         } catch (NoSuchElementException e) {
@@ -1117,13 +1132,13 @@ public void onConnectorTargetStateChange(String connector) {
         }
     }
 
-    static class HerderRequest implements Comparable<HerderRequest> {
+    class DistributedHerderRequest implements HerderRequest, Comparable<DistributedHerderRequest> {
         private final long at;
         private final long seq;
         private final Callable<Void> action;
         private final Callback<Void> callback;
 
-        public HerderRequest(long at, long seq, Callable<Void> action, Callback<Void> callback) {
+        public DistributedHerderRequest(long at, long seq, Callable<Void> action, Callback<Void> callback) {
             this.at = at;
             this.seq = seq;
             this.action = action;
@@ -1139,7 +1154,12 @@ public HerderRequest(long at, long seq, Callable<Void> action, Callback<Void> ca
         }
 
         @Override
-        public int compareTo(HerderRequest o) {
+        public void cancel() {
+            DistributedHerder.this.requests.remove(this);
+        }
+
+        @Override
+        public int compareTo(DistributedHerderRequest o) {
             final int cmp = Long.compare(at, o.at);
             return cmp == 0 ? Long.compare(seq, o.seq) : cmp;
         }
@@ -1147,9 +1167,9 @@ public int compareTo(HerderRequest o) {
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
-            if (!(o instanceof HerderRequest))
+            if (!(o instanceof DistributedHerderRequest))
                 return false;
-            HerderRequest other = (HerderRequest) o;
+            DistributedHerderRequest other = (DistributedHerderRequest) o;
             return compareTo(other) == 0;
         }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index b56bd1a7d91..1e598517507 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.common.config.ConfigProvider;
 import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
@@ -66,6 +67,7 @@
     private final SortedSet<PluginDesc<Converter>> converters;
     private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
     private final SortedSet<PluginDesc<Transformation>> transformations;
+    private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
     private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
     private final List<String> pluginPaths;
     private final Map<Path, PluginClassLoader> activePaths;
@@ -80,6 +82,7 @@ public DelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) {
         this.converters = new TreeSet<>();
         this.headerConverters = new TreeSet<>();
         this.transformations = new TreeSet<>();
+        this.configProviders = new TreeSet<>();
         this.restExtensions = new TreeSet<>();
     }
 
@@ -103,6 +106,10 @@ public DelegatingClassLoader(List<String> pluginPaths) {
         return transformations;
     }
 
+    public Set<PluginDesc<ConfigProvider>> configProviders() {
+        return configProviders;
+    }
+
     public Set<PluginDesc<ConnectRestExtension>> restExtensions() {
         return restExtensions;
     }
@@ -236,6 +243,8 @@ private void scanUrlsAndAddPlugins(
             headerConverters.addAll(plugins.headerConverters());
             addPlugins(plugins.transformations(), loader);
             transformations.addAll(plugins.transformations());
+            addPlugins(plugins.configProviders(), loader);
+            configProviders.addAll(plugins.configProviders());
             addPlugins(plugins.restExtensions(), loader);
             restExtensions.addAll(plugins.restExtensions());
         }
@@ -292,6 +301,7 @@ private PluginScanResult scanPluginPath(
                 getPluginDesc(reflections, Converter.class, loader),
                 getPluginDesc(reflections, HeaderConverter.class, loader),
                 getPluginDesc(reflections, Transformation.class, loader),
+                getPluginDesc(reflections, ConfigProvider.class, loader),
                 getServiceLoaderPluginDesc(ConnectRestExtension.class, loader)
         );
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index 6f48e5694bd..87b0b70c503 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.common.config.ConfigProvider;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.storage.Converter;
@@ -29,6 +30,7 @@
     private final Collection<PluginDesc<Converter>> converters;
     private final Collection<PluginDesc<HeaderConverter>> headerConverters;
     private final Collection<PluginDesc<Transformation>> transformations;
+    private final Collection<PluginDesc<ConfigProvider>> configProviders;
     private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
 
     public PluginScanResult(
@@ -36,12 +38,14 @@ public PluginScanResult(
             Collection<PluginDesc<Converter>> converters,
             Collection<PluginDesc<HeaderConverter>> headerConverters,
             Collection<PluginDesc<Transformation>> transformations,
+            Collection<PluginDesc<ConfigProvider>> configProviders,
             Collection<PluginDesc<ConnectRestExtension>> restExtensions
     ) {
         this.connectors = connectors;
         this.converters = converters;
         this.headerConverters = headerConverters;
         this.transformations = transformations;
+        this.configProviders = configProviders;
         this.restExtensions = restExtensions;
     }
 
@@ -61,6 +65,10 @@ public PluginScanResult(
         return transformations;
     }
 
+    public Collection<PluginDesc<ConfigProvider>> configProviders() {
+        return configProviders;
+    }
+
     public Collection<PluginDesc<ConnectRestExtension>> restExtensions() {
         return restExtensions;
     }
@@ -70,6 +78,7 @@ public boolean isEmpty() {
                && converters().isEmpty()
                && headerConverters().isEmpty()
                && transformations().isEmpty()
+               && configProviders().isEmpty()
                && restExtensions().isEmpty();
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
index 918f9d73f56..906b85f7003 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.common.config.ConfigProvider;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -31,6 +32,7 @@
     CONNECTOR(Connector.class),
     CONVERTER(Converter.class),
     TRANSFORMATION(Transformation.class),
+    CONFIGPROVIDER(ConfigProvider.class),
     REST_EXTENSION(ConnectRestExtension.class),
     UNKNOWN(Object.class);
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 96074106de0..c89accd3805 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigProvider;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
@@ -147,6 +148,10 @@ public DelegatingClassLoader delegatingLoader() {
         return delegatingLoader.transformations();
     }
 
+    public Set<PluginDesc<ConfigProvider>> configProviders() {
+        return delegatingLoader.configProviders();
+    }
+
     public Connector newConnector(String connectorClassOrAlias) {
         Class<? extends Connector> klass;
         try {
@@ -318,6 +323,45 @@ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPro
         return plugin;
     }
 
+    public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) {
+        String classPropertyName = providerPrefix + ".class";
+        Map<String, String> originalConfig = config.originalsStrings();
+        if (!originalConfig.containsKey(classPropertyName)) {
+            // This configuration does not define the config provider via the specified property name
+            return null;
+        }
+        ConfigProvider plugin = null;
+        switch (classLoaderUsage) {
+            case CURRENT_CLASSLOADER:
+                // Attempt to load first with the current classloader, and plugins as a fallback.
+                plugin = getInstance(config, classPropertyName, ConfigProvider.class);
+                break;
+            case PLUGINS:
+                // Attempt to load with the plugin class loader, which uses the current classloader as a fallback
+                String configProviderClassOrAlias = originalConfig.get(classPropertyName);
+                Class<? extends ConfigProvider> klass;
+                try {
+                    klass = pluginClass(delegatingLoader, configProviderClassOrAlias, ConfigProvider.class);
+                } catch (ClassNotFoundException e) {
+                    throw new ConnectException(
+                            "Failed to find any class that implements 
ConfigProvider and which name matches "
+                                    + configProviderClassOrAlias + ", 
available ConfigProviders are: "
+                                    + 
pluginNames(delegatingLoader.configProviders())
+                    );
+                }
+                plugin = newPlugin(klass);
+                break;
+        }
+        if (plugin == null) {
+            throw new ConnectException("Unable to instantiate the 
ConfigProvider specified in '" + classPropertyName + "'");
+        }
+
+        // Configure the ConfigProvider
+        String configPrefix = providerPrefix + ".param.";
+        Map<String, Object> configProviderConfig = config.originalsWithPrefix(configPrefix);
+        plugin.configure(configProviderConfig);
+        return plugin;
+    }
 
     /**
      * If the given class names are available in the classloader, return a list of new configured
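A sketch of the lookup newConfigProvider performs, assuming worker properties that follow the "<prefix>.class" / "<prefix>.param." scheme above (the prefix, provider class, and param name are hypothetical):

    // Hypothetical worker properties:
    //   config.providers.vault.class=com.example.VaultConfigProvider
    //   config.providers.vault.param.endpoint=https://vault.example.com
    ConfigProvider provider =
            plugins.newConfigProvider(workerConfig, "config.providers.vault", ClassLoaderUsage.PLUGINS);
    // The instance is created from "<prefix>.class" and configured with the
    // "<prefix>.param." entries, here {"endpoint": "https://vault.example.com"}.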
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 96f8e8767bb..20c6a24d384 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.HerderRequest;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
@@ -41,7 +42,14 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 /**
@@ -50,10 +58,17 @@
 public class StandaloneHerder extends AbstractHerder {
     private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
 
+    private final AtomicLong requestSeqNum = new AtomicLong();
+    private final ScheduledExecutorService requestExecutorService;
+
     private ClusterConfigState configState;
 
     public StandaloneHerder(Worker worker, String kafkaClusterId) {
-        this(worker, worker.workerId(), kafkaClusterId, new MemoryStatusBackingStore(), new MemoryConfigBackingStore());
+        this(worker,
+                worker.workerId(),
+                kafkaClusterId,
+                new MemoryStatusBackingStore(),
+                new MemoryConfigBackingStore(worker.configTransformer()));
     }
 
     // visible for testing
@@ -64,6 +79,7 @@ public StandaloneHerder(Worker worker, String kafkaClusterId) {
                      MemoryConfigBackingStore configBackingStore) {
         super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore);
         this.configState = ClusterConfigState.EMPTY;
+        this.requestExecutorService = Executors.newSingleThreadScheduledExecutor();
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
 
@@ -77,6 +93,13 @@ public synchronized void start() {
     @Override
     public synchronized void stop() {
         log.info("Herder stopping");
+        requestExecutorService.shutdown();
+        try {
+            if (!requestExecutorService.awaitTermination(30, TimeUnit.SECONDS))
+                requestExecutorService.shutdownNow();
+        } catch (InterruptedException e) {
+            // ignore
+        }
 
         // There's no coordination/hand-off to do here since this is all standalone. Instead, we
         // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
@@ -229,12 +252,19 @@ public synchronized void restartTask(ConnectorTaskId taskId, Callback<Void> cb)
 
         TargetState targetState = configState.targetState(taskId.connector());
         worker.stopAndAwaitTask(taskId);
-        if (worker.startTask(taskId, connConfigProps, taskConfigProps, this, targetState))
+        if (worker.startTask(taskId, configState, connConfigProps, taskConfigProps, this, targetState))
             cb.onCompletion(null, null);
         else
             cb.onCompletion(new ConnectException("Failed to start task: " + taskId), null);
     }
 
+    @Override
+    public ConfigReloadAction connectorConfigReloadAction(final String connName) {
+        return ConfigReloadAction.valueOf(
+                configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG)
+                        .toUpperCase(Locale.ROOT));
+    }
+
     @Override
     public synchronized void restartConnector(String connName, Callback<Void> cb) {
         if (!configState.contains(connName))
@@ -248,11 +278,24 @@ public synchronized void restartConnector(String connName, Callback<Void> cb) {
             cb.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
     }
 
+    @Override
+    public synchronized HerderRequest restartConnector(long delayMs, final String connName, final Callback<Void> cb) {
+        ScheduledFuture<?> future = requestExecutorService.schedule(new Runnable() {
+            @Override
+            public void run() {
+                restartConnector(connName, cb);
+            }
+        }, delayMs, TimeUnit.MILLISECONDS);
+
+        return new StandaloneHerderRequest(requestSeqNum.incrementAndGet(), future);
+    }
+
     private boolean startConnector(Map<String, String> connectorProps) {
         String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
         configBackingStore.putConnectorConfig(connName, connectorProps);
+        Map<String, String> connConfigs = configState.connectorConfig(connName);
         TargetState targetState = configState.targetState(connName);
-        return worker.startConnector(connName, connectorProps, new HerderConnectorContext(this, connName), this, targetState);
+        return worker.startConnector(connName, connConfigs, new HerderConnectorContext(this, connName), this, targetState);
     }
 
     private List<Map<String, String>> recomputeTaskConfigs(String connName) {
@@ -270,7 +313,7 @@ private void createConnectorTasks(String connName, TargetState initialState) {
 
         for (ConnectorTaskId taskId : configState.tasks(connName)) {
             Map<String, String> taskConfigMap = configState.taskConfig(taskId);
-            worker.startTask(taskId, connConfigs, taskConfigMap, this, initialState);
+            worker.startTask(taskId, configState, connConfigs, taskConfigMap, this, initialState);
         }
     }
 
@@ -342,4 +385,32 @@ public void onConnectorTargetStateChange(String connector) {
         }
     }
 
+    static class StandaloneHerderRequest implements HerderRequest {
+        private final long seq;
+        private final ScheduledFuture<?> future;
+
+        public StandaloneHerderRequest(long seq, ScheduledFuture<?> future) {
+            this.seq = seq;
+            this.future = future;
+        }
+
+        @Override
+        public void cancel() {
+            future.cancel(false);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof StandaloneHerderRequest))
+                return false;
+            StandaloneHerderRequest other = (StandaloneHerderRequest) o;
+            return seq == other.seq;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(seq);
+        }
+    }
 }
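Taken together with WorkerConfigTransformer, the delayed restart gives TTL-based secret rotation in standalone mode as well. A sketch (the connector name and TTL are illustrative):

    // A provider returned a value with a 60s TTL, so a restart is scheduled for then:
    HerderRequest request = herder.restartConnector(60_000L, "my-connector", null);
    // If a later transform yields a fresh TTL for the same path, the stale
    // request is cancelled before a replacement is scheduled:
    request.cancel();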
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index e51b365cec6..ea196650c5e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -35,6 +35,7 @@
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
@@ -221,7 +222,9 @@ public static String COMMIT_TASKS_KEY(String connectorName) {
 
     private final Map<String, TargetState> connectorTargetStates = new HashMap<>();
 
-    public KafkaConfigBackingStore(Converter converter, WorkerConfig config) {
+    private final WorkerConfigTransformer configTransformer;
+
+    public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) {
         this.lock = new Object();
         this.started = false;
         this.converter = converter;
@@ -232,6 +235,7 @@ public KafkaConfigBackingStore(Converter converter, WorkerConfig config) {
             throw new ConfigException("Must specify topic for connector configuration.");
 
         configLog = setupAndCreateKafkaBasedLog(this.topic, config);
+        this.configTransformer = configTransformer;
     }
 
     @Override
@@ -270,7 +274,8 @@ public ClusterConfigState snapshot() {
                     new HashMap<>(connectorConfigs),
                     new HashMap<>(connectorTargetStates),
                     new HashMap<>(taskConfigs),
-                    new HashSet<>(inconsistent)
+                    new HashSet<>(inconsistent),
+                    configTransformer
             );
         }
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index 25891f52354..7e7d62ba3c9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
@@ -32,6 +33,14 @@
 
     private Map<String, ConnectorState> connectors = new HashMap<>();
     private UpdateListener updateListener;
+    private WorkerConfigTransformer configTransformer;
+
+    public MemoryConfigBackingStore() {
+    }
+
+    public MemoryConfigBackingStore(WorkerConfigTransformer configTransformer) {
+        this.configTransformer = configTransformer;
+    }
 
     @Override
     public synchronized void start() {
@@ -63,7 +72,8 @@ public synchronized ClusterConfigState snapshot() {
                 connectorConfigs,
                 connectorTargetStates,
                 taskConfigs,
-                Collections.<String>emptySet());
+                Collections.<String>emptySet(),
+                configTransformer);
     }
 
     @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 0718eb17653..da017e851b7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -172,14 +172,14 @@ public void testConfigValidationMissingName() {
         assertEquals(TestSourceConnector.class.getName(), result.name());
         assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP), result.groups());
         assertEquals(2, result.errorCount());
-        // Base connector config has 7 fields, connector's configs add 2
-        assertEquals(9, result.values().size());
+        // Base connector config has 8 fields, connector's configs add 2
+        assertEquals(10, result.values().size());
         // Missing name should generate an error
         assertEquals(ConnectorConfig.NAME_CONFIG, result.values().get(0).configValue().name());
         assertEquals(1, result.values().get(0).configValue().errors().size());
         // "required" config from connector should generate an error
-        assertEquals("required", result.values().get(7).configValue().name());
-        assertEquals(1, result.values().get(7).configValue().errors().size());
+        assertEquals("required", result.values().get(8).configValue().name());
+        assertEquals(1, result.values().get(8).configValue().errors().size());
 
         verifyAll();
     }
@@ -233,15 +233,15 @@ public void testConfigValidationTransformsExtendResults() {
         );
         assertEquals(expectedGroups, result.groups());
         assertEquals(2, result.errorCount());
-        // Base connector config has 7 fields, connector's configs add 2, 2 type fields from the transforms, and
+        // Base connector config has 8 fields, connector's configs add 2, 2 type fields from the transforms, and
         // 1 from the valid transformation's config
-        assertEquals(12, result.values().size());
+        assertEquals(13, result.values().size());
         // Should get 2 type fields from the transforms, first adds its own config since it has a valid class
-        assertEquals("transforms.xformA.type", result.values().get(7).configValue().name());
-        assertTrue(result.values().get(7).configValue().errors().isEmpty());
-        assertEquals("transforms.xformA.subconfig", result.values().get(8).configValue().name());
-        assertEquals("transforms.xformB.type", result.values().get(9).configValue().name());
-        assertFalse(result.values().get(9).configValue().errors().isEmpty());
+        assertEquals("transforms.xformA.type", result.values().get(8).configValue().name());
+        assertTrue(result.values().get(8).configValue().errors().isEmpty());
+        assertEquals("transforms.xformA.subconfig", result.values().get(9).configValue().name());
+        assertEquals("transforms.xformB.type", result.values().get(10).configValue().name());
+        assertFalse(result.values().get(10).configValue().errors().isEmpty());
 
         verifyAll();
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 5a8bcc5a6cc..b50e7ff0956 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
 import org.apache.kafka.connect.runtime.errors.LogReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
@@ -369,7 +370,8 @@ private void createSinkTask(TargetState initialState, RetryWithToleranceOperator
 
         workerSinkTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, metrics, converter, converter,
+                taskId, sinkTask, statusListener, initialState, workerConfig,
+                ClusterConfigState.EMPTY, metrics, converter, converter,
                 headerConverter, sinkTransforms, pluginLoader, time, retryWithToleranceOperator);
     }
 
@@ -398,7 +400,8 @@ private void createSourceTask(TargetState initialState, RetryWithToleranceOperat
         workerSourceTask = PowerMock.createPartialMock(
                 WorkerSourceTask.class, new String[]{"commitOffsets", "isStopping"},
                 taskId, sourceTask, statusListener, initialState, converter, converter, headerConverter, sourceTransforms,
-                producer, offsetReader, offsetWriter, workerConfig, metrics, pluginLoader, time, retryWithToleranceOperator);
+                producer, offsetReader, offsetWriter, workerConfig,
+                ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator);
     }
 
     private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> record) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
new file mode 100644
index 00000000000..89bba09b0da
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigChangeCallback;
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigProvider;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+public class WorkerConfigTransformerTest {
+
+    public static final String MY_KEY = "myKey";
+    public static final String MY_CONNECTOR = "myConnector";
+    public static final String TEST_KEY = "testKey";
+    public static final String TEST_PATH = "testPath";
+    public static final String TEST_KEY_WITH_TTL = "testKeyWithTTL";
+    public static final String TEST_KEY_WITH_LONGER_TTL = "testKeyWithLongerTTL";
+    public static final String TEST_RESULT = "testResult";
+    public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL";
+    public static final String TEST_RESULT_WITH_LONGER_TTL = "testResultWithLongerTTL";
+
+    @Mock private Herder herder;
+    @Mock private Worker worker;
+    @Mock private HerderRequest requestId;
+    private WorkerConfigTransformer configTransformer;
+
+    @Before
+    public void setup() {
+        worker = PowerMock.createMock(Worker.class);
+        herder = PowerMock.createMock(Herder.class);
+        configTransformer = new WorkerConfigTransformer(worker, Collections.singletonMap("test", new TestConfigProvider()));
+    }
+
+    @Test
+    public void testReplaceVariable() throws Exception {
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+        assertEquals(TEST_RESULT, result.get(MY_KEY));
+    }
+
+    @Test
+    public void testReplaceVariableWithTTL() throws Exception {
+        EasyMock.expect(worker.herder()).andReturn(herder);
+        EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.NONE);
+
+        replayAll();
+
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
+    }
+
+    @Test
+    public void testReplaceVariableWithTTLAndScheduleRestart() throws Exception {
+        EasyMock.expect(worker.herder()).andReturn(herder);
+        EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
+        EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId);
+
+        replayAll();
+
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
+    }
+
+    @Test
+    public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() throws Exception {
+        EasyMock.expect(worker.herder()).andReturn(herder);
+        EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
+        EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, null)).andReturn(requestId);
+
+        EasyMock.expect(worker.herder()).andReturn(herder);
+        EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
+        EasyMock.expectLastCall();
+        requestId.cancel();
+        EasyMock.expectLastCall();
+        EasyMock.expect(herder.restartConnector(10L, MY_CONNECTOR, null)).andReturn(requestId);
+
+        replayAll();
+
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
+
+        result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithLongerTTL}"));
+        assertEquals(TEST_RESULT_WITH_LONGER_TTL, result.get(MY_KEY));
+    }
+
+    public static class TestConfigProvider implements ConfigProvider {
+
+        public void configure(Map<String, ?> configs) {
+        }
+
+        public ConfigData get(String path) {
+            return null;
+        }
+
+        public ConfigData get(String path, Set<String> keys) {
+            if (path.equals(TEST_PATH)) {
+                if (keys.contains(TEST_KEY)) {
+                    return new ConfigData(Collections.singletonMap(TEST_KEY, TEST_RESULT));
+                } else if (keys.contains(TEST_KEY_WITH_TTL)) {
+                    return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
+                } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) {
+                    return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
+                }
+            }
+            return new ConfigData(Collections.emptyMap());
+        }
+
+        public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
+            throw new UnsupportedOperationException();
+        }
+
+        public void unsubscribe(String path, Set<String> keys) {
+            throw new UnsupportedOperationException();
+        }
+
+        public void close() {
+        }
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index ff8507c0945..d23adbf3d69 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -32,6 +32,7 @@
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
@@ -162,7 +163,7 @@ public void setUp() {
     private void createTask(TargetState initialState) {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, metrics,
+                taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics,
                 keyConverter, valueConverter, headerConverter,
                 transformationChain, pluginLoader, time,
                 RetryWithToleranceOperator.NOOP_OPERATOR);
@@ -1463,5 +1464,4 @@ private void assertMetrics(int minimumPollCountExpected) {
 
     private abstract static class TestSinkTask extends SinkTask  {
     }
-
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 61d8778d11f..800301e05dc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -137,7 +138,7 @@ public void setup() {
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter,
+                taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverter,
                 valueConverter, headerConverter,
                 new TransformationChain(Collections.emptyList(), RetryWithToleranceOperator.NOOP_OPERATOR),
                 pluginLoader, time, RetryWithToleranceOperator.NOOP_OPERATOR);
@@ -700,5 +701,4 @@ public Object answer() throws Throwable {
 
     private static abstract class TestSinkTask extends SinkTask {
     }
-
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 77f4ad93bad..1482d75b513 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -104,6 +105,7 @@
     @Mock private KafkaProducer<byte[], byte[]> producer;
     @Mock private OffsetStorageReader offsetReader;
     @Mock private OffsetStorageWriter offsetWriter;
+    @Mock private ClusterConfigState clusterConfigState;
     private WorkerSourceTask workerTask;
     @Mock private Future<RecordMetadata> sendFuture;
     @MockStrict private TaskStatus.Listener statusListener;
@@ -148,7 +150,7 @@ private void createWorkerTask() {
 
     private void createWorkerTask(TargetState initialState) {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
-                transformationChain, producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM,
+                transformationChain, producer, offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
                 RetryWithToleranceOperator.NOOP_OPERATOR);
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index d29eef5ed69..6fa7ed11b9e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -34,6 +34,7 @@
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
@@ -491,6 +492,7 @@ public void testAddRemoveTask() throws Exception {
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
                 EasyMock.eq(config),
+                anyObject(ClusterConfigState.class),
                 anyObject(ConnectMetrics.class),
                 anyObject(ClassLoader.class),
                 anyObject(Time.class),
@@ -547,7 +549,7 @@ public void testAddRemoveTask() throws Exception {
         assertStatistics(worker, 0, 0);
         assertStartupStatistics(worker, 0, 0, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
+        worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
         assertStatistics(worker, 0, 1);
         assertStartupStatistics(worker, 0, 0, 1, 0);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
@@ -598,7 +600,7 @@ public void testStartTaskFailure() throws Exception {
         assertStatistics(worker, 0, 0);
         assertStartupStatistics(worker, 0, 0, 0, 0);
 
-        assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED));
+        assertFalse(worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED));
         assertStartupStatistics(worker, 0, 0, 1, 1);
 
         assertStatistics(worker, 0, 0);
@@ -629,6 +631,7 @@ public void testCleanupTasksOnStop() throws Exception {
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
                 anyObject(WorkerConfig.class),
+                anyObject(ClusterConfigState.class),
                 anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
                 anyObject(Time.class),
@@ -688,7 +691,7 @@ public void testCleanupTasksOnStop() throws Exception {
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
         worker.start();
         assertStatistics(worker, 0, 0);
-        worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
+        worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
         assertStatistics(worker, 0, 1);
         worker.stop();
         assertStatistics(worker, 0, 0);
@@ -721,6 +724,7 @@ public void testConverterOverrides() throws Exception {
                 anyObject(OffsetStorageReader.class),
                 anyObject(OffsetStorageWriter.class),
                 anyObject(WorkerConfig.class),
+                anyObject(ClusterConfigState.class),
                 anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
                 anyObject(Time.class),
@@ -784,7 +788,7 @@ public void testConverterOverrides() throws Exception {
         connProps.put("key.converter.extra.config", "foo");
         connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConfigurableConverter.class.getName());
         connProps.put("value.converter.extra.config", "bar");
-        worker.startTask(TASK_ID, connProps, origProps, taskStatusListener, TargetState.STARTED);
+        worker.startTask(TASK_ID, ClusterConfigState.EMPTY, connProps, origProps, taskStatusListener, TargetState.STARTED);
         assertStatistics(worker, 0, 1);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index d7a7d87cb4f..911afe7ec2f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -214,7 +214,7 @@ public void testJoinAssignment() throws Exception {
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
 
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -241,7 +241,7 @@ public void testRebalance() throws Exception {
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -288,7 +288,7 @@ public void testRebalanceFailedConnector() throws Exception {
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -756,7 +756,7 @@ public void testRestartTask() throws Exception {
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
 
@@ -770,7 +770,7 @@ public void testRestartTask() throws Exception {
 
         worker.stopAndAwaitTask(TASK0);
         PowerMock.expectLastCall();
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
 
@@ -820,10 +820,10 @@ public void testRestartUnknownTask() throws Exception {
 
     @Test
     public void testRequestProcessingOrder() throws Exception {
-        final DistributedHerder.HerderRequest req1 = herder.addRequest(100, null, null);
-        final DistributedHerder.HerderRequest req2 = herder.addRequest(10, null, null);
-        final DistributedHerder.HerderRequest req3 = herder.addRequest(200, null, null);
-        final DistributedHerder.HerderRequest req4 = herder.addRequest(200, null, null);
+        final DistributedHerder.DistributedHerderRequest req1 = herder.addRequest(100, null, null);
+        final DistributedHerder.DistributedHerderRequest req2 = herder.addRequest(10, null, null);
+        final DistributedHerder.DistributedHerderRequest req3 = herder.addRequest(200, null, null);
+        final DistributedHerder.DistributedHerderRequest req4 = herder.addRequest(200, null, null);
 
         assertEquals(req2, herder.requests.pollFirst()); // lowest delay
         assertEquals(req1, herder.requests.pollFirst()); // next lowest delay
@@ -1080,7 +1080,7 @@ public void testUnknownConnectorPaused() throws Exception {
         // join
         expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -1117,7 +1117,7 @@ public void testConnectorPausedRunningTaskOnly() throws Exception {
         // join
         expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -1157,7 +1157,7 @@ public void testConnectorResumedRunningTaskOnly() throws Exception {
         // join
         expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -1210,7 +1210,7 @@ public void testTaskConfigAdded() {
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
                 ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(),
                 Arrays.asList(TASK0));
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         member.poll(EasyMock.anyInt());
@@ -1250,7 +1250,7 @@ public void testJoinLeaderCatchUpFails() throws Exception {
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<ClusterConfigState>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index fd330f28009..5372a3a27a5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -37,6 +37,7 @@
 import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConnector;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -68,6 +69,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -325,6 +327,7 @@ public void testRestartConnectorFailureOnStart() throws Exception {
         PowerMock.verifyAll();
     }
 
+
     @Test
     public void testRestartTask() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
@@ -337,7 +340,14 @@ public void testRestartTask() throws Exception {
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall();
 
-        worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
+        ClusterConfigState configState = new ClusterConfigState(
+                -1,
+                Collections.singletonMap(CONNECTOR_NAME, 1),
+                Collections.singletonMap(CONNECTOR_NAME, connectorConfig),
+                Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
+                Collections.singletonMap(taskId, taskConfig(SourceSink.SOURCE)),
+                new HashSet<>());
+        worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(true);
 
         PowerMock.replayAll();
@@ -363,7 +373,14 @@ public void testRestartTaskFailureOnStart() throws Exception {
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall();
 
-        worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
+        ClusterConfigState configState = new ClusterConfigState(
+                -1,
+                Collections.singletonMap(CONNECTOR_NAME, 1),
+                Collections.singletonMap(CONNECTOR_NAME, connectorConfig),
+                Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
+                Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)),
+                new HashSet<>());
+        worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
         EasyMock.expectLastCall().andReturn(false);
 
         PowerMock.replayAll();
@@ -597,7 +614,14 @@ private void expectAdd(SourceSink sourceSink) throws Exception {
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
             .andReturn(singletonList(generatedTaskProps));
 
-        worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);
+        ClusterConfigState configState = new ClusterConfigState(
+                -1,
+                Collections.singletonMap(CONNECTOR_NAME, 1),
+                Collections.singletonMap(CONNECTOR_NAME, connectorConfig(sourceSink)),
+                Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
+                Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), generatedTaskProps),
+                new HashSet<>());
+        worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);
+        EasyMock.expectLastCall().andReturn(true);
 
         EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName()))
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index aac1b78c918..ed62d9b6f01 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -146,7 +146,7 @@
 
     @Before
     public void setUp() {
-        configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter, DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter, DEFAULT_DISTRIBUTED_CONFIG, null);
         Whitebox.setInternalState(configStorage, "configLog", storeLog);
         configStorage.setUpdateListener(configUpdateListener);
     }


 



> Externalize Secrets for Kafka Connect Configurations
> ----------------------------------------------------
>
>                 Key: KAFKA-6886
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6886
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Robert Yokota
>            Assignee: Robert Yokota
>            Priority: Major
>             Fix For: 2.0.0
>
>
> Kafka Connect's connector configurations contain plaintext passwords, and 
> Connect stores these in cleartext either on the filesystem (for standalone 
> mode) or in internal topics (for distributed mode). 
> Connect should not store or transmit cleartext passwords in connector 
> configurations. First, it should be possible to replace secrets in stored 
> connector configurations with references to values held in external secret 
> management systems. Connect should provide an extension point for adding 
> custom integrations, along with a file-based extension as an example. 
> Second, it should be possible to configure a Connect runtime with one or 
> more of these extensions and to use placeholders in connector 
> configurations, which the runtime resolves before passing the complete 
> configuration to the connector; existing connectors therefore see no 
> difference in the configurations Connect hands them at startup. Third, 
> Connect's API should be extended so that a connector can obtain the latest 
> connector configuration at any time.
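
To make the shape of the new extension point concrete, here is a minimal,
illustrative ConfigProvider that resolves secrets from environment variables.
It is not part of this patch: the class name, package, and worker property
values below are hypothetical; the imports follow the API as it ships in
2.0.0 (ConfigData and ConfigChangeCallback under
org.apache.kafka.common.config, ConfigProvider under
org.apache.kafka.common.config.provider); and the placeholder grammar
${provider:[path:]key} is as described in KIP-297, which this PR implements.

package com.example; // hypothetical package

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.config.ConfigChangeCallback;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;

/*
 * Resolves configuration values from environment variables. A worker
 * configured with
 *     config.providers=env
 *     config.providers.env.class=com.example.EnvConfigProvider
 * lets a connector configuration carry a reference such as
 * ${env:DB_PASSWORD} instead of the literal secret; the runtime resolves
 * the reference before handing the complete configuration to the connector.
 */
public class EnvConfigProvider implements ConfigProvider {

    @Override
    public void configure(Map<String, ?> configs) {
        // No provider-level settings are needed for this sketch.
    }

    @Override
    public ConfigData get(String path) {
        // Environment variables are not namespaced, so the path is unused.
        return new ConfigData(new HashMap<>(System.getenv()));
    }

    @Override
    public ConfigData get(String path, Set<String> keys) {
        Map<String, String> data = new HashMap<>();
        for (String key : keys) {
            String value = System.getenv(key);
            if (value != null)
                data.put(key, value);
        }
        return new ConfigData(data);
    }

    @Override
    public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
        // Environment variables do not change at runtime, so change
        // notifications are unsupported, as in the test stub earlier in the diff.
        throw new UnsupportedOperationException();
    }

    @Override
    public void unsubscribe(String path, Set<String> keys) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}

With such a provider on the worker's plugin path, a connector configuration
can hold database.password=${env:DB_PASSWORD} rather than the cleartext
value, and the secret is only materialized inside the worker at task startup.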


