This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 774d778  MINOR: Add support for Standalone Connect configs in Rest 
Server extensions (#6622)
774d778 is described below

commit 774d77824a27c11672032b93801b789327cdc769
Author: Cyrus Vafadari <cyr...@alum.mit.edu>
AuthorDate: Mon Apr 22 20:26:42 2019 -0700

    MINOR: Add support for Standalone Connect configs in Rest Server extensions 
(#6622)
    
    Add support for Standalone Connect configs in Rest Server extensions
    
    A bug was introduced in 7a42750d that was caught in system tests:
    The rest extensions fail if a Standalone worker config is passed,
    since it does not have a definition for rebalance timeout.
    A new method was introduced on WorkerConfig that by default returns
    null for the rebalance timeout, and DistributedConfig overrides this
    to return the configured value.
    
    Author: Cyrus Vafadari <cy...@confluent.io>
    Reviewers: Arjun Satish <arjun@confluent.io>, Randall Hauch 
<rha...@gmail.com>
---
 .../apache/kafka/connect/runtime/WorkerConfig.java |  4 +++
 .../runtime/distributed/DistributedConfig.java     |  5 ++++
 .../kafka/connect/runtime/rest/RestServer.java     |  7 +++--
 .../kafka/connect/runtime/rest/RestServerTest.java | 32 ++++++++++++++++++++++
 4 files changed, 45 insertions(+), 3 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 583953d..c703bfc 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -329,6 +329,10 @@ public class WorkerConfig extends AbstractConfig {
         }
     }
 
+    public Integer getRebalanceTimeout() {
+        return null;
+    }
+
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, 
Object> parsedValues) {
         return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, 
parsedValues);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index dc9017b..af112a5 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -268,6 +268,11 @@ public class DistributedConfig extends WorkerConfig {
                         STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC);
     }
 
+    @Override
+    public Integer getRebalanceTimeout() {
+        return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
+    }
+
     public DistributedConfig(Map<String, String> props) {
         super(CONFIG, props);
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index dd643e7..744f35d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -23,7 +23,6 @@ import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
 import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
@@ -309,7 +308,9 @@ public class RestServer {
             config, ConnectRestExtension.class);
 
         long herderRequestTimeoutMs = ConnectorsResource.REQUEST_TIMEOUT_MS;
-        Integer rebalanceTimeoutMs = 
config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
+
+        Integer rebalanceTimeoutMs = config.getRebalanceTimeout();
+
         if (rebalanceTimeoutMs != null) {
             herderRequestTimeoutMs = Math.min(herderRequestTimeoutMs, 
rebalanceTimeoutMs.longValue());
         }
@@ -332,4 +333,4 @@ public class RestServer {
             return base + path;
     }
 
-}
\ No newline at end of file
+}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 8824d77..743f92b 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -248,4 +249,35 @@ public class RestServerTest {
         }
         PowerMock.verifyAll();
     }
+
+    @Test
+    public void testStandaloneConfig() throws IOException  {
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        WorkerConfig workerConfig = new StandaloneConfig(workerProps);
+
+
+      EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+      EasyMock.expect(plugins.newPlugins(Collections.EMPTY_LIST,
+          workerConfig,
+          ConnectRestExtension.class)).andStubReturn(Collections.EMPTY_LIST);
+
+      final Capture<Callback<Collection<String>>> connectorsCallback = 
EasyMock.newCapture();
+      herder.connectors(EasyMock.capture(connectorsCallback));
+      PowerMock.expectLastCall().andAnswer(() -> {
+        connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", 
"b"));
+        return null;
+      });
+
+      PowerMock.replayAll();
+
+      server = new RestServer(workerConfig);
+      server.start(new HerderProvider(herder), herder.plugins());
+      HttpRequest request = new HttpGet("/connectors");
+      CloseableHttpClient httpClient = HttpClients.createMinimal();
+      HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), 
server.advertisedUrl().getPort());
+      CloseableHttpResponse response = httpClient.execute(httpHost, request);
+
+      Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+    }
 }

Reply via email to