This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch srkukarni/serverside_validation_endpoints
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f8577658f940a582ee59d4e2467f1465a2655de1
Author: Sanjeev Kulkarni <sanj...@streaml.io>
AuthorDate: Fri Oct 5 16:23:50 2018 -0700

    Plugged in source/sink
---
 .../pulsar/functions/utils/SinkConfigUtils.java    |  9 +--
 .../pulsar/functions/utils/SourceConfigUtils.java  | 12 +--
 .../pulsar/functions/utils/io/ConnectorUtils.java  | 84 ++++++++++-----------
 .../functions/utils/validation/ValidatorImpls.java | 85 +++++++++++-----------
 .../org/apache/pulsar/functions/worker/Utils.java  | 28 +++++++
 .../functions/worker/rest/api/FunctionsImpl.java   | 60 +++++++++++++--
 .../worker/rest/api/v2/SinkApiV2Resource.java      |  4 +-
 .../worker/rest/api/v2/SourceApiV2Resource.java    | 14 ++--
 8 files changed, 179 insertions(+), 117 deletions(-)

diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index d44ea30..95803ab 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -37,7 +37,7 @@ import static 
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee
 
 public class SinkConfigUtils {
 
-    public static FunctionDetails convert(SinkConfig sinkConfig) throws 
IOException {
+    public static FunctionDetails convert(SinkConfig sinkConfig, 
NarClassLoader classLoader) throws IOException {
 
         String sinkClassName = null;
         String typeArg = null;
@@ -53,11 +53,8 @@ public class SinkConfigUtils {
                 }
                 sinkClassName = sinkConfig.getClassName(); // server derives 
the arg-type by loading a class
             } else {
-                sinkClassName = 
ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
-                try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(sinkConfig.getArchive()),
-                        Collections.emptySet())) {
-                    typeArg = Utils.getSinkType(sinkClassName, ncl).getName();
-                }
+                sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
+                typeArg = Utils.getSinkType(sinkClassName, 
classLoader).getName();
             }
         }
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 73b331a..a132c8a 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -26,16 +26,14 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
 
 import static 
org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.Utils.getSourceType;
 
 public class SourceConfigUtils {
 
-    public static FunctionDetails convert(SourceConfig sourceConfig)
+    public static FunctionDetails convert(SourceConfig sourceConfig, 
NarClassLoader classLoader)
             throws IllegalArgumentException, IOException {
 
         String sourceClassName = null;
@@ -52,12 +50,8 @@ public class SourceConfigUtils {
                 }
                 sourceClassName = sourceConfig.getClassName(); // server 
derives the arg-type by loading a class
             } else {
-                sourceClassName = 
ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
-
-                try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(sourceConfig.getArchive()),
-                        Collections.emptySet())) {
-                    typeArg = getSourceType(sourceClassName, ncl).getName();
-                }
+                sourceClassName = ConnectorUtils.getIOSourceClass(classLoader);
+                typeArg = getSourceType(sourceClassName, 
classLoader).getName();
             }
         }
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index 915df05..c6feb5a 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -46,59 +46,57 @@ public class ConnectorUtils {
     /**
      * Extract the Pulsar IO Source class from a connector archive.
      */
-    public static String getIOSourceClass(String narPath) throws IOException {
-        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(narPath), Collections.emptySet())) {
-            String configStr = 
ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
-
-            ConnectorDefinition conf = 
ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
-                    ConnectorDefinition.class);
-            if (StringUtils.isEmpty(conf.getSourceClass())) {
-                throw new IOException(
-                        String.format("The '%s' connector does not provide a 
source implementation", conf.getName()));
-            }
+    public static String getIOSourceClass(ClassLoader classLoader) throws 
IOException {
+        NarClassLoader ncl = (NarClassLoader) classLoader;
+        String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+        ConnectorDefinition conf = 
ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
+                ConnectorDefinition.class);
+        if (StringUtils.isEmpty(conf.getSourceClass())) {
+            throw new IOException(
+                    String.format("The '%s' connector does not provide a 
source implementation", conf.getName()));
+        }
 
-            try {
-                // Try to load source class and check it implements Source 
interface
-                Object instance = 
ncl.loadClass(conf.getSourceClass()).newInstance();
-                if (!(instance instanceof Source)) {
-                    throw new IOException("Class " + conf.getSourceClass() + " 
does not implement interface "
-                            + Source.class.getName());
-                }
-            } catch (Throwable t) {
-                Exceptions.rethrowIOException(t);
+        try {
+            // Try to load source class and check it implements Source 
interface
+            Object instance = 
ncl.loadClass(conf.getSourceClass()).newInstance();
+            if (!(instance instanceof Source)) {
+                throw new IOException("Class " + conf.getSourceClass() + " 
does not implement interface "
+                        + Source.class.getName());
             }
-
-            return conf.getSourceClass();
+        } catch (Throwable t) {
+            Exceptions.rethrowIOException(t);
         }
+
+        return conf.getSourceClass();
     }
 
     /**
      * Extract the Pulsar IO Sink class from a connector archive.
      */
-    public static String getIOSinkClass(String narPath) throws IOException {
-        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(narPath), Collections.emptySet())) {
-            String configStr = 
ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+    public static String getIOSinkClass(ClassLoader classLoader) throws 
IOException {
+        NarClassLoader ncl = (NarClassLoader) classLoader;
+        String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+        ConnectorDefinition conf = 
ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
+                ConnectorDefinition.class);
+        if (StringUtils.isEmpty(conf.getSinkClass())) {
+            throw new IOException(
+                    String.format("The '%s' connector does not provide a sink 
implementation", conf.getName()));
+        }
 
-            ConnectorDefinition conf = 
ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
-                    ConnectorDefinition.class);
-            if (StringUtils.isEmpty(conf.getSinkClass())) {
+        try {
+            // Try to load source class and check it implements Sink interface
+            Object instance = ncl.loadClass(conf.getSinkClass()).newInstance();
+            if (!(instance instanceof Sink)) {
                 throw new IOException(
-                        String.format("The '%s' connector does not provide a 
sink implementation", conf.getName()));
-            }
-
-            try {
-                // Try to load source class and check it implements Sink 
interface
-                Object instance = 
ncl.loadClass(conf.getSinkClass()).newInstance();
-                if (!(instance instanceof Sink)) {
-                    throw new IOException(
-                            "Class " + conf.getSinkClass() + " does not 
implement interface " + Sink.class.getName());
-                }
-            } catch (Throwable t) {
-                Exceptions.rethrowIOException(t);
+                        "Class " + conf.getSinkClass() + " does not implement 
interface " + Sink.class.getName());
             }
-
-            return conf.getSinkClass();
+        } catch (Throwable t) {
+            Exceptions.rethrowIOException(t);
         }
+
+        return conf.getSinkClass();
     }
 
     public static ConnectorDefinition getConnectorDefinition(String narPath) 
throws IOException {
@@ -127,14 +125,10 @@ public class ConnectorUtils {
                     log.info("Found connector {} from {}", cntDef, archive);
 
                     if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        // Validate source class to be present and of the 
right type
-                        ConnectorUtils.getIOSourceClass(archive.toString());
                         connectors.sources.put(cntDef.getName(), archive);
                     }
 
                     if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        // Validate sinkclass to be present and of the right 
type
-                        ConnectorUtils.getIOSinkClass(archive.toString());
                         connectors.sinks.put(cntDef.getName(), archive);
                     }
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index dba6bd9..e600afe 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -727,14 +727,15 @@ public class ValidatorImpls {
         @Override
         public void validateField(String name, Object o, ClassLoader 
classLoader) {
             SourceConfig sourceConfig = (SourceConfig) o;
-            if (sourceConfig.getArchive().startsWith(Utils.BUILTIN)) {
-                // We don't have to check the archive, since it's provided on 
the worker itself
+            if (classLoader == null) {
+                // This happens at the cli for builtin. There is no need to 
check this since
+                // the actual check will be done at serverside
                 return;
             }
 
             String sourceClassName;
             try {
-                sourceClassName = 
ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
+                sourceClassName = ConnectorUtils.getIOSourceClass(classLoader);
             } catch (IOException e1) {
                 throw new IllegalArgumentException("Failed to extract source 
class from archive", e1);
             }
@@ -743,15 +744,14 @@ public class ValidatorImpls {
             Class<?> typeArg = getSourceType(sourceClassName, classLoader);
 
             // Only one of serdeClassName or schemaType should be set
-            if (sourceConfig.getSerdeClassName() != null && 
!sourceConfig.getSerdeClassName().isEmpty()
-                    && sourceConfig.getSchemaType() != null && 
!sourceConfig.getSchemaType().isEmpty()) {
+            if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && 
!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
                 throw new IllegalArgumentException("Only one of serdeClassName 
or schemaType should be set");
             }
 
-            if (sourceConfig.getSerdeClassName() != null && 
!sourceConfig.getSerdeClassName().isEmpty()) {
+            if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
                 
FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, 
name, classLoader, false);
             }
-            if (sourceConfig.getSchemaType() != null && 
!sourceConfig.getSchemaType().isEmpty()) {
+            if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
                 
FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, 
name, classLoader, false);
             }
         }
@@ -761,8 +761,9 @@ public class ValidatorImpls {
         @Override
         public void validateField(String name, Object o, ClassLoader 
classLoader) {
             SinkConfig sinkConfig = (SinkConfig) o;
-            if (sinkConfig.getArchive().startsWith(Utils.BUILTIN)) {
-                // We don't have to check the archive, since it's provided on 
the worker itself
+            if (classLoader == null) {
+                // This happens at the cli for builtin. There is no need to 
check this since
+                // the actual check will be done at serverside
                 return;
             }
 
@@ -779,42 +780,42 @@ public class ValidatorImpls {
             }
 
 
-            try (NarClassLoader clsLoader = NarClassLoader.getFromArchive(new 
File(sinkConfig.getArchive()),
-                    Collections.emptySet())) {
-                String sinkClassName = 
ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
-                Class<?> typeArg = getSinkType(sinkClassName, clsLoader);
+            String sinkClassName;
+            try {
+                sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
+            } catch (IOException e1) {
+                throw new IllegalArgumentException("Failed to extract sink 
class from archive", e1);
+            }
+            Class<?> typeArg = getSinkType(sinkClassName, classLoader);
 
-                if (sinkConfig.getTopicToSerdeClassName() != null) {
-                    sinkConfig.getTopicToSerdeClassName().forEach((topicName, 
serdeClassName) -> {
-                        FunctionConfigValidator.validateSerde(serdeClassName, 
typeArg, name, clsLoader, true);
-                    });
-                }
+            if (sinkConfig.getTopicToSerdeClassName() != null) {
+                sinkConfig.getTopicToSerdeClassName().forEach((topicName, 
serdeClassName) -> {
+                    FunctionConfigValidator.validateSerde(serdeClassName, 
typeArg, name, classLoader, true);
+                });
+            }
 
-                if (sinkConfig.getTopicToSchemaType() != null) {
-                    sinkConfig.getTopicToSchemaType().forEach((topicName, 
schemaType) -> {
-                        FunctionConfigValidator.validateSchema(schemaType, 
typeArg, name, clsLoader, true);
-                    });
-                }
+            if (sinkConfig.getTopicToSchemaType() != null) {
+                sinkConfig.getTopicToSchemaType().forEach((topicName, 
schemaType) -> {
+                    FunctionConfigValidator.validateSchema(schemaType, 
typeArg, name, classLoader, true);
+                });
+            }
 
-                // topicsPattern does not need checks
-
-                if (sinkConfig.getInputSpecs() != null) {
-                    sinkConfig.getInputSpecs().forEach((topicName, 
consumerSpec) -> {
-                        // Only one is set
-                        if (consumerSpec.getSerdeClassName() != null && 
!consumerSpec.getSerdeClassName().isEmpty()
-                                && consumerSpec.getSchemaType() != null && 
!consumerSpec.getSchemaType().isEmpty()) {
-                            throw new IllegalArgumentException("Only one of 
serdeClassName or schemaType should be set");
-                        }
-                        if (consumerSpec.getSerdeClassName() != null && 
!consumerSpec.getSerdeClassName().isEmpty()) {
-                            
FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), 
typeArg, name, clsLoader, true);
-                        }
-                        if (consumerSpec.getSchemaType() != null && 
!consumerSpec.getSchemaType().isEmpty()) {
-                            
FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, 
name, clsLoader, true);
-                        }
-                    });
-                }
-            } catch (IOException e) {
-                throw new IllegalArgumentException(e.getMessage());
+            // topicsPattern does not need checks
+
+            if (sinkConfig.getInputSpecs() != null) {
+                sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) 
-> {
+                    // Only one is set
+                    if (consumerSpec.getSerdeClassName() != null && 
!consumerSpec.getSerdeClassName().isEmpty()
+                            && consumerSpec.getSchemaType() != null && 
!consumerSpec.getSchemaType().isEmpty()) {
+                        throw new IllegalArgumentException("Only one of 
serdeClassName or schemaType should be set");
+                    }
+                    if (consumerSpec.getSerdeClassName() != null && 
!consumerSpec.getSerdeClassName().isEmpty()) {
+                        
FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), 
typeArg, name, classLoader, true);
+                    }
+                    if (consumerSpec.getSchemaType() != null && 
!consumerSpec.getSchemaType().isEmpty()) {
+                        
FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, 
name, classLoader, true);
+                    }
+                });
             }
         }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index aa80403..f935991 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -33,6 +33,7 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
 import java.util.UUID;
 import lombok.extern.slf4j.Slf4j;
 
@@ -47,6 +48,7 @@ import org.apache.distributedlog.metadata.DLMetadata;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.worker.dlog.DLInputStream;
 import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
@@ -163,6 +165,32 @@ public final class Utils {
             throw new IllegalArgumentException("Unsupported url protocol "+ 
destPkgUrl +", supported url protocols: [file/http/https]");
         }
     }
+
+    public static NarClassLoader extractNarClassloader(String destPkgUrl, 
String downloadPkgDir) throws IOException, URISyntaxException {
+        if (destPkgUrl.startsWith(FILE)) {
+            URL url = new URL(destPkgUrl);
+            File file = new File(url.toURI());
+            if (!file.exists()) {
+                throw new IllegalArgumentException(destPkgUrl + " does not 
exists locally");
+            }
+            try {
+                return NarClassLoader.getFromArchive(file, 
Collections.emptySet());
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException(
+                        "Corrupt User PackageFile " + file + " with error " + 
e.getMessage());
+            }
+        } else if (destPkgUrl.startsWith("http")) {
+            URL website = new URL(destPkgUrl);
+            File tempFile = new File(downloadPkgDir, website.getHost() + 
UUID.randomUUID().toString());
+            if (!tempFile.exists()) {
+                throw new IllegalArgumentException("Could not create local 
file " + tempFile);
+            }
+            tempFile.deleteOnExit();
+            return NarClassLoader.getFromArchive(tempFile, 
Collections.emptySet());
+        } else {
+            throw new IllegalArgumentException("Unsupported url protocol "+ 
destPkgUrl +", supported url protocols: [file/http/https]");
+        }
+    }
     
     public static void downloadFromHttpUrl(String destPkgUrl, FileOutputStream 
outputStream) throws IOException {
         URL website = new URL(destPkgUrl);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 0d30e37..08cd7de 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -39,10 +39,8 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Files;
-import java.util.Base64;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
+import java.nio.file.Path;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -75,6 +73,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
@@ -1007,13 +1006,13 @@ public class FunctionsImpl {
         }
         if (!StringUtils.isEmpty(sourceConfigJson)) {
             SourceConfig sourceConfig = new Gson().fromJson(sourceConfigJson, 
SourceConfig.class);
-            ClassLoader clsLoader = extractClassLoader(functionPkgUrl, 
uploadedInputStreamAsFile);
+            NarClassLoader clsLoader = 
extractNarClassLoader(sourceConfig.getArchive(), functionPkgUrl, 
uploadedInputStreamAsFile, true);
             ConfigValidation.validateConfig(sourceConfig, 
FunctionConfig.Runtime.JAVA.name(), clsLoader);
             return SourceConfigUtils.convert(sourceConfig, clsLoader);
         }
         if (!StringUtils.isEmpty(sinkConfigJson)) {
             SinkConfig sinkConfig = new Gson().fromJson(sinkConfigJson, 
SinkConfig.class);
-            ClassLoader clsLoader = extractClassLoader(functionPkgUrl, 
uploadedInputStreamAsFile);
+            NarClassLoader clsLoader = 
extractNarClassLoader(sinkConfig.getArchive(), functionPkgUrl, 
uploadedInputStreamAsFile, false);
             ConfigValidation.validateConfig(sinkConfig, 
FunctionConfig.Runtime.JAVA.name(), clsLoader);
             return SinkConfigUtils.convert(sinkConfig, clsLoader);
         }
@@ -1076,6 +1075,55 @@ public class FunctionsImpl {
         }
     }
 
+    private NarClassLoader extractNarClassLoader(String archive, String 
pkgUrl, File uploadedInputStreamFileName,
+                                                 boolean isSource) {
+        if (!StringUtils.isEmpty(archive)) {
+            if (isSource) {
+                Path path;
+                try {
+                    path = 
this.worker().getConnectorsManager().getSourceArchive(archive);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(String.format("No 
Source archive %s found", archive));
+                }
+                try {
+                    return NarClassLoader.getFromArchive(path.toFile(),
+                            Collections.emptySet());
+                } catch (IOException e) {
+                    throw new IllegalArgumentException(String.format("The 
source %s is corrupted", archive));
+                }
+            } else {
+                Path path;
+                try {
+                    path = 
this.worker().getConnectorsManager().getSinkArchive(archive);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(String.format("No Sink 
archive %s found", archive));
+                }
+                try {
+                    return NarClassLoader.getFromArchive(path.toFile(),
+                            Collections.emptySet());
+                } catch (IOException e) {
+                    throw new IllegalArgumentException(String.format("The sink 
%s is corrupted", archive));
+                }
+            }
+        }
+        if (!StringUtils.isEmpty(pkgUrl)) {
+            try {
+                return Utils.extractNarClassloader(pkgUrl, 
workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory());
+            } catch (Exception e) {
+                throw new IllegalArgumentException(e.getMessage());
+            }
+        }
+        if (uploadedInputStreamFileName != null) {
+            try {
+                return 
NarClassLoader.getFromArchive(uploadedInputStreamFileName,
+                        Collections.emptySet());
+            } catch (IOException e) {
+                throw new IllegalArgumentException(e.getMessage());
+            }
+        }
+        return null;
+    }
+
     private void validateFunctionClassTypes(ClassLoader classLoader, 
FunctionDetails.Builder functionDetailsBuilder) {
 
         // validate only if classLoader is provided
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
index 175e2eb..e36b069 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -48,11 +48,11 @@ public class SinkApiV2Resource extends FunctionApiResource {
                                  final @PathParam("sinkName") String sinkName,
                                  final @FormDataParam("data") InputStream 
uploadedInputStream,
                                  final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
-                                 final @FormDataParam("url") String 
sourcePkgUrl,
+                                 final @FormDataParam("url") String 
functionPkgUrl,
                                  final @FormDataParam("sinkConfig") String 
sinkConfigJson) {
 
         return functions.registerFunction(tenant, namespace, sinkName, 
uploadedInputStream, fileDetail,
-                sourcePkgUrl, null, null, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, null, null, sinkConfigJson, 
clientAppId());
 
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
index 9d25bf9..cfb6baa 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -44,15 +44,15 @@ public class SourceApiV2Resource extends 
FunctionApiResource {
     @Path("/{tenant}/{namespace}/{sourceName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public Response registerSource(final @PathParam("tenant") String tenant,
-                                     final @PathParam("namespace") String 
namespace,
-                                     final @PathParam("sourceName") String 
sourceName,
-                                     final @FormDataParam("data") InputStream 
uploadedInputStream,
-                                     final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
-                                     final @FormDataParam("url") String 
sourcePkgUrl,
-                                     final @FormDataParam("sourceConfig") 
String sourceConfigJson) {
+                                   final @PathParam("namespace") String 
namespace,
+                                   final @PathParam("sourceName") String 
sourceName,
+                                   final @FormDataParam("data") InputStream 
uploadedInputStream,
+                                   final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
+                                   final @FormDataParam("url") String 
functionPkgUrl,
+                                   final @FormDataParam("sourceConfig") String 
sourceConfigJson) {
 
         return functions.registerFunction(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
-                sourcePkgUrl, null, null, sourceConfigJson, null, 
clientAppId());
+                functionPkgUrl, null, null, sourceConfigJson, null, 
clientAppId());
 
     }
 

Reply via email to