[ 
https://issues.apache.org/jira/browse/KAFKA-7620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701426#comment-16701426
 ] 

ASF GitHub Bot commented on KAFKA-7620:
---------------------------------------

ewencp closed pull request #5914: KAFKA-7620:  Fix restart logic for TTLs in 
WorkerConfigTransformer
URL: https://github.com/apache/kafka/pull/5914
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index efcc01d2c45..8889aadbae1 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -35,6 +35,7 @@
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import static 
org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
@@ -105,8 +106,8 @@
             "indicates that a configuration value will expire in the future.";
 
     private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action";
-    public static final String CONFIG_RELOAD_ACTION_NONE = 
Herder.ConfigReloadAction.NONE.toString();
-    public static final String CONFIG_RELOAD_ACTION_RESTART = 
Herder.ConfigReloadAction.RESTART.toString();
+    public static final String CONFIG_RELOAD_ACTION_NONE = 
Herder.ConfigReloadAction.NONE.name().toLowerCase(Locale.ROOT);
+    public static final String CONFIG_RELOAD_ACTION_RESTART = 
Herder.ConfigReloadAction.RESTART.name().toLowerCase(Locale.ROOT);
 
     public static final String ERRORS_RETRY_TIMEOUT_CONFIG = 
"errors.retry.timeout";
     public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout 
for Errors";
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 5c7cc1429aa..c572e20b52f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -148,12 +148,6 @@
      */
     void restartTask(ConnectorTaskId id, Callback<Void> cb);
 
-    /**
-     * Get the configuration reload action.
-     * @param connName name of the connector
-     */
-    ConfigReloadAction connectorConfigReloadAction(final String connName);
-
     /**
      * Restart the connector.
      * @param connName name of the connector
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 1b715c70c76..3373d5ce328 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigTransformerResult;
+import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -29,6 +34,8 @@
  * retrieved TTL values.
  */
 public class WorkerConfigTransformer {
+    private static final Logger log = 
LoggerFactory.getLogger(WorkerConfigTransformer.class);
+
     private final Worker worker;
     private final ConfigTransformer configTransformer;
     private final ConcurrentMap<String, Map<String, HerderRequest>> requests = 
new ConcurrentHashMap<>();
@@ -46,7 +53,16 @@ public WorkerConfigTransformer(Worker worker, Map<String, 
ConfigProvider> config
         if (configs == null) return null;
         ConfigTransformerResult result = configTransformer.transform(configs);
         if (connectorName != null) {
-            scheduleReload(connectorName, result.ttls());
+            String key = ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG;
+            String action = (String) ConfigDef.parseType(key, 
configs.get(key), ConfigDef.Type.STRING);
+            if (action == null) {
+                // The default action is "restart".
+                action = ConnectorConfig.CONFIG_RELOAD_ACTION_RESTART;
+            }
+            ConfigReloadAction reloadAction = 
ConfigReloadAction.valueOf(action.toUpperCase(Locale.ROOT));
+            if (reloadAction == ConfigReloadAction.RESTART) {
+                scheduleReload(connectorName, result.ttls());
+            }
         }
         return result.data();
     }
@@ -58,21 +74,19 @@ private void scheduleReload(String connectorName, 
Map<String, Long> ttls) {
     }
 
     private void scheduleReload(String connectorName, String path, long ttl) {
-        Herder herder = worker.herder();
-        if (herder.connectorConfigReloadAction(connectorName) == 
Herder.ConfigReloadAction.RESTART) {
-            Map<String, HerderRequest> connectorRequests = 
requests.get(connectorName);
-            if (connectorRequests == null) {
-                connectorRequests = new ConcurrentHashMap<>();
-                requests.put(connectorName, connectorRequests);
-            } else {
-                HerderRequest previousRequest = connectorRequests.get(path);
-                if (previousRequest != null) {
-                    // Delete previous request for ttl which is now stale
-                    previousRequest.cancel();
-                }
+        Map<String, HerderRequest> connectorRequests = 
requests.get(connectorName);
+        if (connectorRequests == null) {
+            connectorRequests = new ConcurrentHashMap<>();
+            requests.put(connectorName, connectorRequests);
+        } else {
+            HerderRequest previousRequest = connectorRequests.get(path);
+            if (previousRequest != null) {
+                // Delete previous request for ttl which is now stale
+                previousRequest.cancel();
             }
-            HerderRequest request = herder.restartConnector(ttl, 
connectorName, null);
-            connectorRequests.put(path, request);
         }
+        log.info("Scheduling a restart of connector {} in {} ms", 
connectorName, ttl);
+        HerderRequest request = worker.herder().restartConnector(ttl, 
connectorName, null);
+        connectorRequests.put(path, request);
     }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index f2009dbac1e..dc91f356f3d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -61,7 +61,6 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
@@ -642,13 +641,6 @@ else if (!configState.contains(connName))
         );
     }
 
-    @Override
-    public ConfigReloadAction connectorConfigReloadAction(final String 
connName) {
-        return ConfigReloadAction.valueOf(
-                
configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG)
-                        .toUpperCase(Locale.ROOT));
-    }
-
     @Override
     public void restartConnector(final String connName, final Callback<Void> 
callback) {
         restartConnector(0, connName, callback);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 40ad9803a2c..fe31c284613 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -42,7 +42,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.Executors;
@@ -260,13 +259,6 @@ public synchronized void restartTask(ConnectorTaskId 
taskId, Callback<Void> cb)
             cb.onCompletion(new ConnectException("Failed to start task: " + 
taskId), null);
     }
 
-    @Override
-    public ConfigReloadAction connectorConfigReloadAction(final String 
connName) {
-        return ConfigReloadAction.valueOf(
-                
configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG)
-                        .toUpperCase(Locale.ROOT));
-    }
-
     @Override
     public synchronized void restartConnector(String connName, Callback<Void> 
cb) {
         if (!configState.contains(connName))
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
index b5da2646c3d..e30acb15ed4 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
@@ -28,9 +28,12 @@
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_NONE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.powermock.api.easymock.PowerMock.replayAll;
@@ -69,18 +72,18 @@ public void testReplaceVariable() {
     @Test
     public void testReplaceVariableWithTTL() {
         EasyMock.expect(worker.herder()).andReturn(herder);
-        
EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.NONE);
 
         replayAll();
 
-        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, 
Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
-        assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
+        Map<String, String> props = new HashMap<>();
+        props.put(MY_KEY, "${test:testPath:testKeyWithTTL}");
+        props.put(CONFIG_RELOAD_ACTION_CONFIG, CONFIG_RELOAD_ACTION_NONE);
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, 
props);
     }
 
     @Test
     public void testReplaceVariableWithTTLAndScheduleRestart() {
         EasyMock.expect(worker.herder()).andReturn(herder);
-        
EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
         EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, 
null)).andReturn(requestId);
 
         replayAll();
@@ -92,11 +95,9 @@ public void testReplaceVariableWithTTLAndScheduleRestart() {
     @Test
     public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() {
         EasyMock.expect(worker.herder()).andReturn(herder);
-        
EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
         EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR, 
null)).andReturn(requestId);
 
         EasyMock.expect(worker.herder()).andReturn(herder);
-        
EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
         EasyMock.expectLastCall();
         requestId.cancel();
         EasyMock.expectLastCall();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ConfigProvider is broken for KafkaConnect when TTL is not null
> --------------------------------------------------------------
>
>                 Key: KAFKA-7620
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7620
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.0.0, 2.0.1
>            Reporter: Ye Ji
>            Assignee: Robert Yokota
>            Priority: Major
>             Fix For: 2.2.0, 2.1.1, 2.0.2
>
>
> If the ConfigData returned by a ConfigProvider.get implementation has a 
> non-null and non-negative ttl, it triggers infinite recursion. Here is an 
> excerpt of the stack trace:
> {code:java}
> at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
>       at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
>       at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
>       at 
> org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121)
>       at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648)
>       at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
>       at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
>       at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
> {code}
> Basically:
> 1) If a non-null ttl is returned from the config provider, the Connect 
> runtime tries to schedule a reload in the future.
> 2) The scheduleReload function reads the config again to see whether the 
> action is a restart or not, by calling 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform to 
> transform the config.
> 3) The transform function calls the config provider and gets a non-null ttl, 
> which causes scheduleReload to be called again, so we are back to step 1.
> To reproduce, simply fork the provided 
> [FileConfigProvider|https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java],
>  and add a non-negative ttl to the ConfigData returned by the get functions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to