[ 
https://issues.apache.org/jira/browse/KAFKA-7242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16595525#comment-16595525
 ] 

ASF GitHub Bot commented on KAFKA-7242:
---------------------------------------

ewencp closed pull request #5475: KAFKA-7242: Reverse xform configs before 
saving
URL: https://github.com/apache/kafka/pull/5475
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff once a foreign (forked) pull request is merged, the diff
is reproduced below for the sake of provenance:

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
index f5a3737d334..6430ffdd419 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -53,7 +53,7 @@
  * {@link ConfigProvider#unsubscribe(String, Set, ConfigChangeCallback)} 
methods.
  */
 public class ConfigTransformer {
-    private static final Pattern DEFAULT_PATTERN = 
Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");
+    public static final Pattern DEFAULT_PATTERN = 
Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");
     private static final String EMPTY_PATH = "";
 
     private final Map<String, ConfigProvider> configProviders;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index cadb4e05d9a..82fdeccc96b 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -20,9 +20,11 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.ConfigKey;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
@@ -46,6 +48,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -53,6 +56,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Abstract Herder implementation which handles connector/task lifecycle 
tracking. Extensions
@@ -431,4 +436,42 @@ private String trace(Throwable t) {
             return null;
         }
     }
+
+    /*
+     * Performs a reverse transformation on a set of task configs, by 
replacing values with variable references.
+     */
+    public static List<Map<String, String>> reverseTransform(String connName,
+                                                             
ClusterConfigState configState,
+                                                             List<Map<String, 
String>> configs) {
+
+        // Find the config keys in the raw connector config that have variable 
references
+        Map<String, String> rawConnConfig = 
configState.rawConnectorConfig(connName);
+        Set<String> connKeysWithVariableValues = 
keysWithVariableValues(rawConnConfig, ConfigTransformer.DEFAULT_PATTERN);
+
+        List<Map<String, String>> result = new ArrayList<>();
+        for (Map<String, String> config : configs) {
+            Map<String, String> newConfig = new HashMap<>(config);
+            for (String key : connKeysWithVariableValues) {
+                if (newConfig.containsKey(key)) {
+                    newConfig.put(key, rawConnConfig.get(key));
+                }
+            }
+            result.add(newConfig);
+        }
+        return result;
+    }
+
+    private static Set<String> keysWithVariableValues(Map<String, String> 
rawConfig, Pattern pattern) {
+        Set<String> keys = new HashSet<>();
+        for (Map.Entry<String, String> config : rawConfig.entrySet()) {
+            if (config.getValue() != null) {
+                Matcher matcher = pattern.matcher(config.getValue());
+                if (matcher.matches()) {
+                    keys.add(config.getKey());
+                }
+            }
+        }
+        return keys;
+    }
+
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
index 11693b51795..fc6a50d2fc0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
@@ -123,6 +123,10 @@ public boolean contains(String connector) {
         return configs;
     }
 
+    public Map<String, String> rawConnectorConfig(String connector) {
+        return connectorConfigs.get(connector);
+    }
+
     /**
      * Get the target state of the connector
      * @param connector name of the connector
@@ -148,16 +152,28 @@ public TargetState targetState(String connector) {
         return configs;
     }
 
+    public Map<String, String> rawTaskConfig(ConnectorTaskId task) {
+        return taskConfigs.get(task);
+    }
+
     /**
-     * Get all task configs for a connector.
+     * Get all task configs for a connector.  The configurations will have 
been transformed by
+     * {@link org.apache.kafka.common.config.ConfigTransformer} by having all 
variable
+     * references replaced with the current values from external instances of
+     * {@link ConfigProvider}, and may include secrets.
      * @param connector name of the connector
      * @return a list of task configurations
      */
     public List<Map<String, String>> allTaskConfigs(String connector) {
         Map<Integer, Map<String, String>> taskConfigs = new TreeMap<>();
         for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : 
this.taskConfigs.entrySet()) {
-            if (taskConfigEntry.getKey().connector().equals(connector))
-                taskConfigs.put(taskConfigEntry.getKey().task(), 
taskConfigEntry.getValue());
+            if (taskConfigEntry.getKey().connector().equals(connector)) {
+                Map<String, String> configs = taskConfigEntry.getValue();
+                if (configTransformer != null) {
+                    configs = configTransformer.transform(connector, configs);
+                }
+                taskConfigs.put(taskConfigEntry.getKey().task(), configs);
+            }
         }
         return new LinkedList<>(taskConfigs.values());
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 5efb78a93e4..f2009dbac1e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1020,8 +1020,9 @@ private void reconfigureConnector(final String connName, 
final Callback<Void> cb
                 }
             }
             if (changed) {
+                List<Map<String, String>> rawTaskProps = 
reverseTransform(connName, configState, taskProps);
                 if (isLeader()) {
-                    configBackingStore.putTaskConfigs(connName, taskProps);
+                    configBackingStore.putTaskConfigs(connName, rawTaskProps);
                     cb.onCompletion(null, null);
                 } else {
                     // We cannot forward the request on the same thread 
because this reconfiguration can happen as a result of connector
@@ -1031,7 +1032,7 @@ private void reconfigureConnector(final String connName, 
final Callback<Void> cb
                         public void run() {
                             try {
                                 String reconfigUrl = 
RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
-                                RestClient.httpRequest(reconfigUrl, "POST", 
taskProps, null, config);
+                                RestClient.httpRequest(reconfigUrl, "POST", 
rawTaskProps, null, config);
                                 cb.onCompletion(null, null);
                             } catch (ConnectException e) {
                                 log.error("Request to leader to reconfigure 
connector tasks failed", e);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 20c6a24d384..40ad9803a2c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -201,7 +201,9 @@ public synchronized void putConnectorConfig(String connName,
                 created = true;
             }
 
-            if (!startConnector(config)) {
+            configBackingStore.putConnectorConfig(connName, config);
+
+            if (!startConnector(connName)) {
                 callback.onCompletion(new ConnectException("Failed to start 
connector: " + connName), null);
                 return;
             }
@@ -270,9 +272,8 @@ public synchronized void restartConnector(String connName, 
Callback<Void> cb) {
         if (!configState.contains(connName))
             cb.onCompletion(new NotFoundException("Connector " + connName + " 
not found", null), null);
 
-        Map<String, String> config = configState.connectorConfig(connName);
         worker.stopConnector(connName);
-        if (startConnector(config))
+        if (startConnector(connName))
             cb.onCompletion(null, null);
         else
             cb.onCompletion(new ConnectException("Failed to start connector: " 
+ connName), null);
@@ -290,9 +291,7 @@ public void run() {
         return new StandaloneHerderRequest(requestSeqNum.incrementAndGet(), 
future);
     }
 
-    private boolean startConnector(Map<String, String> connectorProps) {
-        String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
-        configBackingStore.putConnectorConfig(connName, connectorProps);
+    private boolean startConnector(String connName) {
         Map<String, String> connConfigs = 
configState.connectorConfig(connName);
         TargetState targetState = configState.targetState(connName);
         return worker.startConnector(connName, connConfigs, new 
HerderConnectorContext(this, connName), this, targetState);
@@ -336,7 +335,8 @@ private void updateConnectorTasks(String connName) {
 
         if (!newTaskConfigs.equals(oldTaskConfigs)) {
             removeConnectorTasks(connName);
-            configBackingStore.putTaskConfigs(connName, newTaskConfigs);
+            List<Map<String, String>> rawTaskConfigs = 
reverseTransform(connName, configState, newTaskConfigs);
+            configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
             createConnectorTasks(connName, configState.targetState(connName));
         }
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index db3cf273fe7..8dbda185401 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -20,12 +20,15 @@
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.transforms.Transformation;
@@ -40,6 +43,7 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -61,6 +65,53 @@
 @PrepareForTest({AbstractHerder.class})
 public class AbstractHerderTest {
 
+    private static final String CONN1 = "sourceA";
+    private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0);
+    private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1);
+    private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
+    private static final Integer MAX_TASKS = 3;
+    private static final Map<String, String> CONN1_CONFIG = new HashMap<>();
+    private static final String TEST_KEY = "testKey";
+    private static final String TEST_KEY2 = "testKey2";
+    private static final String TEST_KEY3 = "testKey3";
+    private static final String TEST_VAL = "testVal";
+    private static final String TEST_VAL2 = "testVal2";
+    private static final String TEST_REF = "${file:/tmp/somefile.txt:somevar}";
+    private static final String TEST_REF2 = 
"${file:/tmp/somefile2.txt:somevar2}";
+    private static final String TEST_REF3 = 
"${file:/tmp/somefile3.txt:somevar3}";
+    static {
+        CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
+        CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, 
MAX_TASKS.toString());
+        CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
BogusSourceConnector.class.getName());
+        CONN1_CONFIG.put(TEST_KEY, TEST_REF);
+        CONN1_CONFIG.put(TEST_KEY2, TEST_REF2);
+        CONN1_CONFIG.put(TEST_KEY3, TEST_REF3);
+    }
+    private static final Map<String, String> TASK_CONFIG = new HashMap<>();
+    static {
+        TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, 
BogusSourceTask.class.getName());
+        TASK_CONFIG.put(TEST_KEY, TEST_REF);
+    }
+    private static final List<Map<String, String>> TASK_CONFIGS = new 
ArrayList<>();
+    static {
+        TASK_CONFIGS.add(TASK_CONFIG);
+        TASK_CONFIGS.add(TASK_CONFIG);
+        TASK_CONFIGS.add(TASK_CONFIG);
+    }
+    private static final HashMap<ConnectorTaskId, Map<String, String>> 
TASK_CONFIGS_MAP = new HashMap<>();
+    static {
+        TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);
+        TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
+        TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
+    }
+    private static final ClusterConfigState SNAPSHOT = new 
ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP, Collections.<String>emptySet());
+    private static final ClusterConfigState SNAPSHOT_NO_TASKS = new 
ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
+            Collections.emptyMap(), Collections.<String>emptySet());
+
     private final String workerId = "workerId";
     private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
     private final int generation = 5;
@@ -248,6 +299,29 @@ public void testConfigValidationTransformsExtendResults() {
         verifyAll();
     }
 
+    @Test
+    public void testReverseTransformConfigs() throws Exception {
+        // Construct a task config with constant values for TEST_KEY and 
TEST_KEY2
+        Map<String, String> newTaskConfig = new HashMap<>();
+        newTaskConfig.put(TaskConfig.TASK_CLASS_CONFIG, 
BogusSourceTask.class.getName());
+        newTaskConfig.put(TEST_KEY, TEST_VAL);
+        newTaskConfig.put(TEST_KEY2, TEST_VAL2);
+        List<Map<String, String>> newTaskConfigs = new ArrayList<>();
+        newTaskConfigs.add(newTaskConfig);
+
+        // The SNAPSHOT has a task config with TEST_KEY and TEST_REF
+        List<Map<String, String>> reverseTransformed = 
AbstractHerder.reverseTransform(CONN1, SNAPSHOT, newTaskConfigs);
+        assertEquals(TEST_REF, reverseTransformed.get(0).get(TEST_KEY));
+
+        // The SNAPSHOT has no task configs but does have a connector config 
with TEST_KEY2 and TEST_REF2
+        reverseTransformed = AbstractHerder.reverseTransform(CONN1, 
SNAPSHOT_NO_TASKS, newTaskConfigs);
+        assertEquals(TEST_REF2, reverseTransformed.get(0).get(TEST_KEY2));
+
+        // The reverseTransformed result should not have TEST_KEY3 since 
newTaskConfigs does not have TEST_KEY3
+        reverseTransformed = AbstractHerder.reverseTransform(CONN1, 
SNAPSHOT_NO_TASKS, newTaskConfigs);
+        assertFalse(reverseTransformed.get(0).containsKey(TEST_KEY3));
+    }
+
     private AbstractHerder createConfigValidationHerder(Class<? extends 
Connector> connectorClass) {
 
 
@@ -299,4 +373,11 @@ public void close() {
 
         }
     }
+
+    // We need to use a real class here due to some issue with mocking 
java.lang.Class
+    private abstract class BogusSourceConnector extends SourceConnector {
+    }
+
+    private abstract class BogusSourceTask extends SourceTask {
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Externalized secrets are revealed in task configuration
> -------------------------------------------------------
>
>                 Key: KAFKA-7242
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7242
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Bahdan Siamionau
>            Assignee: Robert Yokota
>            Priority: Major
>             Fix For: 2.0.1, 2.1.0
>
>
> Trying to use the new [externalized 
> secrets|https://issues.apache.org/jira/browse/KAFKA-6886] feature, I noticed 
> that the task configuration is being saved in the config topic with disclosed 
> secrets. It seems like the main goal of the feature was not achieved - secrets 
> are still persisted in plain text. Probably I'm misusing this new config; 
> please correct me if I'm wrong.
> I'm running connect in distributed mode, creating connector with following 
> config:
> {code:java}
> {
>   "name" : "jdbc-sink-test",
>   "config" : {
>     "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
>     "tasks.max" : "1",
>     "config.providers" : "file",
>     "config.providers.file.class" : 
> "org.apache.kafka.common.config.provider.FileConfigProvider",
>     "config.providers.file.param.secrets" : "/opt/mysecrets",
>     "topics" : "test_topic",
>     "connection.url" : "${file:/opt/mysecrets:url}",
>     "connection.user" : "${file:/opt/mysecrets:user}",
>     "connection.password" : "${file:/opt/mysecrets:password}",
>     "insert.mode" : "upsert",
>     "pk.mode" : "record_value",
>     "pk.field" : "id"
>   }
> }
> {code}
> The connector works fine, and placeholders are substituted with the correct 
> values from the file, but then the updated config is written into the topic 
> again (see the 3 following records in the config topic):
> {code:java}
> key: connector-jdbc-sink-test
> value:
> {
> "properties": {
> "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
> "tasks.max": "1",
> "config.providers": "file",
> "config.providers.file.class": 
> "org.apache.kafka.common.config.provider.FileConfigProvider",
> "config.providers.file.param.secrets": "/opt/mysecrets",
> "topics": "test_topic",
> "connection.url": "${file:/opt/mysecrets:url}",
> "connection.user": "${file:/opt/mysecrets:user}",
> "connection.password": "${file:/opt/mysecrets:password}",
> "insert.mode": "upsert",
> "pk.mode": "record_value",
> "pk.field": "id",
> "name": "jdbc-sink-test"
> }
> }
> key: task-jdbc-sink-test-0
> value:
> {
> "properties": {
> "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
> "config.providers.file.param.secrets": "/opt/mysecrets",
> "connection.password": "actualpassword",
> "tasks.max": "1",
> "topics": "test_topic",
> "config.providers": "file",
> "pk.field": "id",
> "task.class": "io.confluent.connect.jdbc.sink.JdbcSinkTask",
> "connection.user": "datawarehouse",
> "name": "jdbc-sink-test",
> "config.providers.file.class": 
> "org.apache.kafka.common.config.provider.FileConfigProvider",
> "connection.url": 
> "jdbc:postgresql://actualurl:5432/datawarehouse?stringtype=unspecified",
> "insert.mode": "upsert",
> "pk.mode": "record_value"
> }
> }
> key: commit-jdbc-sink-test
> value:
> {
> "tasks":1
> }
> {code}
> Please advise: have I misunderstood the goal of the given feature, have I 
> missed something in the configuration, or is it actually a bug? Thank you



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to