C0urante commented on code in PR #12947:
URL: https://github.com/apache/kafka/pull/12947#discussion_r1048932674


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java:
##########
@@ -37,98 +36,75 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Map;
 
 /**
  * <p>
- * Command line utility that runs Kafka Connect as a standalone process. In 
this mode, work is not
- * distributed. Instead, all the normal Connect machinery works within a 
single process. This is
- * useful for ad hoc, small, or experimental jobs.
+ * Command line utility that runs Kafka Connect as a standalone process. In 
this mode, work (connectors and tasks) is not
+ * distributed. Instead, all the normal Connect machinery works within a 
single process. This is useful for ad hoc,
+ * small, or experimental jobs.
  * </p>
  * <p>
- * By default, no job configs or offset data is persistent. You can make jobs 
persistent and
- * fault tolerant by overriding the settings to use file storage for both.
+ * Connector and task configs are stored in memory and are not persistent. 
However, connector offset data is persistent
+ * since it uses file storage (configurable via {@link 
StandaloneConfig#OFFSET_STORAGE_FILE_FILENAME_CONFIG})
  * </p>
  */
-public class ConnectStandalone {
+public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
     private static final Logger log = 
LoggerFactory.getLogger(ConnectStandalone.class);
 
-    public static void main(String[] args) {
+    protected ConnectStandalone(String... args) {
+        super(args);
+    }
 
-        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
-            log.info("Usage: ConnectStandalone worker.properties 
[connector1.properties connector2.properties ...]");
-            Exit.exit(1);
-        }
+    @Override
+    protected String usage() {
+        return "ConnectStandalone worker.properties [connector1.properties 
connector2.properties ...]";
+    }
 
+    @Override
+    protected void processExtraArgs(Herder herder, Connect connect, String[] 
extraArgs) {
         try {
-            Time time = Time.SYSTEM;
-            log.info("Kafka Connect standalone worker initializing ...");
-            long initStart = time.hiResClockMs();
-            WorkerInfo initInfo = new WorkerInfo();
-            initInfo.logAll();
-
-            String workerPropsFile = args[0];
-            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Collections.emptyMap();
-
-            log.info("Scanning for plugin classes. This might take a moment 
...");
-            Plugins plugins = new Plugins(workerProps);
-            plugins.compareAndSwapWithDelegatingLoader();
-            StandaloneConfig config = new StandaloneConfig(workerProps);
-
-            String kafkaClusterId = config.kafkaClusterId();
-            log.debug("Kafka cluster ID: {}", kafkaClusterId);
-
-            // Do not initialize a RestClient because the ConnectorsResource 
will not use it in standalone mode.
-            RestServer rest = new RestServer(config, null);
-            rest.initializeServer();
-
-            URI advertisedUrl = rest.advertisedUrl();
-            String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
+            for (final String connectorPropsFile : extraArgs) {
+                Map<String, String> connectorProps = 
Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+                FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>((error, info) -> {
+                    if (error != null)
+                        log.error("Failed to create connector for {}", 
connectorPropsFile);
+                    else
+                        log.info("Created connector {}", info.result().name());
+                });
+                herder.putConnectorConfig(
+                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                        connectorProps, false, cb);
+                cb.get();
+            }
+        } catch (Throwable t) {
+            log.error("Stopping Connect due to an error while attempting to 
create a connector", t);

Review Comment:
   Can we please keep [the existing wording](https://github.com/apache/kafka/blob/de088a2e9758e36efe60b1d8acb18b4881b5a9fc/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java#L121) for this error message?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to