yashmayya commented on code in PR #14704:
URL: https://github.com/apache/kafka/pull/14704#discussion_r1386195297


##########
docs/connect.html:
##########
@@ -41,7 +41,7 @@ <h4><a id="connect_running" href="#connect_running">Running Kafka Connect</a></h
     <p>In standalone mode all work is performed in a single process. This configuration is simpler to setup and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:</p>
 
     <pre class="brush: bash;">
-&gt; bin/connect-standalone.sh config/connect-standalone.properties [connector1.properties connector2.properties ...]</pre>
+&gt; bin/connect-standalone.sh config/connect-standalone.properties [connector1.properties connector2.json ...]</pre>

Review Comment:
   Good point. I'd overlooked the usage message because it currently shows only
the worker properties file and not the optional connector properties files
(and it's only printed when no arguments are passed to the CLI). I've updated
it to look similar to this docs line.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -87,6 +98,61 @@ protected void processExtraArgs(Herder herder, Connect connect, String[] extraAr
         }
     }
 
+    /**
+     * Parse a connector configuration file into a {@link CreateConnectorRequest}. The file can have any one of the following formats:
+     * <ol>
+     *     <li>A JSON file containing an Object with only String keys and values that represent the connector configuration.</li>
+     *     <li>A JSON file containing an Object that can be parsed directly into a {@link CreateConnectorRequest}</li>
+     *     <li>A valid Java Properties file (i.e. containing String key/value pairs representing the connector configuration)</li>
+     * </ol>
+     * <p>
+     * Visible for testing.
+     *
+     * @param filePath the path of the connector configuration file
+     * @return the parsed connector configuration in the form of a {@link CreateConnectorRequest}
+     */
+    CreateConnectorRequest parseConnectorConfigurationFile(String filePath) throws IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        File connectorConfigurationFile = Paths.get(filePath).toFile();
+        try {
+            Map<String, String> connectorConfigs = objectMapper.readValue(
+                connectorConfigurationFile,
+                new TypeReference<Map<String, String>>() { });
+
+            if (!connectorConfigs.containsKey(NAME_CONFIG)) {
+                throw new ConnectException("Connector configuration at '" + filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
+                    + "configuration");
+            }
+            return new CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), connectorConfigs, null);
+        } catch (StreamReadException | DatabindException e) {
+            log.debug("Could not parse connector configuration file '{}' into a Map with String keys and values", filePath);
+        }
+
+        try {
+            CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile,
+                new TypeReference<CreateConnectorRequest>() { });
+            if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
+                if (!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name())) {
+                    throw new ConnectException("Connector name configuration in 'config' doesn't match the one specified in 'name'");
+                }
+            } else {
+                createConnectorRequest.config().put(NAME_CONFIG, createConnectorRequest.name());
+            }
+            return createConnectorRequest;
+        } catch (StreamReadException | DatabindException e) {
+            log.debug("Could not parse connector configuration file '{}' into an object of type {}",
+                filePath, CreateConnectorRequest.class.getSimpleName());
+        }
+
+        Map<String, String> connectorConfigs = Utils.propsToStringMap(Utils.loadProps(filePath));

Review Comment:
   > IMO it'd be a bit friendlier to ignore unknown fields when attempting to 
parse the file as a CreateConnectorRequest. Thoughts?
   
   💯 
   
   I've changed this behavior and introduced an additional test, thanks for 
pointing this out!
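
For readers following the thread, the name handling in the second `try` block of the hunk above can be sketched in isolation with plain maps. This is only an illustrative sketch, not the PR's code: `ConnectorNameCheck` and `reconcileName` are hypothetical names, the `"name"` key is assumed to match `NAME_CONFIG`, `IllegalArgumentException` stands in for `ConnectException`, and the sketch copies the map rather than mutating the request's config in place as the hunk does.

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectorNameCheck {
    // Assumption: same value as the NAME_CONFIG constant referenced in the hunk.
    static final String NAME_CONFIG = "name";

    /**
     * Returns a copy of config that is guaranteed to contain NAME_CONFIG.
     * If config already carries a name, it must equal requestName.
     */
    static Map<String, String> reconcileName(String requestName, Map<String, String> config) {
        Map<String, String> result = new HashMap<>(config);
        String configured = result.get(NAME_CONFIG);
        if (configured == null) {
            // No name inside the config: fill it in from the top-level name.
            result.put(NAME_CONFIG, requestName);
        } else if (!configured.equals(requestName)) {
            // Stand-in for the ConnectException thrown in the hunk.
            throw new IllegalArgumentException(
                "Connector name configuration in 'config' doesn't match the one specified in 'name'");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("tasks.max", "1");
        System.out.println(reconcileName("my-connector", config));
    }
}
```

Copying the map keeps the helper free of side effects, which is convenient for a standalone example; the in-place `put` in the hunk avoids the extra allocation.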



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -108,6 +108,19 @@ public interface Herder {
      */
     void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
 
+    /**
+     * Set the configuration for a connector, along with a target state optionally. This supports creation and updating.
+     * @param connName name of the connector
+     * @param config the connector's configuration
+     * @param targetState the desired target state for the connector; may be {@code null} if no target state change is desired. Note that the default
+     *                    target state is {@link TargetState#STARTED} if no target state exists previously
+     * @param allowReplace if true, allow overwriting previous configs; if false, throw {@link AlreadyExistsException}
+     *                     if a connector with the same name already exists
+     * @param callback callback to invoke when the configuration has been written
+     */
+    void putConnectorConfig(String connName, Map<String, String> config, TargetState targetState, boolean allowReplace,

Review Comment:
   I pondered this one for a while. The change to the `ConfigBackingStore`
interface simply adds a new parameter to the existing method instead of adding
a new method (the same situation as with the `Herder` interface). Here,
however, I was more hesitant to do the same: although `Herder` isn't public
API, I've seen direct usages of it (not through the REST API) in the wild,
i.e. outside of Connect. It's possible that the direct usage in MM2 served as
an example / precedent for those. It'd be nice if we could avoid breaking such
usages by introducing this new method instead, WDYT?
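
The compatibility argument above can be illustrated with a small sketch. Everything in it is hypothetical: `ConfigSink`, `State`, and `RecordingSink` are simplified stand-ins rather than the real `Herder`, `TargetState`, or any Connect class, and having the old method delegate with a `null` target state is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class OverloadCompat {
    // Hypothetical stand-in for a target state enum.
    enum State { STARTED, STOPPED, PAUSED }

    // Hypothetical, simplified stand-in for an internal interface with external callers.
    interface ConfigSink {
        // Existing signature is left untouched, so existing call sites keep compiling.
        void putConfig(String name, Map<String, String> config, boolean allowReplace);

        // New overload carries the extra parameter; null means "no state change".
        void putConfig(String name, Map<String, String> config, State targetState, boolean allowReplace);
    }

    static class RecordingSink implements ConfigSink {
        final List<String> calls = new ArrayList<>();

        @Override
        public void putConfig(String name, Map<String, String> config, boolean allowReplace) {
            // Assumption for this sketch: the old method delegates with no target state.
            putConfig(name, config, null, allowReplace);
        }

        @Override
        public void putConfig(String name, Map<String, String> config, State targetState, boolean allowReplace) {
            calls.add(name + ":" + targetState);
        }
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        sink.putConfig("a", Map.of("name", "a"), true);                // old-style caller
        sink.putConfig("b", Map.of("name", "b"), State.STOPPED, true); // new-style caller
        System.out.println(sink.calls);
    }
}
```

Note that an overload only protects callers: any external class that implements the interface would still have to add the new method, unless it were given a default implementation.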



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
