This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push: new 774d778 MINOR: Add support for Standalone Connect configs in Rest Server extensions (#6622) 774d778 is described below commit 774d77824a27c11672032b93801b789327cdc769 Author: Cyrus Vafadari <cyr...@alum.mit.edu> AuthorDate: Mon Apr 22 20:26:42 2019 -0700 MINOR: Add support for Standalone Connect configs in Rest Server extensions (#6622) Add support for Standalone Connect configs in Rest Server extensions A bug was introduced in 7a42750d that was caught in system tests: The rest extensions fail if a Standalone worker config is passed, since it does not have a definition for rebalance timeout. A new method was introduced on WorkerConfig that by default returns null for the rebalance timeout, and DistributedConfig overrides this to return the configured value. Author: Cyrus Vafadari <cy...@confluent.io> Reviewers: Arjun Satish <arjunconfluent.io>, Randall Hauch <rha...@gmail.com> --- .../apache/kafka/connect/runtime/WorkerConfig.java | 4 +++ .../runtime/distributed/DistributedConfig.java | 5 ++++ .../kafka/connect/runtime/rest/RestServer.java | 7 +++-- .../kafka/connect/runtime/rest/RestServerTest.java | 32 ++++++++++++++++++++++ 4 files changed, 45 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 583953d..c703bfc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -329,6 +329,10 @@ public class WorkerConfig extends AbstractConfig { } } + public Integer getRebalanceTimeout() { + return null; + } + @Override protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) { return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues); diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java index dc9017b..af112a5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java @@ -268,6 +268,11 @@ public class DistributedConfig extends WorkerConfig { STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC); } + @Override + public Integer getRebalanceTimeout() { + return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG); + } + public DistributedConfig(Map<String, String> props) { super(CONFIG, props); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index dd643e7..744f35d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -23,7 +23,6 @@ import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.WorkerConfig; -import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper; @@ -309,7 +308,9 @@ public class RestServer { config, ConnectRestExtension.class); long herderRequestTimeoutMs = ConnectorsResource.REQUEST_TIMEOUT_MS; - Integer rebalanceTimeoutMs = config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG); + + Integer rebalanceTimeoutMs = config.getRebalanceTimeout(); + 
if (rebalanceTimeoutMs != null) { herderRequestTimeoutMs = Math.min(herderRequestTimeoutMs, rebalanceTimeoutMs.longValue()); } @@ -332,4 +333,4 @@ public class RestServer { return base + path; } -} \ No newline at end of file +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index 8824d77..743f92b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.connect.runtime.HerderProvider; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.util.Callback; import org.easymock.Capture; import org.easymock.EasyMock; @@ -248,4 +249,35 @@ public class RestServerTest { } PowerMock.verifyAll(); } + + @Test + public void testStandaloneConfig() throws IOException { + Map<String, String> workerProps = baseWorkerProps(); + workerProps.put("offset.storage.file.filename", "/tmp"); + WorkerConfig workerConfig = new StandaloneConfig(workerProps); + + + EasyMock.expect(herder.plugins()).andStubReturn(plugins); + EasyMock.expect(plugins.newPlugins(Collections.EMPTY_LIST, + workerConfig, + ConnectRestExtension.class)).andStubReturn(Collections.EMPTY_LIST); + + final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture(); + herder.connectors(EasyMock.capture(connectorsCallback)); + PowerMock.expectLastCall().andAnswer(() -> { + connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", "b")); + return null; + }); + + PowerMock.replayAll(); + + server = new RestServer(workerConfig); + server.start(new 
HerderProvider(herder), herder.plugins()); + HttpRequest request = new HttpGet("/connectors"); + CloseableHttpClient httpClient = HttpClients.createMinimal(); + HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort()); + CloseableHttpResponse response = httpClient.execute(httpHost, request); + + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + } }