This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a2868915664 KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic (#12947)
a2868915664 is described below
commit a2868915664be4ab58ae5b11e4d955a6263223eb
Author: Yash Mayya <[email protected]>
AuthorDate: Wed Dec 21 23:43:56 2022 +0530
KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic (#12947)
Reviewers: Christo Lolov <[email protected]>, Chris Egerton <[email protected]>
---
.../kafka/connect/cli/AbstractConnectCli.java | 153 +++++++++++++++++++++
.../kafka/connect/cli/ConnectDistributed.java | 105 ++++----------
.../kafka/connect/cli/ConnectStandalone.java | 128 +++++++----------
3 files changed, 234 insertions(+), 152 deletions(-)
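For context, the refactor below extracts the shared startup flow (argument handling, plugin scanning, REST server setup, and Connect startup) into AbstractConnectCli, so each mode only supplies its usage string, WorkerConfig subtype, and Herder. A minimal sketch of that pattern, assuming a hypothetical ExampleConnectCli that mirrors the standalone wiring shown later in this diff:

package org.apache.kafka.connect.cli;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;

import java.util.Map;

// Hypothetical subclass, sketched from the abstract methods added in this commit;
// not part of the change itself. The real implementations are ConnectDistributed
// and ConnectStandalone further down in this diff.
public class ExampleConnectCli extends AbstractConnectCli<StandaloneConfig> {

    protected ExampleConnectCli(String... args) {
        super(args);
    }

    @Override
    protected String usage() {
        return "ExampleConnectCli worker.properties";
    }

    @Override
    protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
                                  RestServer restServer, RestClient restClient) {
        // Mirror ConnectStandalone: file-backed offsets, a single Worker, and a standalone herder
        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
        offsetBackingStore.configure(config);
        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore,
                connectorClientConfigOverridePolicy);
        return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
    }

    @Override
    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
        return new StandaloneConfig(workerProps);
    }

    public static void main(String[] args) {
        // run() validates args, loads the worker properties file, and starts Connect via startConnect()
        new ExampleConnectCli(args).run();
    }
}

The generic parameter keeps createConfig and createHerder typed to the concrete WorkerConfig, so subclasses avoid casts.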
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
new file mode 100644
index 00000000000..8831081bf44
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Common initialization logic for Kafka Connect, intended for use by command line utilities
+ *
+ * @param <T> the type of {@link WorkerConfig} to be used
+ */
+public abstract class AbstractConnectCli<T extends WorkerConfig> {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractConnectCli.class);
+ private final String[] args;
+ private final Time time = Time.SYSTEM;
+
+ /**
+ *
+     * @param args the CLI arguments to be processed. Note that if one or more arguments are passed, the first argument is
+     *             assumed to be the Connect worker properties file and is processed in {@link #run()}. The remaining arguments
+     *             can be handled in {@link #processExtraArgs(Herder, Connect, String[])}
+ */
+ protected AbstractConnectCli(String... args) {
+ this.args = args;
+ }
+
+ protected abstract String usage();
+
+ /**
+     * The first CLI argument is assumed to be the Connect worker properties file and is processed by default. This method
+ * can be overridden if there are more arguments that need to be processed.
+ *
+     * @param herder the {@link Herder} instance that can be used to perform operations on the Connect cluster
+     * @param connect the {@link Connect} instance that can be stopped (via {@link Connect#stop()}) if there's an error
+     *                encountered while processing the additional CLI arguments.
+ * @param extraArgs the extra CLI arguments that need to be processed
+ */
+    protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {
+ }
+
+    protected abstract Herder createHerder(T config, String workerId, Plugins plugins,
+                                           ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                           RestServer restServer, RestClient restClient);
+
+ protected abstract T createConfig(Map<String, String> workerProps);
+
+ /**
+     * Validate {@link #args}, process worker properties from the first CLI argument, and start {@link Connect}
+ */
+ public void run() {
+ if (args.length < 1 || Arrays.asList(args).contains("--help")) {
+ log.info("Usage: {}", usage());
+ Exit.exit(1);
+ }
+
+ try {
+ String workerPropsFile = args[0];
+ Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
+ String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
+ Connect connect = startConnect(workerProps, extraArgs);
+
+            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
+ connect.awaitStop();
+
+ } catch (Throwable t) {
+ log.error("Stopping due to error", t);
+ Exit.exit(2);
+ }
+ }
+
+ /**
+ * Initialize and start an instance of {@link Connect}
+ *
+     * @param workerProps the worker properties map used to initialize the {@link WorkerConfig}
+     * @param extraArgs any additional CLI arguments that may need to be processed via
+ * {@link #processExtraArgs(Herder, Connect, String[])}
+ * @return a started instance of {@link Connect}
+ */
+    public Connect startConnect(Map<String, String> workerProps, String... extraArgs) {
+ log.info("Kafka Connect worker initializing ...");
+ long initStart = time.hiResClockMs();
+
+ WorkerInfo initInfo = new WorkerInfo();
+ initInfo.logAll();
+
+ log.info("Scanning for plugin classes. This might take a moment ...");
+ Plugins plugins = new Plugins(workerProps);
+ plugins.compareAndSwapWithDelegatingLoader();
+ T config = createConfig(workerProps);
+ log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+
+ RestClient restClient = new RestClient(config);
+
+ RestServer restServer = new RestServer(config, restClient);
+ restServer.initializeServer();
+
+ URI advertisedUrl = restServer.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
+                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
+ config, ConnectorClientConfigOverridePolicy.class);
+
+        Herder herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);
+
+ final Connect connect = new Connect(herder, restServer);
+ log.info("Kafka Connect worker initialization took {}ms",
time.hiResClockMs() - initStart);
+ try {
+ connect.start();
+ } catch (Exception e) {
+ log.error("Failed to start Connect", e);
+ connect.stop();
+ Exit.exit(3);
+ }
+
+ processExtraArgs(herder, connect, extraArgs);
+
+ return connect;
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index a786a172a29..1da9fa40622 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -16,15 +16,11 @@
*/
package org.apache.kafka.connect.cli;
-import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
-import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
-import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -41,9 +37,6 @@ import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -51,65 +44,32 @@ import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
/**
* <p>
- * Command line utility that runs Kafka Connect in distributed mode. In this mode, the process joints a group of other workers
- * and work is distributed among them. This is useful for running Connect as a service, where connectors can be
- * submitted to the cluster to be automatically executed in a scalable, distributed fashion. This also allows you to
- * easily scale out horizontally, elastically adding or removing capacity simply by starting or stopping worker
- * instances.
+ * Command line utility that runs Kafka Connect in distributed mode. In this mode, the process joins a group of other
+ * workers and work (connectors and tasks) is distributed among them. This is useful for running Connect as a service,
+ * where connectors can be submitted to the cluster to be automatically executed in a scalable, distributed fashion.
+ * This also allows you to easily scale out horizontally, elastically adding or removing capacity simply by starting or
+ * stopping worker instances.
* </p>
*/
-public class ConnectDistributed {
+public class ConnectDistributed extends AbstractConnectCli<DistributedConfig> {
private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);
- private final Time time = Time.SYSTEM;
- private final long initStart = time.hiResClockMs();
-
- public static void main(String[] args) {
-
- if (args.length < 1 || Arrays.asList(args).contains("--help")) {
- log.info("Usage: ConnectDistributed worker.properties");
- Exit.exit(1);
- }
-
- try {
- WorkerInfo initInfo = new WorkerInfo();
- initInfo.logAll();
-
- String workerPropsFile = args[0];
- Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
-
- ConnectDistributed connectDistributed = new ConnectDistributed();
- Connect connect = connectDistributed.startConnect(workerProps);
-
-            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
- connect.awaitStop();
+ public ConnectDistributed(String... args) {
+ super(args);
+ }
- } catch (Throwable t) {
- log.error("Stopping due to error", t);
- Exit.exit(2);
- }
+ @Override
+ protected String usage() {
+ return "ConnectDistributed worker.properties";
}
- public Connect startConnect(Map<String, String> workerProps) {
- log.info("Scanning for plugin classes. This might take a moment ...");
- Plugins plugins = new Plugins(workerProps);
- plugins.compareAndSwapWithDelegatingLoader();
- DistributedConfig config = new DistributedConfig(workerProps);
+ @Override
+    protected Herder createHerder(DistributedConfig config, String workerId, Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient restClient) {
String kafkaClusterId = config.kafkaClusterId();
- log.debug("Kafka cluster ID: {}", kafkaClusterId);
-
- RestClient restClient = new RestClient(config);
-
- RestServer rest = new RestServer(config, restClient);
- rest.initializeServer();
-
- URI advertisedUrl = rest.advertisedUrl();
-        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
-
String clientIdBase = ConnectUtils.clientIdBase(config);
-
// Create the admin client to be shared by all backing stores.
Map<String, Object> adminProps = new HashMap<>(config.originals());
ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
@@ -119,15 +79,11 @@ public class ConnectDistributed {
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase);
offsetBackingStore.configure(config);
-        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
-                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
- config, ConnectorClientConfigOverridePolicy.class);
-
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
WorkerConfigTransformer configTransformer = worker.configTransformer();
Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin, clientIdBase);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(Time.SYSTEM, internalValueConverter, sharedAdmin, clientIdBase);
statusBackingStore.configure(config);
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
@@ -139,21 +95,18 @@ public class ConnectDistributed {
// Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
// herder is stopped. This is easier than having to track and own the lifecycle ourselves.
- DistributedHerder herder = new DistributedHerder(config, time, worker,
+ return new DistributedHerder(config, Time.SYSTEM, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
-        advertisedUrl.toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin);
-
- final Connect connect = new Connect(herder, rest);
- log.info("Kafka Connect distributed worker initialization took {}ms",
time.hiResClockMs() - initStart);
- try {
- connect.start();
- } catch (Exception e) {
- log.error("Failed to start Connect", e);
- connect.stop();
- Exit.exit(3);
- }
+        restServer.advertisedUrl().toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin);
+ }
- return connect;
+ @Override
+ protected DistributedConfig createConfig(Map<String, String> workerProps) {
+ return new DistributedConfig(workerProps);
}
+ public static void main(String[] args) {
+ ConnectDistributed connectDistributed = new ConnectDistributed(args);
+ connectDistributed.run();
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 8b0f4cd2b58..49b59dc7e46 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -24,9 +24,8 @@ import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -37,98 +36,75 @@ import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.Map;
/**
* <p>
- * Command line utility that runs Kafka Connect as a standalone process. In this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a single process. This is
- * useful for ad hoc, small, or experimental jobs.
+ * Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a single process. This is useful for ad hoc,
+ * small, or experimental jobs.
* </p>
* <p>
- * By default, no job configs or offset data is persistent. You can make jobs persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Connector and task configs are stored in memory and are not persistent. However, connector offset data is persistent
+ * since it uses file storage (configurable via {@link StandaloneConfig#OFFSET_STORAGE_FILE_FILENAME_CONFIG})
* </p>
*/
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
private static final Logger log = LoggerFactory.getLogger(ConnectStandalone.class);
- public static void main(String[] args) {
+ protected ConnectStandalone(String... args) {
+ super(args);
+ }
- if (args.length < 1 || Arrays.asList(args).contains("--help")) {
- log.info("Usage: ConnectStandalone worker.properties
[connector1.properties connector2.properties ...]");
- Exit.exit(1);
- }
+ @Override
+ protected String usage() {
+ return "ConnectStandalone worker.properties [connector1.properties
connector2.properties ...]";
+ }
+ @Override
+    protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {
try {
- Time time = Time.SYSTEM;
- log.info("Kafka Connect standalone worker initializing ...");
- long initStart = time.hiResClockMs();
- WorkerInfo initInfo = new WorkerInfo();
- initInfo.logAll();
-
- String workerPropsFile = args[0];
- Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
-
- log.info("Scanning for plugin classes. This might take a moment
...");
- Plugins plugins = new Plugins(workerProps);
- plugins.compareAndSwapWithDelegatingLoader();
- StandaloneConfig config = new StandaloneConfig(workerProps);
-
- String kafkaClusterId = config.kafkaClusterId();
- log.debug("Kafka cluster ID: {}", kafkaClusterId);
-
-            // Do not initialize a RestClient because the ConnectorsResource will not use it in standalone mode.
- RestServer rest = new RestServer(config, null);
- rest.initializeServer();
-
- URI advertisedUrl = rest.advertisedUrl();
-            String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+ for (final String connectorPropsFile : extraArgs) {
+ Map<String, String> connectorProps =
Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+ FutureCallback<Herder.Created<ConnectorInfo>> cb = new
FutureCallback<>((error, info) -> {
+ if (error != null)
+ log.error("Failed to create connector for {}",
connectorPropsFile);
+ else
+ log.info("Created connector {}", info.result().name());
+ });
+ herder.putConnectorConfig(
+ connectorProps.get(ConnectorConfig.NAME_CONFIG),
+ connectorProps, false, cb);
+ cb.get();
+ }
+ } catch (Throwable t) {
+ log.error("Stopping after connector error", t);
+ connect.stop();
+ Exit.exit(3);
+ }
+ }
-            OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
- offsetBackingStore.configure(config);
+ @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient restClient) {
-        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
-                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
- config, ConnectorClientConfigOverridePolicy.class);
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore,
- connectorClientConfigOverridePolicy);
+ OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+ offsetBackingStore.configure(config);
-        Herder herder = new StandaloneHerder(worker, kafkaClusterId, connectorClientConfigOverridePolicy);
- final Connect connect = new Connect(herder, rest);
- log.info("Kafka Connect standalone worker initialization took
{}ms", time.hiResClockMs() - initStart);
+ Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config,
offsetBackingStore,
+ connectorClientConfigOverridePolicy);
- try {
- connect.start();
-            for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
-                Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
-                FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
- if (error != null)
- log.error("Failed to create job for {}",
connectorPropsFile);
- else
- log.info("Created connector {}",
info.result().name());
- });
- herder.putConnectorConfig(
- connectorProps.get(ConnectorConfig.NAME_CONFIG),
- connectorProps, false, cb);
- cb.get();
- }
- } catch (Throwable t) {
- log.error("Stopping after connector error", t);
- connect.stop();
- Exit.exit(3);
- }
+        return new StandaloneHerder(worker, config.kafkaClusterId(), connectorClientConfigOverridePolicy);
+ }
-            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
- connect.awaitStop();
+ @Override
+ protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+ return new StandaloneConfig(workerProps);
+ }
- } catch (Throwable t) {
- log.error("Stopping due to error", t);
- Exit.exit(2);
- }
+ public static void main(String[] args) {
+ ConnectStandalone connectStandalone = new ConnectStandalone(args);
+ connectStandalone.run();
}
}
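As a usage note, the refactored CLIs are launched the same way as before; a minimal illustration (file names are placeholders, not part of the commit):

// Equivalent to running: ConnectStandalone worker.properties connector1.properties
public class ConnectStandaloneLaunchExample {
    public static void main(String[] args) {
        org.apache.kafka.connect.cli.ConnectStandalone.main(
                new String[] {"worker.properties", "connector1.properties"});
    }
}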