This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a2868915664 KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic (#12947)
a2868915664 is described below

commit a2868915664be4ab58ae5b11e4d955a6263223eb
Author: Yash Mayya <[email protected]>
AuthorDate: Wed Dec 21 23:43:56 2022 +0530

    KAFKA-6586: Refactor ConnectDistributed and ConnectStandalone to re-use shared logic (#12947)
    
    Reviewers: Christo Lolov <[email protected]>, Chris Egerton <[email protected]>
---
 .../kafka/connect/cli/AbstractConnectCli.java      | 153 +++++++++++++++++++++
 .../kafka/connect/cli/ConnectDistributed.java      | 105 ++++----------
 .../kafka/connect/cli/ConnectStandalone.java       | 128 +++++++----------
 3 files changed, 234 insertions(+), 152 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
new file mode 100644
index 00000000000..8831081bf44
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.WorkerInfo;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Common initialization logic for Kafka Connect, intended for use by command 
line utilities
+ *
+ * @param <T> the type of {@link WorkerConfig} to be used
+ */
+public abstract class AbstractConnectCli<T extends WorkerConfig> {
+
+    private static final Logger log = 
LoggerFactory.getLogger(AbstractConnectCli.class);
+    private final String[] args;
+    private final Time time = Time.SYSTEM;
+
+    /**
+     *
+     * @param args the CLI arguments to be processed. Note that if one or more 
arguments are passed, the first argument is
+     *             assumed to be the Connect worker properties file and is 
processed in {@link #run()}. The remaining arguments
+     *             can be handled in {@link #processExtraArgs(Herder, Connect, 
String[])}
+     */
+    protected AbstractConnectCli(String... args) {
+        this.args = args;
+    }
+
+    protected abstract String usage();
+
+    /**
+     * The first CLI argument is assumed to be the Connect worker properties 
file and is processed by default. This method
+     * can be overridden if there are more arguments that need to be processed.
+     *
+     * @param herder the {@link Herder} instance that can be used to perform 
operations on the Connect cluster
+     * @param connect the {@link Connect} instance that can be stopped (via 
{@link Connect#stop()}) if there's an error
+     *                encountered while processing the additional CLI 
arguments.
+     * @param extraArgs the extra CLI arguments that need to be processed
+     */
+    protected void processExtraArgs(Herder herder, Connect connect, String[] 
extraArgs) {
+    }
+
+    protected abstract Herder createHerder(T config, String workerId, Plugins 
plugins,
+                                           ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
+                                           RestServer restServer, RestClient 
restClient);
+
+    protected abstract T createConfig(Map<String, String> workerProps);
+
+    /**
+     *  Validate {@link #args}, process worker properties from the first CLI 
argument, and start {@link Connect}
+     */
+    public void run() {
+        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
+            log.info("Usage: {}", usage());
+            Exit.exit(1);
+        }
+
+        try {
+            String workerPropsFile = args[0];
+            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Collections.emptyMap();
+            String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
+            Connect connect = startConnect(workerProps, extraArgs);
+
+            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown 
request
+            connect.awaitStop();
+
+        } catch (Throwable t) {
+            log.error("Stopping due to error", t);
+            Exit.exit(2);
+        }
+    }
+
+    /**
+     * Initialize and start an instance of {@link Connect}
+     *
+     * @param workerProps the worker properties map used to initialize the 
{@link WorkerConfig}
+     * @param extraArgs any additional CLI arguments that may need to be 
processed via
+     *                  {@link #processExtraArgs(Herder, Connect, String[])}
+     * @return a started instance of {@link Connect}
+     */
+    public Connect startConnect(Map<String, String> workerProps, String... 
extraArgs) {
+        log.info("Kafka Connect worker initializing ...");
+        long initStart = time.hiResClockMs();
+
+        WorkerInfo initInfo = new WorkerInfo();
+        initInfo.logAll();
+
+        log.info("Scanning for plugin classes. This might take a moment ...");
+        Plugins plugins = new Plugins(workerProps);
+        plugins.compareAndSwapWithDelegatingLoader();
+        T config = createConfig(workerProps);
+        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+
+        RestClient restClient = new RestClient(config);
+
+        RestServer restServer = new RestServer(config, restClient);
+        restServer.initializeServer();
+
+        URI advertisedUrl = restServer.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
+
+        ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy = plugins.newPlugin(
+                
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
+                config, ConnectorClientConfigOverridePolicy.class);
+
+        Herder herder = createHerder(config, workerId, plugins, 
connectorClientConfigOverridePolicy, restServer, restClient);
+
+        final Connect connect = new Connect(herder, restServer);
+        log.info("Kafka Connect worker initialization took {}ms", 
time.hiResClockMs() - initStart);
+        try {
+            connect.start();
+        } catch (Exception e) {
+            log.error("Failed to start Connect", e);
+            connect.stop();
+            Exit.exit(3);
+        }
+
+        processExtraArgs(herder, connect, extraArgs);
+
+        return connect;
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index a786a172a29..1da9fa40622 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -16,15 +16,11 @@
  */
 package org.apache.kafka.connect.cli;
 
-import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
-import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -41,9 +37,6 @@ import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -51,65 +44,32 @@ import static 
org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect in distributed mode. In this 
mode, the process joints a group of other workers
- * and work is distributed among them. This is useful for running Connect as a 
service, where connectors can be
- * submitted to the cluster to be automatically executed in a scalable, 
distributed fashion. This also allows you to
- * easily scale out horizontally, elastically adding or removing capacity 
simply by starting or stopping worker
- * instances.
+ * Command line utility that runs Kafka Connect in distributed mode. In this 
mode, the process joins a group of other
+ * workers and work (connectors and tasks) is distributed among them. This is 
useful for running Connect as a service,
+ * where connectors can be submitted to the cluster to be automatically 
executed in a scalable, distributed fashion.
+ * This also allows you to easily scale out horizontally, elastically adding 
or removing capacity simply by starting or
+ * stopping worker instances.
  * </p>
  */
-public class ConnectDistributed {
+public class ConnectDistributed extends AbstractConnectCli<DistributedConfig> {
     private static final Logger log = 
LoggerFactory.getLogger(ConnectDistributed.class);
 
-    private final Time time = Time.SYSTEM;
-    private final long initStart = time.hiResClockMs();
-
-    public static void main(String[] args) {
-
-        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
-            log.info("Usage: ConnectDistributed worker.properties");
-            Exit.exit(1);
-        }
-
-        try {
-            WorkerInfo initInfo = new WorkerInfo();
-            initInfo.logAll();
-
-            String workerPropsFile = args[0];
-            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Collections.emptyMap();
-
-            ConnectDistributed connectDistributed = new ConnectDistributed();
-            Connect connect = connectDistributed.startConnect(workerProps);
-
-            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown 
request
-            connect.awaitStop();
+    public ConnectDistributed(String... args) {
+        super(args);
+    }
 
-        } catch (Throwable t) {
-            log.error("Stopping due to error", t);
-            Exit.exit(2);
-        }
+    @Override
+    protected String usage() {
+        return "ConnectDistributed worker.properties";
     }
 
-    public Connect startConnect(Map<String, String> workerProps) {
-        log.info("Scanning for plugin classes. This might take a moment ...");
-        Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        DistributedConfig config = new DistributedConfig(workerProps);
+    @Override
+    protected Herder createHerder(DistributedConfig config, String workerId, 
Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient 
restClient) {
 
         String kafkaClusterId = config.kafkaClusterId();
-        log.debug("Kafka cluster ID: {}", kafkaClusterId);
-
-        RestClient restClient = new RestClient(config);
-
-        RestServer rest = new RestServer(config, restClient);
-        rest.initializeServer();
-
-        URI advertisedUrl = rest.advertisedUrl();
-        String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
-
         String clientIdBase = ConnectUtils.clientIdBase(config);
-
         // Create the admin client to be shared by all backing stores.
         Map<String, Object> adminProps = new HashMap<>(config.originals());
         ConnectUtils.addMetricsContextProperties(adminProps, config, 
kafkaClusterId);
@@ -119,15 +79,11 @@ public class ConnectDistributed {
         KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase);
         offsetBackingStore.configure(config);
 
-        ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy = plugins.newPlugin(
-                
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
-                config, ConnectorClientConfigOverridePolicy.class);
-
-        Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin, 
clientIdBase);
+        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(Time.SYSTEM, internalValueConverter, sharedAdmin, 
clientIdBase);
         statusBackingStore.configure(config);
 
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
@@ -139,21 +95,18 @@ public class ConnectDistributed {
 
         // Pass the shared admin to the distributed herder as an additional 
AutoCloseable object that should be closed when the
         // herder is stopped. This is easier than having to track and own the 
lifecycle ourselves.
-        DistributedHerder herder = new DistributedHerder(config, time, worker,
+        return new DistributedHerder(config, Time.SYSTEM, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString(), restClient, 
connectorClientConfigOverridePolicy, sharedAdmin);
-
-        final Connect connect = new Connect(herder, rest);
-        log.info("Kafka Connect distributed worker initialization took {}ms", 
time.hiResClockMs() - initStart);
-        try {
-            connect.start();
-        } catch (Exception e) {
-            log.error("Failed to start Connect", e);
-            connect.stop();
-            Exit.exit(3);
-        }
+                restServer.advertisedUrl().toString(), restClient, 
connectorClientConfigOverridePolicy, sharedAdmin);
+    }
 
-        return connect;
+    @Override
+    protected DistributedConfig createConfig(Map<String, String> workerProps) {
+        return new DistributedConfig(workerProps);
     }
 
+    public static void main(String[] args) {
+        ConnectDistributed connectDistributed = new ConnectDistributed(args);
+        connectDistributed.run();
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 8b0f4cd2b58..49b59dc7e46 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -24,9 +24,8 @@ import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -37,98 +36,75 @@ import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In 
this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a 
single process. This is
- * useful for ad hoc, small, or experimental jobs.
+ * Command line utility that runs Kafka Connect as a standalone process. In 
this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a 
single process. This is useful for ad hoc,
+ * small, or experimental jobs.
  * </p>
  * <p>
- * By default, no job configs or offset data is persistent. You can make jobs 
persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Connector and task configs are stored in memory and are not persistent. 
However, connector offset data is persistent
+ * since it uses file storage (configurable via {@link 
StandaloneConfig#OFFSET_STORAGE_FILE_FILENAME_CONFIG})
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = 
LoggerFactory.getLogger(ConnectStandalone.class);
 
-    public static void main(String[] args) {
+    protected ConnectStandalone(String... args) {
+        super(args);
+    }
 
-        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
-            log.info("Usage: ConnectStandalone worker.properties 
[connector1.properties connector2.properties ...]");
-            Exit.exit(1);
-        }
+    @Override
+    protected String usage() {
+        return "ConnectStandalone worker.properties [connector1.properties 
connector2.properties ...]";
+    }
 
+    @Override
+    protected void processExtraArgs(Herder herder, Connect connect, String[] 
extraArgs) {
         try {
-            Time time = Time.SYSTEM;
-            log.info("Kafka Connect standalone worker initializing ...");
-            long initStart = time.hiResClockMs();
-            WorkerInfo initInfo = new WorkerInfo();
-            initInfo.logAll();
-
-            String workerPropsFile = args[0];
-            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Collections.emptyMap();
-
-            log.info("Scanning for plugin classes. This might take a moment 
...");
-            Plugins plugins = new Plugins(workerProps);
-            plugins.compareAndSwapWithDelegatingLoader();
-            StandaloneConfig config = new StandaloneConfig(workerProps);
-
-            String kafkaClusterId = config.kafkaClusterId();
-            log.debug("Kafka cluster ID: {}", kafkaClusterId);
-
-            // Do not initialize a RestClient because the ConnectorsResource 
will not use it in standalone mode.
-            RestServer rest = new RestServer(config, null);
-            rest.initializeServer();
-
-            URI advertisedUrl = rest.advertisedUrl();
-            String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
+            for (final String connectorPropsFile : extraArgs) {
+                Map<String, String> connectorProps = 
Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+                FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>((error, info) -> {
+                    if (error != null)
+                        log.error("Failed to create connector for {}", 
connectorPropsFile);
+                    else
+                        log.info("Created connector {}", info.result().name());
+                });
+                herder.putConnectorConfig(
+                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                        connectorProps, false, cb);
+                cb.get();
+            }
+        } catch (Throwable t) {
+            log.error("Stopping after connector error", t);
+            connect.stop();
+            Exit.exit(3);
+        }
+    }
 
-            OffsetBackingStore offsetBackingStore = new 
FileOffsetBackingStore();
-            offsetBackingStore.configure(config);
+    @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, 
Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient 
restClient) {
 
-            ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy = plugins.newPlugin(
-                
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
-                config, ConnectorClientConfigOverridePolicy.class);
-            Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore,
-                                       connectorClientConfigOverridePolicy);
+        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+        offsetBackingStore.configure(config);
 
-            Herder herder = new StandaloneHerder(worker, kafkaClusterId, 
connectorClientConfigOverridePolicy);
-            final Connect connect = new Connect(herder, rest);
-            log.info("Kafka Connect standalone worker initialization took 
{}ms", time.hiResClockMs() - initStart);
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, 
offsetBackingStore,
+                connectorClientConfigOverridePolicy);
 
-            try {
-                connect.start();
-                for (final String connectorPropsFile : 
Arrays.copyOfRange(args, 1, args.length)) {
-                    Map<String, String> connectorProps = 
Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
-                    FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>((error, info) -> {
-                        if (error != null)
-                            log.error("Failed to create job for {}", 
connectorPropsFile);
-                        else
-                            log.info("Created connector {}", 
info.result().name());
-                    });
-                    herder.putConnectorConfig(
-                            connectorProps.get(ConnectorConfig.NAME_CONFIG),
-                            connectorProps, false, cb);
-                    cb.get();
-                }
-            } catch (Throwable t) {
-                log.error("Stopping after connector error", t);
-                connect.stop();
-                Exit.exit(3);
-            }
+        return new StandaloneHerder(worker, config.kafkaClusterId(), 
connectorClientConfigOverridePolicy);
+    }
 
-            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown 
request
-            connect.awaitStop();
+    @Override
+    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+        return new StandaloneConfig(workerProps);
+    }
 
-        } catch (Throwable t) {
-            log.error("Stopping due to error", t);
-            Exit.exit(2);
-        }
+    public static void main(String[] args) {
+        ConnectStandalone connectStandalone = new ConnectStandalone(args);
+        connectStandalone.run();
     }
 }

Reply via email to