sijie closed pull request #1745: Separate cmd line connector interface to 
explicit source and sink
URL: https://github.com/apache/incubator-pulsar/pull/1745
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
 b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
similarity index 57%
rename from 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
rename to 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 0492d76f1f..01ec16ce67 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -27,6 +27,7 @@
 import com.google.gson.reflect.TypeToken;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -35,16 +36,12 @@
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.shaded.proto.Function;
 import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.SinkConfig;
-import org.apache.pulsar.functions.utils.SourceConfig;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.source.PulsarSource;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -56,32 +53,24 @@
 import java.util.Map;
 import java.util.function.Consumer;
 
-import net.jodah.typetools.TypeResolver;
-
 @Slf4j
 @Getter
-@Parameters(commandDescription = "Interface for managing Pulsar Connectors 
(Ingress and egress data to and from Pulsar)")
-public class CmdConnectors extends CmdBase {
+@Parameters(commandDescription = "Interface for managing Pulsar Sinks (Egress 
data from Pulsar)")
+public class CmdSinks extends CmdBase {
 
-    private final CreateSource createSource;
     private final CreateSink createSink;
-    private final DeleteConnector deleteConnector;
-    private final LocalSourceRunner localSourceRunner;
+    private final DeleteSink deleteSink;
     private final LocalSinkRunner localSinkRunner;
 
-    public CmdConnectors(PulsarAdmin admin) {
-        super("connectors", admin);
-        createSource = new CreateSource();
+    public CmdSinks(PulsarAdmin admin) {
+        super("sink", admin);
         createSink = new CreateSink();
-        deleteConnector = new DeleteConnector();
-        localSourceRunner = new LocalSourceRunner();
+        deleteSink = new DeleteSink();
         localSinkRunner = new LocalSinkRunner();
 
-        jcommander.addCommand("create-source", createSource);
-        jcommander.addCommand("create-sink", createSink);
-        jcommander.addCommand("delete", deleteConnector);
-        jcommander.addCommand("localrun-source", localSourceRunner);
-        jcommander.addCommand("localrun-sink", localSinkRunner);
+        jcommander.addCommand("create", createSink);
+        jcommander.addCommand("delete", deleteSink);
+        jcommander.addCommand("localrun", localSinkRunner);
     }
 
     /**
@@ -101,20 +90,7 @@ void processArguments() throws Exception {
         abstract void runCmd() throws Exception;
     }
 
-    @Parameters(commandDescription = "Run the Pulsar source or sink locally 
(rather than deploying it to the Pulsar cluster)")
-
-    class LocalSourceRunner extends CreateSource {
-
-        @Parameter(names = "--brokerServiceUrl", description = "The URL for 
the Pulsar broker")
-        protected String brokerServiceUrl;
-
-        @Override
-        void runCmd() throws Exception {
-            CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig),
-                    sourceConfig.getParallelism(), brokerServiceUrl, jarFile, 
admin);
-        }
-    }
-
+    @Parameters(commandDescription = "Run the Pulsar sink locally (rather than 
deploying it to the Pulsar cluster)")
     class LocalSinkRunner extends CreateSink {
 
         @Parameter(names = "--brokerServiceUrl", description = "The URL for 
the Pulsar broker")
@@ -127,173 +103,6 @@ void runCmd() throws Exception {
         }
     }
 
-    @Parameters(commandDescription = "Create Pulsar source connectors")
-    class CreateSource extends BaseCommand {
-        @Parameter(names = "--tenant", description = "The source's tenant")
-        protected String tenant;
-        @Parameter(names = "--namespace", description = "The source's 
namespace")
-        protected String namespace;
-        @Parameter(names = "--name", description = "The source's name")
-        protected String name;
-        @Parameter(names = "--processingGuarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the Source")
-        protected FunctionConfig.ProcessingGuarantees processingGuarantees;
-        @Parameter(names = "--className", description = "The source's class 
name")
-        protected String className;
-        @Parameter(names = "--destinationTopicName", description = "Pulsar 
topic to ingress data to")
-        protected String destinationTopicName;
-        @Parameter(names = "--deserializationClassName", description = "The 
classname for SerDe class for the source")
-        protected String deserializationClassName;
-        @Parameter(names = "--parallelism", description = "Number of instances 
of the source")
-        protected String parallelism;
-        @Parameter(
-                names = "--jar",
-                description = "Path to the jar file for the Source",
-                listConverter = StringConverter.class)
-        protected String jarFile;
-
-        @Parameter(names = "--sourceConfigFile", description = "The path to a 
YAML config file specifying the "
-                + "source's configuration")
-        protected String sourceConfigFile;
-
-        protected SourceConfig sourceConfig;
-
-        @Override
-        void processArguments() throws Exception {
-            super.processArguments();
-
-            if (null != sourceConfigFile) {
-                this.sourceConfig = loadSourceConfig(sourceConfigFile);
-            } else {
-                this.sourceConfig = new SourceConfig();
-            }
-
-            if (null != tenant) {
-                sourceConfig.setTenant(tenant);
-            }
-            if (null != namespace) {
-                sourceConfig.setNamespace(namespace);
-            }
-            if (null != name) {
-                sourceConfig.setName(name);
-            }
-
-            if (null != className) {
-                this.sourceConfig.setClassName(className);
-            }
-            if (null != destinationTopicName) {
-                sourceConfig.setTopicName(destinationTopicName);
-            }
-            if (null != deserializationClassName) {
-                sourceConfig.setSerdeClassName(deserializationClassName);
-            }
-            if (null != processingGuarantees) {
-                sourceConfig.setProcessingGuarantees(processingGuarantees);
-            }
-            if (parallelism == null) {
-                if (sourceConfig.getParallelism() == 0) {
-                    sourceConfig.setParallelism(1);
-                }
-            } else {
-                int num = Integer.parseInt(parallelism);
-                if (num <= 0) {
-                    throw new IllegalArgumentException("The parallelism factor 
(the number of instances) for the "
-                            + "connector must be positive");
-                }
-                sourceConfig.setParallelism(num);
-            }
-
-            if (null == jarFile) {
-                throw new IllegalArgumentException("Connector JAR not 
specfied");
-            }
-        }
-
-        @Override
-        void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
-            admin.functions().createFunction(createSourceConfig(sourceConfig), 
jarFile);
-            print("Created successfully");
-        }
-
-        private Class<?> getSourceType(File file) {
-            if (!Reflections.classExistsInJar(file, 
sourceConfig.getClassName())) {
-                throw new IllegalArgumentException(String.format("Pulsar 
Source class %s does not exist in jar %s",
-                        sourceConfig.getClassName(), jarFile));
-            } else if (!Reflections.classInJarImplementsIface(file, 
sourceConfig.getClassName(), Source.class)) {
-                throw new IllegalArgumentException(String.format("The Pulsar 
source class %s in jar %s implements does not implement " + 
Source.class.getName(),
-                        sourceConfig.getClassName(), jarFile));
-            }
-
-            Object userClass = 
Reflections.createInstance(sourceConfig.getClassName(), file);
-            Class<?> typeArg;
-            Source source = (Source) userClass;
-            if (source == null) {
-                throw new IllegalArgumentException(String.format("The Pulsar 
source class %s could not be instantiated from jar %s",
-                        sourceConfig.getClassName(), jarFile));
-            }
-            typeArg = TypeResolver.resolveRawArgument(Source.class, 
source.getClass());
-
-            return typeArg;
-        }
-
-        protected org.apache.pulsar.functions.proto.Function.FunctionDetails 
createSourceConfigProto2(SourceConfig sourceConfig)
-                throws IOException {
-            org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder 
functionDetailsBuilder
-                    = 
org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            
Utils.mergeJson(FunctionsImpl.printJson(createSourceConfig(sourceConfig)), 
functionDetailsBuilder);
-            return functionDetailsBuilder.build();
-        }
-
-        protected FunctionDetails createSourceConfig(SourceConfig 
sourceConfig) {
-
-            File file = new File(jarFile);
-            try {
-                Reflections.loadJar(file);
-            } catch (MalformedURLException e) {
-                throw new RuntimeException("Failed to load user jar " + file, 
e);
-            }
-            Class<?> typeArg = getSourceType(file);
-
-            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
-            if (sourceConfig.getTenant() != null) {
-                functionDetailsBuilder.setTenant(sourceConfig.getTenant());
-            }
-            if (sourceConfig.getNamespace() != null) {
-                
functionDetailsBuilder.setNamespace(sourceConfig.getNamespace());
-            }
-            if (sourceConfig.getName() != null) {
-                functionDetailsBuilder.setName(sourceConfig.getName());
-            }
-            functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
-            
functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
-            
functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
-            if (sourceConfig.getProcessingGuarantees() != null) {
-                functionDetailsBuilder.setProcessingGuarantees(
-                        
convertProcessingGuarantee(sourceConfig.getProcessingGuarantees()));
-            }
-
-            // set source spec
-            SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            sourceSpecBuilder.setClassName(sourceConfig.getClassName());
-            sourceSpecBuilder.setConfigs(new 
Gson().toJson(sourceConfig.getConfigs()));
-            sourceSpecBuilder.setTypeClassName(typeArg.getName());
-            functionDetailsBuilder.setSource(sourceSpecBuilder);
-
-            // set up sink spec
-            SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-            sinkSpecBuilder.setClassName(PulsarSink.class.getName());
-            if (sourceConfig.getSerdeClassName() != null && 
!sourceConfig.getSerdeClassName().isEmpty()) {
-                
sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName());
-            }
-            sinkSpecBuilder.setTopic(sourceConfig.getTopicName());
-            sinkSpecBuilder.setTypeClassName(typeArg.getName());
-
-            functionDetailsBuilder.setSink(sinkSpecBuilder);
-            return functionDetailsBuilder.build();
-        }
-    }
-
     @Parameters(commandDescription = "Create Pulsar sink connectors")
     class CreateSink extends BaseCommand {
         @Parameter(names = "--tenant", description = "The sink's tenant")
@@ -356,7 +165,7 @@ void processArguments() throws Exception {
                 inputTopics.forEach(new Consumer<String>() {
                     @Override
                     public void accept(String s) {
-                        CmdConnectors.validateTopicName(s);
+                        CmdSinks.validateTopicName(s);
                         topicsToSerDeClassName.put(s, "");
                     }
                 });
@@ -365,7 +174,7 @@ public void accept(String s) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
                 Map<String, String> customSerdeInputMap = new 
Gson().fromJson(customSerdeInputString, type);
                 customSerdeInputMap.forEach((topic, serde) -> {
-                    CmdConnectors.validateTopicName(topic);
+                    CmdSinks.validateTopicName(topic);
                     topicsToSerDeClassName.put(topic, serde);
                 });
             }
@@ -475,15 +284,15 @@ protected FunctionDetails createSinkConfig(SinkConfig 
sinkConfig) {
     }
 
     @Parameters(commandDescription = "Stops a Pulsar sink or source")
-    class DeleteConnector extends BaseCommand {
+    class DeleteSink extends BaseCommand {
 
-        @Parameter(names = "--tenant", description = "The tenant of a sink or 
source")
+        @Parameter(names = "--tenant", description = "The tenant of the sink")
         protected String tenant;
 
-        @Parameter(names = "--namespace", description = "The namespace of a 
sink or source")
+        @Parameter(names = "--namespace", description = "The namespace of the 
sink")
         protected String namespace;
 
-        @Parameter(names = "--name", description = "The name of a sink or 
source")
+        @Parameter(names = "--name", description = "The name of the sink")
         protected String name;
 
         @Override
@@ -491,7 +300,7 @@ void processArguments() throws Exception {
             super.processArguments();
             if (null == tenant || null == namespace || null == name) {
                 throw new RuntimeException(
-                        "You must specify a tenant, namespace, and name for 
the sink or source");
+                        "You must specify a tenant, namespace, and name for 
the sink");
             }
         }
 
@@ -506,25 +315,11 @@ private static SinkConfig loadSinkConfig(String file) 
throws IOException {
         return (SinkConfig) loadConfig(file, SinkConfig.class);
     }
 
-    private static SourceConfig loadSourceConfig(String file) throws 
IOException {
-        return (SourceConfig) loadConfig(file, SourceConfig.class);
-    }
-
     private static Object loadConfig(String file, Class<?> clazz) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(file), clazz);
     }
 
-    public static boolean areAllRequiredFieldsPresentForSource(SourceConfig 
sourceConfig) {
-        return sourceConfig.getTenant() != null && 
!sourceConfig.getTenant().isEmpty()
-                && sourceConfig.getNamespace() != null && 
!sourceConfig.getNamespace().isEmpty()
-                && sourceConfig.getName() != null && 
!sourceConfig.getName().isEmpty()
-                && sourceConfig.getClassName() != null && 
!sourceConfig.getClassName().isEmpty()
-                && sourceConfig.getTopicName() != null && 
!sourceConfig.getTopicName().isEmpty()
-                || sourceConfig.getSerdeClassName() != null
-                && sourceConfig.getParallelism() > 0;
-    }
-
     public static boolean areAllRequiredFieldsPresentForSink(SinkConfig 
sinkConfig) {
         return sinkConfig.getTenant() != null && 
!sinkConfig.getTenant().isEmpty()
                 && sinkConfig.getNamespace() != null && 
!sinkConfig.getNamespace().isEmpty()
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
new file mode 100644
index 0000000000..c651e373b1
--- /dev/null
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.beust.jcommander.converters.StringConverter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.internal.FunctionsImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.connect.core.Sink;
+import org.apache.pulsar.connect.core.Source;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.shaded.proto.Function;
+import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
+import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.source.PulsarSource;
+import org.apache.pulsar.functions.utils.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+@Slf4j
+@Getter
+@Parameters(commandDescription = "Interface for managing Pulsar Source 
(Ingress data to Pulsar)")
+public class CmdSources extends CmdBase {
+
+    private final CreateSource createSource;
+    private final DeleteSource deleteSource;
+    private final LocalSourceRunner localSourceRunner;
+
+    public CmdSources(PulsarAdmin admin) {
+        super("source", admin);
+        createSource = new CreateSource();
+        deleteSource = new DeleteSource();
+        localSourceRunner = new LocalSourceRunner();
+
+        jcommander.addCommand("create", createSource);
+        jcommander.addCommand("delete", deleteSource);
+        jcommander.addCommand("localrun", localSourceRunner);
+    }
+
+    /**
+     * Base command
+     */
+    @Getter
+    abstract class BaseCommand extends CliCommand {
+        @Override
+        void run() throws Exception {
+            processArguments();
+            runCmd();
+        }
+
+        void processArguments() throws Exception {
+        }
+
+        abstract void runCmd() throws Exception;
+    }
+
+    @Parameters(commandDescription = "Run the Pulsar source locally (rather 
than deploying it to the Pulsar cluster)")
+    class LocalSourceRunner extends CreateSource {
+
+        @Parameter(names = "--brokerServiceUrl", description = "The URL for 
the Pulsar broker")
+        protected String brokerServiceUrl;
+
+        @Override
+        void runCmd() throws Exception {
+            CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig),
+                    sourceConfig.getParallelism(), brokerServiceUrl, jarFile, 
admin);
+        }
+    }
+
+    @Parameters(commandDescription = "Create Pulsar source connectors")
+    class CreateSource extends BaseCommand {
+        @Parameter(names = "--tenant", description = "The source's tenant")
+        protected String tenant;
+        @Parameter(names = "--namespace", description = "The source's 
namespace")
+        protected String namespace;
+        @Parameter(names = "--name", description = "The source's name")
+        protected String name;
+        @Parameter(names = "--processingGuarantees", description = "The 
processing guarantees (aka delivery semantics) applied to the Source")
+        protected FunctionConfig.ProcessingGuarantees processingGuarantees;
+        @Parameter(names = "--className", description = "The source's class 
name")
+        protected String className;
+        @Parameter(names = "--destinationTopicName", description = "Pulsar 
topic to ingress data to")
+        protected String destinationTopicName;
+        @Parameter(names = "--deserializationClassName", description = "The 
classname for SerDe class for the source")
+        protected String deserializationClassName;
+        @Parameter(names = "--parallelism", description = "Number of instances 
of the source")
+        protected String parallelism;
+        @Parameter(
+                names = "--jar",
+                description = "Path to the jar file for the Source",
+                listConverter = StringConverter.class)
+        protected String jarFile;
+
+        @Parameter(names = "--sourceConfigFile", description = "The path to a 
YAML config file specifying the "
+                + "source's configuration")
+        protected String sourceConfigFile;
+
+        protected SourceConfig sourceConfig;
+
+        @Override
+        void processArguments() throws Exception {
+            super.processArguments();
+
+            if (null != sourceConfigFile) {
+                this.sourceConfig = loadSourceConfig(sourceConfigFile);
+            } else {
+                this.sourceConfig = new SourceConfig();
+            }
+
+            if (null != tenant) {
+                sourceConfig.setTenant(tenant);
+            }
+            if (null != namespace) {
+                sourceConfig.setNamespace(namespace);
+            }
+            if (null != name) {
+                sourceConfig.setName(name);
+            }
+
+            if (null != className) {
+                this.sourceConfig.setClassName(className);
+            }
+            if (null != destinationTopicName) {
+                sourceConfig.setTopicName(destinationTopicName);
+            }
+            if (null != deserializationClassName) {
+                sourceConfig.setSerdeClassName(deserializationClassName);
+            }
+            if (null != processingGuarantees) {
+                sourceConfig.setProcessingGuarantees(processingGuarantees);
+            }
+            if (parallelism == null) {
+                if (sourceConfig.getParallelism() == 0) {
+                    sourceConfig.setParallelism(1);
+                }
+            } else {
+                int num = Integer.parseInt(parallelism);
+                if (num <= 0) {
+                    throw new IllegalArgumentException("The parallelism factor 
(the number of instances) for the "
+                            + "connector must be positive");
+                }
+                sourceConfig.setParallelism(num);
+            }
+
+            if (null == jarFile) {
+                throw new IllegalArgumentException("Connector JAR not 
specfied");
+            }
+        }
+
+        @Override
+        void runCmd() throws Exception {
+            if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
+                throw new RuntimeException("Missing arguments");
+            }
+            admin.functions().createFunction(createSourceConfig(sourceConfig), 
jarFile);
+            print("Created successfully");
+        }
+
+        private Class<?> getSourceType(File file) {
+            if (!Reflections.classExistsInJar(file, 
sourceConfig.getClassName())) {
+                throw new IllegalArgumentException(String.format("Pulsar 
Source class %s does not exist in jar %s",
+                        sourceConfig.getClassName(), jarFile));
+            } else if (!Reflections.classInJarImplementsIface(file, 
sourceConfig.getClassName(), Source.class)) {
+                throw new IllegalArgumentException(String.format("The Pulsar 
source class %s in jar %s implements does not implement " + 
Source.class.getName(),
+                        sourceConfig.getClassName(), jarFile));
+            }
+
+            Object userClass = 
Reflections.createInstance(sourceConfig.getClassName(), file);
+            Class<?> typeArg;
+            Source source = (Source) userClass;
+            if (source == null) {
+                throw new IllegalArgumentException(String.format("The Pulsar 
source class %s could not be instantiated from jar %s",
+                        sourceConfig.getClassName(), jarFile));
+            }
+            typeArg = TypeResolver.resolveRawArgument(Source.class, 
source.getClass());
+
+            return typeArg;
+        }
+
+        protected org.apache.pulsar.functions.proto.Function.FunctionDetails 
createSourceConfigProto2(SourceConfig sourceConfig)
+                throws IOException {
+            org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder 
functionDetailsBuilder
+                    = 
org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
+            
Utils.mergeJson(FunctionsImpl.printJson(createSourceConfig(sourceConfig)), 
functionDetailsBuilder);
+            return functionDetailsBuilder.build();
+        }
+
+        protected FunctionDetails createSourceConfig(SourceConfig 
sourceConfig) {
+
+            File file = new File(jarFile);
+            try {
+                Reflections.loadJar(file);
+            } catch (MalformedURLException e) {
+                throw new RuntimeException("Failed to load user jar " + file, 
e);
+            }
+            Class<?> typeArg = getSourceType(file);
+
+            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
+            if (sourceConfig.getTenant() != null) {
+                functionDetailsBuilder.setTenant(sourceConfig.getTenant());
+            }
+            if (sourceConfig.getNamespace() != null) {
+                
functionDetailsBuilder.setNamespace(sourceConfig.getNamespace());
+            }
+            if (sourceConfig.getName() != null) {
+                functionDetailsBuilder.setName(sourceConfig.getName());
+            }
+            functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+            
functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
+            
functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+            if (sourceConfig.getProcessingGuarantees() != null) {
+                functionDetailsBuilder.setProcessingGuarantees(
+                        
convertProcessingGuarantee(sourceConfig.getProcessingGuarantees()));
+            }
+
+            // set source spec
+            SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
+            sourceSpecBuilder.setClassName(sourceConfig.getClassName());
+            sourceSpecBuilder.setConfigs(new 
Gson().toJson(sourceConfig.getConfigs()));
+            sourceSpecBuilder.setTypeClassName(typeArg.getName());
+            functionDetailsBuilder.setSource(sourceSpecBuilder);
+
+            // set up sink spec
+            SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
+            sinkSpecBuilder.setClassName(PulsarSink.class.getName());
+            if (sourceConfig.getSerdeClassName() != null && 
!sourceConfig.getSerdeClassName().isEmpty()) {
+                
sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName());
+            }
+            sinkSpecBuilder.setTopic(sourceConfig.getTopicName());
+            sinkSpecBuilder.setTypeClassName(typeArg.getName());
+
+            functionDetailsBuilder.setSink(sinkSpecBuilder);
+            return functionDetailsBuilder.build();
+        }
+    }
+
+    @Parameters(commandDescription = "Stops a Pulsar source")
+    class DeleteSource extends BaseCommand {
+
+        @Parameter(names = "--tenant", description = "The tenant of a sink or 
source")
+        protected String tenant;
+
+        @Parameter(names = "--namespace", description = "The namespace of a 
sink or source")
+        protected String namespace;
+
+        @Parameter(names = "--name", description = "The name of a sink or 
source")
+        protected String name;
+
+        @Override
+        void processArguments() throws Exception {
+            super.processArguments();
+            if (null == tenant || null == namespace || null == name) {
+                throw new RuntimeException(
+                        "You must specify a tenant, namespace, and name for 
the source");
+            }
+        }
+
+        @Override
+        void runCmd() throws Exception {
+            admin.functions().deleteFunction(tenant, namespace, name);
+            print("Delete source successfully");
+        }
+    }
+
+    private static SourceConfig loadSourceConfig(String file) throws 
IOException {
+        return (SourceConfig) loadConfig(file, SourceConfig.class);
+    }
+
+    private static Object loadConfig(String file, Class<?> clazz) throws 
IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(file), clazz);
+    }
+
+    public static boolean areAllRequiredFieldsPresentForSource(SourceConfig 
sourceConfig) {
+        return sourceConfig.getTenant() != null && 
!sourceConfig.getTenant().isEmpty()
+                && sourceConfig.getNamespace() != null && 
!sourceConfig.getNamespace().isEmpty()
+                && sourceConfig.getName() != null && 
!sourceConfig.getName().isEmpty()
+                && sourceConfig.getClassName() != null && 
!sourceConfig.getClassName().isEmpty()
+                && sourceConfig.getTopicName() != null && 
!sourceConfig.getTopicName().isEmpty()
+                || sourceConfig.getSerdeClassName() != null
+                && sourceConfig.getParallelism() > 0;
+    }
+
+    private static ProcessingGuarantees convertProcessingGuarantee(
+            FunctionConfig.ProcessingGuarantees processingGuarantees) {
+        for (ProcessingGuarantees type : ProcessingGuarantees.values()) {
+            if (type.name().equals(processingGuarantees.name())) {
+                return type;
+            }
+        }
+        throw new RuntimeException("Unrecognized processing guarantee: " + 
processingGuarantees.name());
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index c0ee2cb1b8..6467072c28 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -96,7 +96,8 @@
 
         commandMap.put("resource-quotas", CmdResourceQuotas.class);
         commandMap.put("functions", CmdFunctions.class);
-        commandMap.put("connectors", CmdConnectors.class);
+        commandMap.put("source", CmdSources.class);
+        commandMap.put("sink", CmdSinks.class);
     }
 
     private void setupCommands(Function<PulsarAdminBuilder, ? extends 
PulsarAdmin> adminFactory) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to