yashmayya commented on code in PR #14704:
URL: https://github.com/apache/kafka/pull/14704#discussion_r1386195297
##########
docs/connect.html:
##########
@@ -41,7 +41,7 @@ <h4><a id="connect_running" href="#connect_running">Running Kafka Connect</a></h
<p>In standalone mode all work is performed in a single process. This configuration is simpler to setup and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:</p>
<pre class="brush: bash;">
-> bin/connect-standalone.sh config/connect-standalone.properties [connector1.properties connector2.properties ...]</pre>
+> bin/connect-standalone.sh config/connect-standalone.properties [connector1.properties connector2.json ...]</pre>
Review Comment:
Good point, I'd overlooked the CLI usage message because it currently shows
only the worker properties file and not the optional connector properties
files (and it's only printed if no argument is passed to the CLI). I've
updated it to look similar to this docs line.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -87,6 +98,61 @@ protected void processExtraArgs(Herder herder, Connect connect, String[] extraAr
         }
     }
+    /**
+     * Parse a connector configuration file into a {@link CreateConnectorRequest}. The file can have any one of the following formats:
+     * <ol>
+     *     <li>A JSON file containing an Object with only String keys and values that represent the connector configuration.</li>
+     *     <li>A JSON file containing an Object that can be parsed directly into a {@link CreateConnectorRequest}</li>
+     *     <li>A valid Java Properties file (i.e. containing String key/value pairs representing the connector configuration)</li>
+     * </ol>
+     * <p>
+     * Visible for testing.
+     *
+     * @param filePath the path of the connector configuration file
+     * @return the parsed connector configuration in the form of a {@link CreateConnectorRequest}
+     */
+    CreateConnectorRequest parseConnectorConfigurationFile(String filePath) throws IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        File connectorConfigurationFile = Paths.get(filePath).toFile();
+        try {
+            Map<String, String> connectorConfigs = objectMapper.readValue(
+                connectorConfigurationFile,
+                new TypeReference<Map<String, String>>() { });
+
+            if (!connectorConfigs.containsKey(NAME_CONFIG)) {
+                throw new ConnectException("Connector configuration at '" + filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
+                    + "configuration");
+            }
+            return new CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), connectorConfigs, null);
+        } catch (StreamReadException | DatabindException e) {
+            log.debug("Could not parse connector configuration file '{}' into a Map with String keys and values", filePath);
+        }
+
+        try {
+            CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile,
+                new TypeReference<CreateConnectorRequest>() { });
+            if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
+                if (!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name())) {
+                    throw new ConnectException("Connector name configuration in 'config' doesn't match the one specified in 'name'");
+                }
+            } else {
+                createConnectorRequest.config().put(NAME_CONFIG, createConnectorRequest.name());
+            }
+            return createConnectorRequest;
+        } catch (StreamReadException | DatabindException e) {
+            log.debug("Could not parse connector configuration file '{}' into an object of type {}",
+                filePath, CreateConnectorRequest.class.getSimpleName());
+        }
+
+        Map<String, String> connectorConfigs = Utils.propsToStringMap(Utils.loadProps(filePath));
Review Comment:
> IMO it'd be a bit friendlier to ignore unknown fields when attempting to
> parse the file as a CreateConnectorRequest. Thoughts?
💯
I've changed this behavior and introduced an additional test, thanks for
pointing this out!
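
For readers following the hunk above, the name check in the second parsing
branch can be sketched in isolation. This is only an illustration under stated
assumptions: `ConnectorNameReconciler` and `reconcile` are hypothetical names,
`IllegalArgumentException` stands in for `ConnectException`, and `NAME_CONFIG`
is assumed to be the string `"name"`. It is not the code in the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the name check done after parsing a
// CreateConnectorRequest-shaped file; not the actual Kafka Connect class.
final class ConnectorNameReconciler {
    // Assumption: mirrors the NAME_CONFIG key used in the hunk above.
    static final String NAME_CONFIG = "name";

    // Returns a copy of the config in which NAME_CONFIG equals the top-level name.
    // Throws if the config already carries a different name.
    static Map<String, String> reconcile(String name, Map<String, String> config) {
        Map<String, String> result = new HashMap<>(config);
        if (result.containsKey(NAME_CONFIG)) {
            if (!Objects.equals(result.get(NAME_CONFIG), name)) {
                throw new IllegalArgumentException(
                    "Connector name configuration in 'config' doesn't match the one specified in 'name'");
            }
        } else {
            // The name was given only at the top level, so copy it into the config.
            result.put(NAME_CONFIG, name);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("tasks.max", "1");
        System.out.println(reconcile("my-connector", config));
    }
}
```

Unlike the hunk, the sketch returns a copy instead of mutating the parsed
request's config in place; that is a choice made for the illustration only.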
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -108,6 +108,19 @@ public interface Herder {
     */
    void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
+    /**
+     * Set the configuration for a connector, along with a target state optionally. This supports creation and updating.
+     * @param connName name of the connector
+     * @param config the connector's configuration
+     * @param targetState the desired target state for the connector; may be {@code null} if no target state change is desired. Note that the default
+     *                    target state is {@link TargetState#STARTED} if no target state exists previously
+     * @param allowReplace if true, allow overwriting previous configs; if false, throw {@link AlreadyExistsException}
+     *                     if a connector with the same name already exists
+     * @param callback callback to invoke when the configuration has been written
+     */
+    void putConnectorConfig(String connName, Map<String, String> config, TargetState targetState, boolean allowReplace,
Review Comment:
I did ponder over this one for a while. The changes to the
`ConfigBackingStore` interface just add a new parameter to the existing
method instead of adding a new method (the same situation as with the `Herder`
interface). Here, however, I was a bit more hesitant to do the same: although
`Herder` isn't public API, I've seen some direct usages of it (not through the
REST API) in the wild, i.e. outside of Connect. It's possible that the direct
usage in MM2 served as an example / precedent for them. It'd be nice if we
could avoid breaking such usages by introducing this new method instead,
WDYT?
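
A minimal sketch of the compatibility argument, assuming simplified types:
`SketchHerder`, `SketchTargetState` and `RecordingHerder` are hypothetical
stand-ins, the callback parameter is left out for brevity, and having the old
method delegate to the new one with a `null` target state is just one possible
implementation choice, not something the PR states.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for Connect's TargetState.
enum SketchTargetState { STARTED, PAUSED, STOPPED }

// Hypothetical, heavily simplified stand-in for the Herder interface.
interface SketchHerder {
    // Pre-existing method: its signature is untouched, so code written
    // against it keeps compiling.
    void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace);

    // New overload carrying the optional target state; null means
    // "no target state change is desired".
    void putConnectorConfig(String connName, Map<String, String> config,
                            SketchTargetState targetState, boolean allowReplace);
}

// Toy implementation that records calls so the behavior can be inspected.
final class RecordingHerder implements SketchHerder {
    final List<String> calls = new ArrayList<>();

    @Override
    public void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace) {
        // Assumption for this sketch: the old entry point forwards with no target state.
        putConnectorConfig(connName, config, null, allowReplace);
    }

    @Override
    public void putConnectorConfig(String connName, Map<String, String> config,
                                   SketchTargetState targetState, boolean allowReplace) {
        calls.add(connName + ":" + targetState);
    }
}
```

An existing caller that invokes the three-argument method needs no source
change, whereas adding a parameter to that method would break it at compile
time.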