C0urante commented on code in PR #14704:
URL: https://github.com/apache/kafka/pull/14704#discussion_r1385398359
##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -87,6 +98,61 @@ protected void processExtraArgs(Herder herder, Connect connect, String[] extraAr
         }
     }
 
+    /**
+     * Parse a connector configuration file into a {@link CreateConnectorRequest}. The file can have any one of the following formats:
+     * <ol>
+     *     <li>A JSON file containing an Object with only String keys and values that represent the connector configuration.</li>
+     *     <li>A JSON file containing an Object that can be parsed directly into a {@link CreateConnectorRequest}</li>
+     *     <li>A valid Java Properties file (i.e. containing String key/value pairs representing the connector configuration)</li>
+     * </ol>
+     * <p>
+     * Visible for testing.
+     *
+     * @param filePath the path of the connector configuration file
+     * @return the parsed connector configuration in the form of a {@link CreateConnectorRequest}
+     */
+    CreateConnectorRequest parseConnectorConfigurationFile(String filePath) throws IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        File connectorConfigurationFile = Paths.get(filePath).toFile();
+        try {
+            Map<String, String> connectorConfigs = objectMapper.readValue(
+                connectorConfigurationFile,
+                new TypeReference<Map<String, String>>() { });
+
+            if (!connectorConfigs.containsKey(NAME_CONFIG)) {
+                throw new ConnectException("Connector configuration at '" + filePath + "' is missing the mandatory '" + NAME_CONFIG + "' " + "configuration");
+            }
+            return new CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), connectorConfigs, null);
+        } catch (StreamReadException | DatabindException e) {
+            log.debug("Could not parse connector configuration file '{}' into a Map with String keys and values", filePath);
+        }
+
+        try {
+            CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile,
+                new TypeReference<CreateConnectorRequest>() { });
+            if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
+                if (!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name())) {
+                    throw new ConnectException("Connector name configuration in 'config' doesn't match the one specified in 'name'");

Review Comment:
Can we include the file name in the error message here, like we do in the other two name-related exceptions?

##########
docs/connect.html:
##########
@@ -293,7 +293,7 @@ <h4><a id="connect_rest" href="#connect_rest">REST API</a></h4>
     <ul>
         <li><code>GET /connectors</code> - return a list of active connectors</li>
-        <li><code>POST /connectors</code> - create a new connector; the request body should be a JSON object containing a string <code>name</code> field and an object <code>config</code> field with the connector configuration parameters</li>
+        <li><code>POST /connectors</code> - create a new connector; the request body should be a JSON object containing a string <code>name</code> field and an object <code>config</code> field with the connector configuration parameters. The JSON object may also optionally contain a string <code>initial_state</code> field which can take the following values - <code>STOPPED</code>, <code>PAUSED</code> and the default value of <code>RUNNING</code></li>

Review Comment:
Nit: wording
```suggestion
        <li><code>POST /connectors</code> - create a new connector; the request body should be a JSON object containing a string <code>name</code> field and an object <code>config</code> field with the connector configuration parameters. The JSON object may also optionally contain a string <code>initial_state</code> field which can take the following values - <code>STOPPED</code>, <code>PAUSED</code> or <code>RUNNING</code>(the default)</li>
```

##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -87,6 +98,61 @@ protected void processExtraArgs(Herder herder, Connect connect, String[] extraAr
         }
     }
 
+    /**
+     * Parse a connector configuration file into a {@link CreateConnectorRequest}. The file can have any one of the following formats:
+     * <ol>
+     *     <li>A JSON file containing an Object with only String keys and values that represent the connector configuration.</li>
+     *     <li>A JSON file containing an Object that can be parsed directly into a {@link CreateConnectorRequest}</li>
+     *     <li>A valid Java Properties file (i.e. containing String key/value pairs representing the connector configuration)</li>

Review Comment:
Can we clarify that we attempt to parse in this order?

##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -61,23 +71,24 @@ public ConnectStandalone(String... args) {
     @Override
     protected String usage() {
-        return "ConnectStandalone worker.properties [connector1.properties connector2.properties ...]";
+        return "ConnectStandalone worker.properties [connector1.properties connector2.json ...]";
     }
 
     @Override
     protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {
         try {
-            for (final String connectorPropsFile : extraArgs) {
-                Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+            for (final String connectorConfigFile : extraArgs) {
+                CreateConnectorRequest createConnectorRequest = parseConnectorConfigurationFile(connectorConfigFile);
                 FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
                     if (error != null)
-                        log.error("Failed to create connector for {}", connectorPropsFile);
+                        log.error("Failed to create connector for {}", connectorConfigFile);
                     else
                         log.info("Created connector {}", info.result().name());
                 });
                 herder.putConnectorConfig(
-                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
-                        connectorProps, false, cb);
+                        createConnectorRequest.name(), createConnectorRequest.config(),
+                        createConnectorRequest.initialState() != null ? createConnectorRequest.initialState().toTargetState() : null,

Review Comment:
Nit: we can move this null-safe conversion logic to the `CreateConnectorRequest` class, maybe with a `targetState()` or `initialTargetState()` method.
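A rough sketch of the refactoring suggested in the comment above (not code from the PR): the null check lives on the request class, so `ConnectStandalone` and `ConnectorsResource` can both call a single method. Everything here is a simplified, hypothetical stand-in: `TargetState` only mirrors the three constants referenced in this diff, `CreateConnectorRequestSketch` drops the Jackson annotations so it compiles with the JDK alone, and `initialTargetState()` is just one of the two names floated in the review.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the runtime's TargetState enum; only the three
// constants referenced in the diff are modelled.
enum TargetState { STARTED, PAUSED, STOPPED }

// Simplified, hypothetical sketch of CreateConnectorRequest (Jackson annotations
// omitted so the example compiles with the JDK alone). Not the PR's actual class.
final class CreateConnectorRequestSketch {
    private final String name;
    private final Map<String, String> config;
    private final InitialState initialState; // null when the field is absent

    CreateConnectorRequestSketch(String name, Map<String, String> config, InitialState initialState) {
        this.name = name;
        this.config = config;
        this.initialState = initialState;
    }

    String name() { return name; }

    Map<String, String> config() { return config; }

    InitialState initialState() { return initialState; }

    // Null-safe conversion: returns null when no initial state was requested,
    // so callers no longer repeat the "!= null ? ... : null" check themselves.
    TargetState initialTargetState() {
        return initialState != null ? initialState.toTargetState() : null;
    }

    enum InitialState {
        RUNNING,
        PAUSED,
        STOPPED;

        // Case-insensitive parsing, mirroring the forValue method in the diff
        static InitialState forValue(String value) {
            return InitialState.valueOf(value.toUpperCase(Locale.ROOT));
        }

        TargetState toTargetState() {
            switch (this) {
                case RUNNING:
                    return TargetState.STARTED;
                case PAUSED:
                    return TargetState.PAUSED;
                case STOPPED:
                    return TargetState.STOPPED;
                default:
                    // Include the offending value in the message
                    throw new IllegalArgumentException("Unknown initial state: " + this);
            }
        }
    }
}

class InitialTargetStateDemo {
    public static void main(String[] args) {
        Map<String, String> config = Collections.singletonMap("name", "local-file-sink");
        CreateConnectorRequestSketch paused = new CreateConnectorRequestSketch(
            "local-file-sink", config, CreateConnectorRequestSketch.InitialState.forValue("paused"));
        CreateConnectorRequestSketch unset = new CreateConnectorRequestSketch("local-file-sink", config, null);
        System.out.println(paused.initialTargetState()); // prints PAUSED
        System.out.println(unset.initialTargetState());  // prints null
    }
}
```

With a method along these lines, the ternary expression passed to `herder.putConnectorConfig` in `processExtraArgs`, and the `if` block in `ConnectorsResource`, could both be replaced by a call to `initialTargetState()`.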
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -145,7 +146,11 @@ public Response createConnector(final @Parameter(hidden = true) @QueryParam("for
         checkAndPutConnectorConfigName(name, configs);
 
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
-        herder.putConnectorConfig(name, configs, false, cb);
+        TargetState targetState = null;
+        if (createRequest.initialState() != null) {
+            targetState = createRequest.initialState().toTargetState();
+        }

Review Comment:
This is another place that would benefit from a `CreateConnectorRequest::targetState` method.

##########
connect/runtime/src/test/java/org/apache/kafka/connect/cli/ConnectStandaloneTest.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ConnectStandaloneTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>();
+    static {
+        CONNECTOR_CONFIG.put(NAME_CONFIG, CONNECTOR_NAME);
+        CONNECTOR_CONFIG.put("key1", "val1");
+        CONNECTOR_CONFIG.put("key2", "val2");
+    }
+
+    private final ConnectStandalone connectStandalone = new ConnectStandalone();
+    private File connectorConfigurationFile;
+
+    @Before
+    public void setUp() throws IOException {
+        connectorConfigurationFile = TestUtils.tempFile();
+    }
+
+    @Test
+    public void testParseJavaPropertiesFile() throws Exception {
+        Properties properties = new Properties();
+        CONNECTOR_CONFIG.forEach(properties::setProperty);
+
+        try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+            properties.store(writer, null);
+        }
+
+        CreateConnectorRequest request = connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+        assertEquals(CONNECTOR_NAME, request.name());
+        assertEquals(CONNECTOR_CONFIG, request.config());
+        assertNull(request.initialState());
+    }
+
+    @Test
+    public void testParseJsonFileWithConnectorConfiguration() throws Exception {
+        try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+            writer.write(new ObjectMapper().writeValueAsString(CONNECTOR_CONFIG));
+        }
+
+        CreateConnectorRequest request = connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+        assertEquals(CONNECTOR_NAME, request.name());
+        assertEquals(CONNECTOR_CONFIG, request.config());
+        assertNull(request.initialState());
+    }
+
+    @Test
+    public void testParseJsonFileWithCreateConnectorRequest() throws Exception {

Review Comment:
Worth adding a case for this format with no initial state as well?

##########
docs/connect.html:
##########
@@ -60,7 +60,7 @@ <h4><a id="connect_running" href="#connect_running">Running Kafka Connect</a></h
     <p>Starting with 2.3.0, client configuration overrides can be configured individually per connector by using the prefixes <code>producer.override.</code> and <code>consumer.override.</code> for Kafka sources or Kafka sinks respectively. These overrides are included with the rest of the connector's configuration properties.</p>
 
-    <p>The remaining parameters are connector configuration files. You may include as many as you want, but all will execute within the same process (on different threads). You can also choose not to specify any connector configuration files on the command line, and instead use the REST API to create connectors at runtime after your standalone worker starts.</p>
+    <p>The remaining parameters are connector configuration files. Each file may either be a Java Properties files or a JSON file containing an object with the same structure as the request body of either the <code>POST /connectors</code> endpoint or the <code>PUT /connectors/{name}/config</code> endpoint (see the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI documentation</a>). You may include as many as you want, but all will execute within the same process (on different threads). You can also choose not to specify any connector configuration files on the command line, and instead use the REST API to create connectors at runtime after your standalone worker starts.</p>

Review Comment:
Nit: typo
```suggestion
    <p>The remaining parameters are connector configuration files. Each file may either be a Java Properties file or a JSON file containing an object with the same structure as the request body of either the <code>POST /connectors</code> endpoint or the <code>PUT /connectors/{name}/config</code> endpoint (see the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI documentation</a>). You may include as many as you want, but all will execute within the same process (on different threads). You can also choose not to specify any connector configuration files on the command line, and instead use the REST API to create connectors at runtime after your standalone worker starts.</p>
```

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java:
##########
@@ -42,17 +47,47 @@ public Map<String, String> config() {
         return config;
     }
 
+    @JsonProperty
+    public InitialState initialState() {
+        return initialState;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         CreateConnectorRequest that = (CreateConnectorRequest) o;
         return Objects.equals(name, that.name) &&
-            Objects.equals(config, that.config);
+            Objects.equals(config, that.config) &&
+            Objects.equals(initialState, that.initialState);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, config);
+        return Objects.hash(name, config, initialState);
+    }
+
+    public enum InitialState {
+        RUNNING,
+        PAUSED,
+        STOPPED;
+
+        @JsonCreator
+        public static InitialState forValue(String value) {
+            return InitialState.valueOf(value.toUpperCase(Locale.ROOT));
+        }
+
+        public TargetState toTargetState() {
+            switch (this) {
+                case RUNNING:
+                    return TargetState.STARTED;
+                case PAUSED:
+                    return TargetState.PAUSED;
+                case STOPPED:
+                    return TargetState.STOPPED;
+                default:
+                    throw new IllegalArgumentException("Unknown initial state");

Review Comment:
Can we include the state here?

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -108,6 +108,19 @@ public interface Herder {
      */
     void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
 
+    /**
+     * Set the configuration for a connector, along with a target state optionally. This supports creation and updating.
+     * @param connName name of the connector
+     * @param config the connector's configuration
+     * @param targetState the desired target state for the connector; may be {@code null} if no target state change is desired. Note that the default
+     *                    target state is {@link TargetState#STARTED} if no target state exists previously
+     * @param allowReplace if true, allow overwriting previous configs; if false, throw {@link AlreadyExistsException}
+     *                     if a connector with the same name already exists
+     * @param callback callback to invoke when the configuration has been written
+     */
+    void putConnectorConfig(String connName, Map<String, String> config, TargetState targetState, boolean allowReplace,

Review Comment:
Why introduce a new method? My first instinct would've been to add a new parameter to the existing `Herder::putConnectorConfig` method.

##########
docs/connect.html:
##########
@@ -41,7 +41,7 @@ <h4><a id="connect_running" href="#connect_running">Running Kafka Connect</a></h
     <p>In standalone mode all work is performed in a single process. This configuration is simpler to setup and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:</p>
 
     <pre class="brush: bash;">
-> bin/connect-standalone.sh config/connect-standalone.properties [connector1.properties connector2.properties ...]</pre>
+> bin/connect-standalone.sh config/connect-standalone.properties [connector1.properties connector2.json ...]</pre>

Review Comment:
Do we also want to update the [usage](https://github.com/apache/kafka/blob/edc7e10a745c350ad1efa9e4866370dc8ea0e034/bin/connect-standalone.sh#L19) for `connect-standalone.sh`?

##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -87,6 +98,61 @@ protected void processExtraArgs(Herder herder, Connect connect, String[] extraAr
         }
     }
 
+    /**
+     * Parse a connector configuration file into a {@link CreateConnectorRequest}. The file can have any one of the following formats:
+     * <ol>
+     *     <li>A JSON file containing an Object with only String keys and values that represent the connector configuration.</li>
+     *     <li>A JSON file containing an Object that can be parsed directly into a {@link CreateConnectorRequest}</li>
+     *     <li>A valid Java Properties file (i.e. containing String key/value pairs representing the connector configuration)</li>
+     * </ol>
+     * <p>
+     * Visible for testing.
+     *
+     * @param filePath the path of the connector configuration file
+     * @return the parsed connector configuration in the form of a {@link CreateConnectorRequest}
+     */
+    CreateConnectorRequest parseConnectorConfigurationFile(String filePath) throws IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        File connectorConfigurationFile = Paths.get(filePath).toFile();
+        try {
+            Map<String, String> connectorConfigs = objectMapper.readValue(
+                connectorConfigurationFile,
+                new TypeReference<Map<String, String>>() { });
+
+            if (!connectorConfigs.containsKey(NAME_CONFIG)) {
+                throw new ConnectException("Connector configuration at '" + filePath + "' is missing the mandatory '" + NAME_CONFIG + "' " + "configuration");
+            }
+            return new CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), connectorConfigs, null);
+        } catch (StreamReadException | DatabindException e) {
+            log.debug("Could not parse connector configuration file '{}' into a Map with String keys and values", filePath);
+        }
+
+        try {
+            CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile,
+                new TypeReference<CreateConnectorRequest>() { });
+            if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
+                if (!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name())) {
+                    throw new ConnectException("Connector name configuration in 'config' doesn't match the one specified in 'name'");
+                }
+            } else {
+                createConnectorRequest.config().put(NAME_CONFIG, createConnectorRequest.name());
+            }
+            return createConnectorRequest;
+        } catch (StreamReadException | DatabindException e) {
+            log.debug("Could not parse connector configuration file '{}' into an object of type {}",
+                filePath, CreateConnectorRequest.class.getSimpleName());
+        }
+
+        Map<String, String> connectorConfigs = Utils.propsToStringMap(Utils.loadProps(filePath));

Review Comment:
This has the interesting behavior that files with this content:

```json
{
    "name": "local-file-sink",
    "other.key": "test",
    "config": {
        "name": "local-file-sink",
        "connector.class": "FileStreamSink",
        "tasks.max": "1",
        "file": "test.sink.txt",
        "topics": "connect-test"
    }
}
```

will fail to be parsed as `Map<String, String>`, then fail to be parsed as a `CreateConnectorRequest`, and will finally be parsed as a Java properties file, at which point this error message will be generated before startup is aborted:

> org.apache.kafka.connect.errors.ConnectException: Connector configuration at 'connect-file-sink-wrapped.json' is missing the mandatory 'name' configuration

IMO it'd be a bit friendlier to ignore unknown fields when attempting to parse the file as a `CreateConnectorRequest`. Thoughts?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org