C0urante commented on code in PR #14704:
URL: https://github.com/apache/kafka/pull/14704#discussion_r1385398359


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -87,6 +98,61 @@ protected void processExtraArgs(Herder herder, Connect 
connect, String[] extraAr
         }
     }
 
+    /**
+     * Parse a connector configuration file into a {@link 
CreateConnectorRequest}. The file can have any one of the following formats:
+     * <ol>
+     *     <li>A JSON file containing an Object with only String keys and 
values that represent the connector configuration.</li>
+     *     <li>A JSON file containing an Object that can be parsed directly 
into a {@link CreateConnectorRequest}</li>
+     *     <li>A valid Java Properties file (i.e. containing String key/value 
pairs representing the connector configuration)</li>
+     * </ol>
+     * <p>
+     * Visible for testing.
+     *
+     * @param filePath the path of the connector configuration file
+     * @return the parsed connector configuration in the form of a {@link 
CreateConnectorRequest}
+     */
+    CreateConnectorRequest parseConnectorConfigurationFile(String filePath) 
throws IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        File connectorConfigurationFile = Paths.get(filePath).toFile();
+        try {
+            Map<String, String> connectorConfigs = objectMapper.readValue(
+                connectorConfigurationFile,
+                new TypeReference<Map<String, String>>() { });
+
+            if (!connectorConfigs.containsKey(NAME_CONFIG)) {
+                throw new ConnectException("Connector configuration at '" + 
filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
+                    + "configuration");
+            }
+            return new 
CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), connectorConfigs, 
null);
+        } catch (StreamReadException | DatabindException e) {
+            log.debug("Could not parse connector configuration file '{}' into 
a Map with String keys and values", filePath);
+        }
+
+        try {
+            CreateConnectorRequest createConnectorRequest = 
objectMapper.readValue(connectorConfigurationFile,
+                new TypeReference<CreateConnectorRequest>() { });
+            if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
+                if 
(!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name()))
 {
+                    throw new ConnectException("Connector name configuration 
in 'config' doesn't match the one specified in 'name'");

Review Comment:
   Can we include the file name in the error message here, like we do in the 
other two name-related exceptions?



##########
docs/connect.html:
##########
@@ -293,7 +293,7 @@ <h4><a id="connect_rest" href="#connect_rest">REST 
API</a></h4>
 
     <ul>
         <li><code>GET /connectors</code> - return a list of active 
connectors</li>
-        <li><code>POST /connectors</code> - create a new connector; the 
request body should be a JSON object containing a string <code>name</code> 
field and an object <code>config</code> field with the connector configuration 
parameters</li>
+        <li><code>POST /connectors</code> - create a new connector; the 
request body should be a JSON object containing a string <code>name</code> 
field and an object <code>config</code> field with the connector configuration 
parameters. The JSON object may also optionally contain a string 
<code>initial_state</code> field which can take the following values - 
<code>STOPPED</code>, <code>PAUSED</code> and the default value of 
<code>RUNNING</code></li>

Review Comment:
   Nit: wording
   ```suggestion
           <li><code>POST /connectors</code> - create a new connector; the 
request body should be a JSON object containing a string <code>name</code> 
field and an object <code>config</code> field with the connector configuration 
parameters. The JSON object may also optionally contain a string 
<code>initial_state</code> field which can take the following values - 
<code>STOPPED</code>, <code>PAUSED</code> or <code>RUNNING</code>(the 
default)</li>
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -87,6 +98,61 @@ protected void processExtraArgs(Herder herder, Connect 
connect, String[] extraAr
         }
     }
 
+    /**
+     * Parse a connector configuration file into a {@link 
CreateConnectorRequest}. The file can have any one of the following formats:
+     * <ol>
+     *     <li>A JSON file containing an Object with only String keys and 
values that represent the connector configuration.</li>
+     *     <li>A JSON file containing an Object that can be parsed directly 
into a {@link CreateConnectorRequest}</li>
+     *     <li>A valid Java Properties file (i.e. containing String key/value 
pairs representing the connector configuration)</li>

Review Comment:
   Can we clarify that we attempt to parse in this order?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -61,23 +71,24 @@ public ConnectStandalone(String... args) {
 
     @Override
     protected String usage() {
-        return "ConnectStandalone worker.properties [connector1.properties 
connector2.properties ...]";
+        return "ConnectStandalone worker.properties [connector1.properties 
connector2.json ...]";
     }
 
     @Override
     protected void processExtraArgs(Herder herder, Connect connect, String[] 
extraArgs) {
         try {
-            for (final String connectorPropsFile : extraArgs) {
-                Map<String, String> connectorProps = 
Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+            for (final String connectorConfigFile : extraArgs) {
+                CreateConnectorRequest createConnectorRequest = 
parseConnectorConfigurationFile(connectorConfigFile);
                 FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>((error, info) -> {
                     if (error != null)
-                        log.error("Failed to create connector for {}", 
connectorPropsFile);
+                        log.error("Failed to create connector for {}", 
connectorConfigFile);
                     else
                         log.info("Created connector {}", info.result().name());
                 });
                 herder.putConnectorConfig(
-                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
-                        connectorProps, false, cb);
+                    createConnectorRequest.name(), 
createConnectorRequest.config(),
+                    createConnectorRequest.initialState() != null ? 
createConnectorRequest.initialState().toTargetState() : null,

Review Comment:
   Nit: we can move this null-safe conversion logic to the 
`CreateConnectorRequest` class, maybe with a `targetState()` or 
`initialTargetState()` method.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -145,7 +146,11 @@ public Response createConnector(final @Parameter(hidden = 
true) @QueryParam("for
         checkAndPutConnectorConfigName(name, configs);
 
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>();
-        herder.putConnectorConfig(name, configs, false, cb);
+        TargetState targetState = null;
+        if (createRequest.initialState() != null) {
+            targetState = createRequest.initialState().toTargetState();
+        }

Review Comment:
   This is another place that would benefit from a 
`CreateConnectorRequest::targetState` method.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/cli/ConnectStandaloneTest.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ConnectStandaloneTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final Map<String, String> CONNECTOR_CONFIG = new 
HashMap<>();
+    static {
+        CONNECTOR_CONFIG.put(NAME_CONFIG, CONNECTOR_NAME);
+        CONNECTOR_CONFIG.put("key1", "val1");
+        CONNECTOR_CONFIG.put("key2", "val2");
+    }
+
+    private final ConnectStandalone connectStandalone = new 
ConnectStandalone();
+    private File connectorConfigurationFile;
+
+    @Before
+    public void setUp() throws IOException {
+        connectorConfigurationFile = TestUtils.tempFile();
+    }
+
+    @Test
+    public void testParseJavaPropertiesFile() throws Exception {
+        Properties properties = new Properties();
+        CONNECTOR_CONFIG.forEach(properties::setProperty);
+
+        try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+            properties.store(writer, null);
+        }
+
+        CreateConnectorRequest request = 
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+        assertEquals(CONNECTOR_NAME, request.name());
+        assertEquals(CONNECTOR_CONFIG, request.config());
+        assertNull(request.initialState());
+    }
+
+    @Test
+    public void testParseJsonFileWithConnectorConfiguration() throws Exception 
{
+        try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+            writer.write(new 
ObjectMapper().writeValueAsString(CONNECTOR_CONFIG));
+        }
+
+        CreateConnectorRequest request = 
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+        assertEquals(CONNECTOR_NAME, request.name());
+        assertEquals(CONNECTOR_CONFIG, request.config());
+        assertNull(request.initialState());
+    }
+
+    @Test
+    public void testParseJsonFileWithCreateConnectorRequest() throws Exception 
{

Review Comment:
   Worth adding a case for this format with no initial state as well?



##########
docs/connect.html:
##########
@@ -60,7 +60,7 @@ <h4><a id="connect_running" href="#connect_running">Running 
Kafka Connect</a></h
     
     <p>Starting with 2.3.0, client configuration overrides can be configured 
individually per connector by using the prefixes 
<code>producer.override.</code> and <code>consumer.override.</code> for Kafka 
sources or Kafka sinks respectively. These overrides are included with the rest 
of the connector's configuration properties.</p>
 
-    <p>The remaining parameters are connector configuration files. You may 
include as many as you want, but all will execute within the same process (on 
different threads). You can also choose not to specify any connector 
configuration files on the command line, and instead use the REST API to create 
connectors at runtime after your standalone worker starts.</p>
+    <p>The remaining parameters are connector configuration files. Each file 
may either be a Java Properties files or a JSON file containing an object with 
the same structure as the request body of either the <code>POST 
/connectors</code> endpoint or the <code>PUT /connectors/{name}/config</code> 
endpoint (see the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI 
documentation</a>). You may include as many as you want, but all will execute 
within the same process (on different threads). You can also choose not to 
specify any connector configuration files on the command line, and instead use 
the REST API to create connectors at runtime after your standalone worker 
starts.</p>

Review Comment:
   Nit: typo
   ```suggestion
       <p>The remaining parameters are connector configuration files. Each file 
may either be a Java Properties file or a JSON file containing an object with 
the same structure as the request body of either the <code>POST 
/connectors</code> endpoint or the <code>PUT /connectors/{name}/config</code> 
endpoint (see the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI 
documentation</a>). You may include as many as you want, but all will execute 
within the same process (on different threads). You can also choose not to 
specify any connector configuration files on the command line, and instead use 
the REST API to create connectors at runtime after your standalone worker 
starts.</p>
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java:
##########
@@ -42,17 +47,47 @@ public Map<String, String> config() {
         return config;
     }
 
+    @JsonProperty
+    public InitialState initialState() {
+        return initialState;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         CreateConnectorRequest that = (CreateConnectorRequest) o;
         return Objects.equals(name, that.name) &&
-                Objects.equals(config, that.config);
+            Objects.equals(config, that.config) &&
+            Objects.equals(initialState, that.initialState);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, config);
+        return Objects.hash(name, config, initialState);
+    }
+
+    public enum InitialState {
+        RUNNING,
+        PAUSED,
+        STOPPED;
+
+        @JsonCreator
+        public static InitialState forValue(String value) {
+            return InitialState.valueOf(value.toUpperCase(Locale.ROOT));
+        }
+
+        public TargetState toTargetState() {
+            switch (this) {
+                case RUNNING:
+                    return TargetState.STARTED;
+                case PAUSED:
+                    return TargetState.PAUSED;
+                case STOPPED:
+                    return TargetState.STOPPED;
+                default:
+                    throw new IllegalArgumentException("Unknown initial 
state");

Review Comment:
   Can we include the state here?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -108,6 +108,19 @@ public interface Herder {
      */
     void putConnectorConfig(String connName, Map<String, String> config, 
boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
 
+    /**
+     * Set the configuration for a connector, along with a target state 
optionally. This supports creation and updating.
+     * @param connName name of the connector
+     * @param config the connector's configuration
+     * @param targetState the desired target state for the connector; may be 
{@code null} if no target state change is desired. Note that the default
+     *                    target state is {@link TargetState#STARTED} if no 
target state exists previously
+     * @param allowReplace if true, allow overwriting previous configs; if 
false, throw {@link AlreadyExistsException}
+     *                     if a connector with the same name already exists
+     * @param callback callback to invoke when the configuration has been 
written
+     */
+    void putConnectorConfig(String connName, Map<String, String> config, 
TargetState targetState, boolean allowReplace,

Review Comment:
   Why introduce a new method? My first instinct would've been to add a new 
parameter to the existing `Herder::putConnectorConfig` method.



##########
docs/connect.html:
##########
@@ -41,7 +41,7 @@ <h4><a id="connect_running" href="#connect_running">Running 
Kafka Connect</a></h
     <p>In standalone mode all work is performed in a single process. This 
configuration is simpler to setup and get started with and may be useful in 
situations where only one worker makes sense (e.g. collecting log files), but 
it does not benefit from some of the features of Kafka Connect such as fault 
tolerance. You can start a standalone process with the following command:</p>
 
     <pre class="brush: bash;">
-&gt; bin/connect-standalone.sh config/connect-standalone.properties 
[connector1.properties connector2.properties ...]</pre>
+&gt; bin/connect-standalone.sh config/connect-standalone.properties 
[connector1.properties connector2.json ...]</pre>

Review Comment:
   Do we also want to update the 
[usage](https://github.com/apache/kafka/blob/edc7e10a745c350ad1efa9e4866370dc8ea0e034/bin/connect-standalone.sh#L19)
 for `connect-standalone.sh`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -87,6 +98,61 @@ protected void processExtraArgs(Herder herder, Connect 
connect, String[] extraAr
         }
     }
 
+    /**
+     * Parse a connector configuration file into a {@link 
CreateConnectorRequest}. The file can have any one of the following formats:
+     * <ol>
+     *     <li>A JSON file containing an Object with only String keys and 
values that represent the connector configuration.</li>
+     *     <li>A JSON file containing an Object that can be parsed directly 
into a {@link CreateConnectorRequest}</li>
+     *     <li>A valid Java Properties file (i.e. containing String key/value 
pairs representing the connector configuration)</li>
+     * </ol>
+     * <p>
+     * Visible for testing.
+     *
+     * @param filePath the path of the connector configuration file
+     * @return the parsed connector configuration in the form of a {@link 
CreateConnectorRequest}
+     */
+    CreateConnectorRequest parseConnectorConfigurationFile(String filePath) 
throws IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        File connectorConfigurationFile = Paths.get(filePath).toFile();
+        try {
+            Map<String, String> connectorConfigs = objectMapper.readValue(
+                connectorConfigurationFile,
+                new TypeReference<Map<String, String>>() { });
+
+            if (!connectorConfigs.containsKey(NAME_CONFIG)) {
+                throw new ConnectException("Connector configuration at '" + 
filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
+                    + "configuration");
+            }
+            return new 
CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), connectorConfigs, 
null);
+        } catch (StreamReadException | DatabindException e) {
+            log.debug("Could not parse connector configuration file '{}' into 
a Map with String keys and values", filePath);
+        }
+
+        try {
+            CreateConnectorRequest createConnectorRequest = 
objectMapper.readValue(connectorConfigurationFile,
+                new TypeReference<CreateConnectorRequest>() { });
+            if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
+                if 
(!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name()))
 {
+                    throw new ConnectException("Connector name configuration 
in 'config' doesn't match the one specified in 'name'");
+                }
+            } else {
+                createConnectorRequest.config().put(NAME_CONFIG, 
createConnectorRequest.name());
+            }
+            return createConnectorRequest;
+        } catch (StreamReadException | DatabindException e) {
+            log.debug("Could not parse connector configuration file '{}' into 
an object of type {}",
+                filePath, CreateConnectorRequest.class.getSimpleName());
+        }
+
+        Map<String, String> connectorConfigs = 
Utils.propsToStringMap(Utils.loadProps(filePath));

Review Comment:
   This has the interesting behavior that files with this content:
   
   ```json
   {
     "name": "local-file-sink",
     "other.key": "test",
     "config": {
       "name": "local-file-sink",
       "connector.class": "FileStreamSink",
       "tasks.max": "1",
       "file": "test.sink.txt",
       "topics": "connect-test"
     }
   }
   ```
   
   will fail to be parsed as `Map<String, String>`, then fail to be parsed as a 
`CreateConnectorRequest`, and will finally be parsed as a Java properties file, 
at which point this error message will be generated before startup is aborted:
   
   > org.apache.kafka.connect.errors.ConnectException: Connector configuration 
at 'connect-file-sink-wrapped.json' is missing the mandatory 'name' 
configuration
   
   IMO it'd be a bit friendlier to ignore unknown fields when attempting to 
parse the file as a `CreateConnectorRequest`. Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to