C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1044687594


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -21,41 +21,51 @@
 import org.apache.kafka.common.utils.Utils;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In 
this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a 
single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs 
persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Command line utility that runs Kafka Connect as a standalone process. In 
this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a 
single process. This is useful for development
+ * and testing Kafka Connect on a local machine.
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = 
LoggerFactory.getLogger(ConnectStandalone.class);
 
+    @Override
+    protected Herder createHerder(StandaloneConfig config, String workerId, 
Plugins plugins,
+                                  ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
+                                  RestServer restServer, RestClient 
restClient) {
+
+        OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, 
offsetBackingStore,
+                connectorClientConfigOverridePolicy);
+
+        return new StandaloneHerder(worker, config.kafkaClusterId(), 
connectorClientConfigOverridePolicy);
+    }
+
+    @Override
+    protected StandaloneConfig createConfig(Map<String, String> workerProps) {
+        return new StandaloneConfig(workerProps);
+    }
+
     public static void main(String[] args) {

Review Comment:
   I think if we're going to call this an abstract CLI, we should do the actual 
command-line argument processing in this class.
   
   One possible approach is to restructure the `AbstractConnectCli` class to 
have a protected constructor that accepts the command-line arguments:
   ```java
   abstract class AbstractConnectCli {
       private final String[] args;
   
       protected AbstractConnectCli(String[] args) {
           this.args = args;
       }
   }
   ```
   
   Expose a `run` method that processes those arguments, with some hook-in 
logic for subclasses to define custom usage info and handle extra command-line 
arguments:
   
   ```java
   protected abstract String usage();
   
   protected abstract void processExtraArgs(Herder herder, String[] extraArgs) 
throws Throwable;
   
   public void run() {
       if (args.length < 1 || Arrays.asList(args).contains("--help")) {
           log.info("Usage: {}", usage());
           Exit.exit(1);
       }
   
       try {
           String workerPropsFile = args[0];
           Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                   Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Collections.emptyMap();
           String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
           Connect connect = startConnect(workerProps, extraArgs);
   
           // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
           connect.awaitStop();
   
       } catch (Throwable t) {
           log.error("Stopping due to error", t);
           Exit.exit(2);
       }
   }
   ```
   
   And tweak `startConnect` to invoke the extra args hook after it's created 
and started the `Connect` instance:
   
   ```java
   // This part is unchanged; only added for context
   try {
       connect.start();
   } catch (Exception e) {
       log.error("Failed to start Connect", e);
       connect.stop();
       Exit.exit(3);
   }
   
   try {
       // This is the noteworthy change, which replaces the current logic for 
starting connectors based off of command-line properties files
       processExtraArgs(herder, extraArgs);
   } catch (Throwable t) {
       connect.stop();
       Exit.exit(3);
   }
   
   // This part is unchanged; only added for context
   return connect;
   ```
   
   This may be complicated by usage in other places (like the integration 
testing framework), but hopefully you get the idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to