C0urante commented on a change in pull request #8069: URL: https://github.com/apache/kafka/pull/8069#discussion_r419583360
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java ########## @@ -191,32 +192,71 @@ public synchronized void putConnectorConfig(String connName, boolean allowReplace, final Callback<Created<ConnectorInfo>> callback) { try { - if (maybeAddConfigErrors(validateConnectorConfig(config), callback)) { + validateConnectorConfig(config, new Callback<ConfigInfos>() { + @Override + public void onCompletion(Throwable error, ConfigInfos configInfos) { + if (error != null) { + callback.onCompletion(error, null); + return; + } + + requestExecutorService.submit( + () -> putConnectorConfig(connName, config, allowReplace, callback, configInfos) + ); + } + }); + } catch (Throwable t) { + callback.onCompletion(t, null); + } + } + + private synchronized void putConnectorConfig(String connName, + final Map<String, String> config, + boolean allowReplace, + final Callback<Created<ConnectorInfo>> callback, + ConfigInfos configInfos) { + try { + if (maybeAddConfigErrors(configInfos, callback)) { return; } - boolean created = false; + final boolean created; if (configState.contains(connName)) { if (!allowReplace) { callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null); return; } - worker.stopConnector(connName); + worker.stopAndAwaitConnector(connName); + created = false; } else { created = true; } configBackingStore.putConnectorConfig(connName, config); - if (!startConnector(connName)) { - callback.onCompletion(new ConnectException("Failed to start connector: " + connName), null); - return; - } + Callback<TargetState> onStart = new Callback<TargetState>() { + @Override + public void onCompletion(Throwable error, TargetState result) { + if (error != null) { + callback.onCompletion(error, null); + return; + } + + requestExecutorService.submit(new Runnable() { Review comment: 👍 Ack, will change ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org