Repository: kafka Updated Branches: refs/heads/trunk 800d29648 -> 040fde8ec
KAFKA-4878: Improved Invalid Connect Config Error Message Addresses https://issues.apache.org/jira/browse/KAFKA-4878 * Adjusted the error message to explicitly state errors and their number * Dried up the logic for generating the message between standalone and distributed Example: two misspelled config keys in the file source config: ``` namse=local-file-source connector.class=FileStreamSource tasks.max=1 fisle=test.txt topic=connect-test ``` Produces: ``` [2017-03-22 08:57:11,896] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:99) java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s): Missing required configuration "file" which has no default value. Missing required configuration "name" which has no default value. You can also find the above list of errors at the endpoint `/{connectorType}/config/validate` ``` Author: Armin Braun <[email protected]> Reviewers: Gwen Shapira, Konstantine Karantasis, Ewen Cheslack-Postava Closes #2722 from original-brownbear/KAFKA-4878 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/040fde8e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/040fde8e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/040fde8e Branch: refs/heads/trunk Commit: 040fde8ec1f94856799c6bb47fa23a73c20bda66 Parents: 800d296 Author: Armin Braun <[email protected]> Authored: Mon Apr 3 13:09:10 2017 -0700 Committer: Gwen Shapira <[email protected]> Committed: Mon Apr 3 13:09:10 2017 -0700 ---------------------------------------------------------------------- .../kafka/connect/runtime/AbstractHerder.java | 36 +++++++++++++++++ .../runtime/distributed/DistributedHerder.java | 7 +--- .../runtime/standalone/StandaloneHerder.java | 8 +--- .../standalone/StandaloneHerderTest.java | 41 ++++++++++++++++++++ 4 files changed, 80 
insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/040fde8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 6a16185..9e5342e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -27,11 +27,13 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; +import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import java.io.ByteArrayOutputStream; @@ -338,6 +340,40 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con } } + /** + * Checks a given {@link ConfigInfos} for validation error messages and adds an exception + * to the given {@link Callback} if any were found. 
+ * + * @param configInfos configInfos to read Errors from + * @param callback callback to add config error exception to + * @return true if errors were found in the config + */ + protected final boolean maybeAddConfigErrors( + ConfigInfos configInfos, + Callback<Created<ConnectorInfo>> callback + ) { + int errors = configInfos.errorCount(); + boolean hasErrors = errors > 0; + if (hasErrors) { + StringBuilder messages = new StringBuilder(); + messages.append("Connector configuration is invalid and contains the following ") + .append(errors).append(" error(s):"); + for (ConfigInfo configInfo : configInfos.values()) { + for (String msg : configInfo.configValue().errors()) { + messages.append('\n').append(msg); + } + } + callback.onCompletion( + new BadRequestException( + messages.append( + "\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`" + ).toString() + ), null + ); + } + return hasErrors; + } + private String trace(Throwable t) { ByteArrayOutputStream output = new ByteArrayOutputStream(); try { http://git-wip-us.apache.org/repos/asf/kafka/blob/040fde8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index a76fd03..cf30aca 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -35,10 +35,8 @@ import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.rest.RestServer; -import 
org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; -import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; @@ -507,10 +505,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { new Callable<Void>() { @Override public Void call() throws Exception { - ConfigInfos validatedConfig = validateConnectorConfig(config); - if (validatedConfig.errorCount() > 0) { - callback.onCompletion(new BadRequestException("Connector configuration is invalid " + - "(use the endpoint `/{connectorType}/config/validate` to get a full list of errors)"), null); + if (maybeAddConfigErrors(validateConnectorConfig(config), callback)) { return null; } http://git-wip-us.apache.org/repos/asf/kafka/blob/040fde8e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index ca0130e..9c8c7ae 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -27,10 +27,8 @@ import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; -import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import 
org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; -import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.storage.MemoryConfigBackingStore; import org.apache.kafka.connect.storage.MemoryStatusBackingStore; @@ -45,6 +43,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; + /** * Single process, in-memory "herder". Useful for a standalone Kafka Connect process. */ @@ -155,10 +154,7 @@ public class StandaloneHerder extends AbstractHerder { boolean allowReplace, final Callback<Created<ConnectorInfo>> callback) { try { - ConfigInfos validatedConfig = validateConnectorConfig(config); - if (validatedConfig.errorCount() > 0) { - callback.onCompletion(new BadRequestException("Connector configuration is invalid " + - "(use the endpoint `/{connectorType}/config/validate` to get a full list of errors)"), null); + if (maybeAddConfigErrors(validateConnectorConfig(config), callback)) { return; } http://git-wip-us.apache.org/repos/asf/kafka/blob/040fde8e/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 04d14b5..da1edbc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -56,6 +56,7 @@ import org.powermock.api.easymock.PowerMock; import org.powermock.api.easymock.annotation.Mock; import org.powermock.modules.junit4.PowerMockRunner; +import java.util.ArrayList; import 
java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -484,6 +485,46 @@ public class StandaloneHerderTest { PowerMock.verifyAll(); } + @Test + public void testCorruptConfig() { + Map<String, String> config = new HashMap<>(); + config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME); + config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSinkConnector.class.getName()); + Connector connectorMock = PowerMock.createMock(Connector.class); + String error = "This is an error in your config!"; + List<String> errors = new ArrayList<>(singletonList(error)); + String key = "foo.invalid.key"; + EasyMock.expect(connectorMock.validate(config)).andReturn( + new Config( + Arrays.asList(new ConfigValue(key, null, Collections.emptyList(), errors)) + ) + ); + ConfigDef configDef = new ConfigDef(); + configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ""); + EasyMock.expect(connectorMock.config()).andStubReturn(configDef); + ConnectorFactory connectorFactoryMock = PowerMock.createMock(ConnectorFactory.class); + EasyMock.expect(worker.getConnectorFactory()).andStubReturn(connectorFactoryMock); + EasyMock.expect(connectorFactoryMock.newConnector(EasyMock.anyString())) + .andReturn(connectorMock); + Callback<Herder.Created<ConnectorInfo>> callback = PowerMock.createMock(Callback.class); + Capture<BadRequestException> capture = Capture.newInstance(); + callback.onCompletion( + EasyMock.capture(capture), EasyMock.isNull(Herder.Created.class) + ); + + PowerMock.replayAll(); + + herder.putConnectorConfig(CONNECTOR_NAME, config, true, callback); + assertEquals( + capture.getValue().getMessage(), + "Connector configuration is invalid and contains the following 1 error(s):\n" + + error + "\n" + + "You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`" + ); + + PowerMock.verifyAll(); + } + private void expectAdd(SourceSink sourceSink) throws Exception { Map<String, String> connectorProps = 
connectorConfig(sourceSink);
