This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch srkukarni/serverside_validation_endpoints in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f8577658f940a582ee59d4e2467f1465a2655de1 Author: Sanjeev Kulkarni <sanj...@streaml.io> AuthorDate: Fri Oct 5 16:23:50 2018 -0700 Plugged in source/sink --- .../pulsar/functions/utils/SinkConfigUtils.java | 9 +-- .../pulsar/functions/utils/SourceConfigUtils.java | 12 +-- .../pulsar/functions/utils/io/ConnectorUtils.java | 84 ++++++++++----------- .../functions/utils/validation/ValidatorImpls.java | 85 +++++++++++----------- .../org/apache/pulsar/functions/worker/Utils.java | 28 +++++++ .../functions/worker/rest/api/FunctionsImpl.java | 60 +++++++++++++-- .../worker/rest/api/v2/SinkApiV2Resource.java | 4 +- .../worker/rest/api/v2/SourceApiV2Resource.java | 14 ++-- 8 files changed, 179 insertions(+), 117 deletions(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index d44ea30..95803ab 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -37,7 +37,7 @@ import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee public class SinkConfigUtils { - public static FunctionDetails convert(SinkConfig sinkConfig) throws IOException { + public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader classLoader) throws IOException { String sinkClassName = null; String typeArg = null; @@ -53,11 +53,8 @@ public class SinkConfigUtils { } sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class } else { - sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive()); - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()), - Collections.emptySet())) { - typeArg = Utils.getSinkType(sinkClassName, ncl).getName(); - } + sinkClassName = ConnectorUtils.getIOSinkClass(classLoader); + typeArg = Utils.getSinkType(sinkClassName, classLoader).getName(); } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index 73b331a..a132c8a 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -26,16 +26,14 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.utils.io.ConnectorUtils; -import java.io.File; import java.io.IOException; -import java.util.Collections; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.Utils.getSourceType; public class SourceConfigUtils { - public static FunctionDetails convert(SourceConfig sourceConfig) + public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader classLoader) throws IllegalArgumentException, IOException { String sourceClassName = null; @@ -52,12 +50,8 @@ public class SourceConfigUtils { } sourceClassName = sourceConfig.getClassName(); // server derives the arg-type by loading a class } else { - sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive()); - - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()), - Collections.emptySet())) { - typeArg = getSourceType(sourceClassName, ncl).getName(); - } + sourceClassName = ConnectorUtils.getIOSourceClass(classLoader); + typeArg = getSourceType(sourceClassName, classLoader).getName(); } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java index 915df05..c6feb5a 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java @@ -46,59 +46,57 @@ public class ConnectorUtils { /** * Extract the Pulsar IO Source class from a connector archive. */ - public static String getIOSourceClass(String narPath) throws IOException { - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) { - String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME); - - ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, - ConnectorDefinition.class); - if (StringUtils.isEmpty(conf.getSourceClass())) { - throw new IOException( - String.format("The '%s' connector does not provide a source implementation", conf.getName())); - } + public static String getIOSourceClass(ClassLoader classLoader) throws IOException { + NarClassLoader ncl = (NarClassLoader) classLoader; + String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME); + + ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, + ConnectorDefinition.class); + if (StringUtils.isEmpty(conf.getSourceClass())) { + throw new IOException( + String.format("The '%s' connector does not provide a source implementation", conf.getName())); + } - try { - // Try to load source class and check it implements Source interface - Object instance = ncl.loadClass(conf.getSourceClass()).newInstance(); - if (!(instance instanceof Source)) { - throw new IOException("Class " + conf.getSourceClass() + " does not implement interface " - + Source.class.getName()); - } - } catch (Throwable t) { - Exceptions.rethrowIOException(t); + try { + // Try to load source class and check it implements Source interface + Object instance = ncl.loadClass(conf.getSourceClass()).newInstance(); + if (!(instance instanceof Source)) { + throw new IOException("Class " + conf.getSourceClass() + " does not implement interface " + + Source.class.getName()); } - - return conf.getSourceClass(); + } catch (Throwable t) { + Exceptions.rethrowIOException(t); } + + return conf.getSourceClass(); } /** * Extract the Pulsar IO Sink class from a connector archive. */ - public static String getIOSinkClass(String narPath) throws IOException { - try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) { - String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME); + public static String getIOSinkClass(ClassLoader classLoader) throws IOException { + NarClassLoader ncl = (NarClassLoader) classLoader; + String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME); + + ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, + ConnectorDefinition.class); + if (StringUtils.isEmpty(conf.getSinkClass())) { + throw new IOException( + String.format("The '%s' connector does not provide a sink implementation", conf.getName())); + } - ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, - ConnectorDefinition.class); - if (StringUtils.isEmpty(conf.getSinkClass())) { + try { + // Try to load source class and check it implements Sink interface + Object instance = ncl.loadClass(conf.getSinkClass()).newInstance(); + if (!(instance instanceof Sink)) { throw new IOException( - String.format("The '%s' connector does not provide a sink implementation", conf.getName())); - } - - try { - // Try to load source class and check it implements Sink interface - Object instance = ncl.loadClass(conf.getSinkClass()).newInstance(); - if (!(instance instanceof Sink)) { - throw new IOException( - "Class " + conf.getSinkClass() + " does not implement interface " + Sink.class.getName()); - } - } catch (Throwable t) { - Exceptions.rethrowIOException(t); + "Class " + conf.getSinkClass() + " does not implement interface " + Sink.class.getName()); } - - return conf.getSinkClass(); + } catch (Throwable t) { + Exceptions.rethrowIOException(t); } + + return conf.getSinkClass(); } public static ConnectorDefinition getConnectorDefinition(String narPath) throws IOException { @@ -127,14 +125,10 @@ public class ConnectorUtils { log.info("Found connector {} from {}", cntDef, archive); if (!StringUtils.isEmpty(cntDef.getSourceClass())) { - // Validate source class to be present and of the right type - ConnectorUtils.getIOSourceClass(archive.toString()); connectors.sources.put(cntDef.getName(), archive); } if (!StringUtils.isEmpty(cntDef.getSinkClass())) { - // Validate sinkclass to be present and of the right type - ConnectorUtils.getIOSinkClass(archive.toString()); connectors.sinks.put(cntDef.getName(), archive); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java index dba6bd9..e600afe 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java @@ -727,14 +727,15 @@ public class ValidatorImpls { @Override public void validateField(String name, Object o, ClassLoader classLoader) { SourceConfig sourceConfig = (SourceConfig) o; - if (sourceConfig.getArchive().startsWith(Utils.BUILTIN)) { - // We don't have to check the archive, since it's provided on the worker itself + if (classLoader == null) { + // This happens at the cli for builtin. There is no need to check this since + // the actual check will be done at serverside return; } String sourceClassName; try { - sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive()); + sourceClassName = ConnectorUtils.getIOSourceClass(classLoader); } catch (IOException e1) { throw new IllegalArgumentException("Failed to extract source class from archive", e1); } @@ -743,15 +744,14 @@ public class ValidatorImpls { Class<?> typeArg = getSourceType(sourceClassName, classLoader); // Only one of serdeClassName or schemaType should be set - if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty() - && sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) { + if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) { throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set"); } - if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) { + if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) { FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, classLoader, false); } - if (sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) { + if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) { FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, classLoader, false); } } @@ -761,8 +761,9 @@ public class ValidatorImpls { @Override public void validateField(String name, Object o, ClassLoader classLoader) { SinkConfig sinkConfig = (SinkConfig) o; - if (sinkConfig.getArchive().startsWith(Utils.BUILTIN)) { - // We don't have to check the archive, since it's provided on the worker itself + if (classLoader == null) { + // This happens at the cli for builtin. There is no need to check this since + // the actual check will be done at serverside return; } @@ -779,42 +780,42 @@ public class ValidatorImpls { } - try (NarClassLoader clsLoader = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()), - Collections.emptySet())) { - String sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive()); - Class<?> typeArg = getSinkType(sinkClassName, clsLoader); + String sinkClassName; + try { + sinkClassName = ConnectorUtils.getIOSinkClass(classLoader); + } catch (IOException e1) { + throw new IllegalArgumentException("Failed to extract sink class from archive", e1); + } + Class<?> typeArg = getSinkType(sinkClassName, classLoader); - if (sinkConfig.getTopicToSerdeClassName() != null) { - sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> { - FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, clsLoader, true); - }); - } + if (sinkConfig.getTopicToSerdeClassName() != null) { + sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> { + FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, classLoader, true); + }); + } - if (sinkConfig.getTopicToSchemaType() != null) { - sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> { - FunctionConfigValidator.validateSchema(schemaType, typeArg, name, clsLoader, true); - }); - } + if (sinkConfig.getTopicToSchemaType() != null) { + sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> { + FunctionConfigValidator.validateSchema(schemaType, typeArg, name, classLoader, true); + }); + } - // topicsPattern does not need checks - - if (sinkConfig.getInputSpecs() != null) { - sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> { - // Only one is set - if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty() - && consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) { - throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set"); - } - if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) { - FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, clsLoader, true); - } - if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) { - FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, clsLoader, true); - } - }); - } - } catch (IOException e) { - throw new IllegalArgumentException(e.getMessage()); + // topicsPattern does not need checks + + if (sinkConfig.getInputSpecs() != null) { + sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> { + // Only one is set + if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty() + && consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) { + throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set"); + } + if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) { + FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, classLoader, true); + } + if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) { + FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, classLoader, true); + } + }); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index aa80403..f935991 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -33,6 +33,7 @@ import java.net.URISyntaxException; import java.net.URL; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; +import java.util.Collections; import java.util.UUID; import lombok.extern.slf4j.Slf4j; @@ -47,6 +48,7 @@ import org.apache.distributedlog.metadata.DLMetadata; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.worker.dlog.DLInputStream; import org.apache.pulsar.functions.worker.dlog.DLOutputStream; @@ -163,6 +165,32 @@ public final class Utils { throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]"); } } + + public static NarClassLoader extractNarClassloader(String destPkgUrl, String downloadPkgDir) throws IOException, URISyntaxException { + if (destPkgUrl.startsWith(FILE)) { + URL url = new URL(destPkgUrl); + File file = new File(url.toURI()); + if (!file.exists()) { + throw new IllegalArgumentException(destPkgUrl + " does not exists locally"); + } + try { + return NarClassLoader.getFromArchive(file, Collections.emptySet()); + } catch (MalformedURLException e) { + throw new IllegalArgumentException( + "Corrupt User PackageFile " + file + " with error " + e.getMessage()); + } + } else if (destPkgUrl.startsWith("http")) { + URL website = new URL(destPkgUrl); + File tempFile = new File(downloadPkgDir, website.getHost() + UUID.randomUUID().toString()); + if (!tempFile.exists()) { + throw new IllegalArgumentException("Could not create local file " + tempFile); + } + tempFile.deleteOnExit(); + return NarClassLoader.getFromArchive(tempFile, Collections.emptySet()); + } else { + throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]"); + } + } public static void downloadFromHttpUrl(String destPkgUrl, FileOutputStream outputStream) throws IOException { URL website = new URL(destPkgUrl); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 0d30e37..08cd7de 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -39,10 +39,8 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Files; -import java.util.Base64; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; +import java.nio.file.Path; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -75,6 +73,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.Codec; @@ -1007,13 +1006,13 @@ public class FunctionsImpl { } if (!StringUtils.isEmpty(sourceConfigJson)) { SourceConfig sourceConfig = new Gson().fromJson(sourceConfigJson, SourceConfig.class); - ClassLoader clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile); + NarClassLoader clsLoader = extractNarClassLoader(sourceConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, true); ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader); return SourceConfigUtils.convert(sourceConfig, clsLoader); } if (!StringUtils.isEmpty(sinkConfigJson)) { SinkConfig sinkConfig = new Gson().fromJson(sinkConfigJson, SinkConfig.class); - ClassLoader clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile); + NarClassLoader clsLoader = extractNarClassLoader(sinkConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, false); ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader); return SinkConfigUtils.convert(sinkConfig, clsLoader); } @@ -1076,6 +1075,55 @@ public class FunctionsImpl { } } + private NarClassLoader extractNarClassLoader(String archive, String pkgUrl, File uploadedInputStreamFileName, + boolean isSource) { + if (!StringUtils.isEmpty(archive)) { + if (isSource) { + Path path; + try { + path = this.worker().getConnectorsManager().getSourceArchive(archive); + } catch (Exception e) { + throw new IllegalArgumentException(String.format("No Source archive %s found", archive)); + } + try { + return NarClassLoader.getFromArchive(path.toFile(), + Collections.emptySet()); + } catch (IOException e) { + throw new IllegalArgumentException(String.format("The source %s is corrupted", archive)); + } + } else { + Path path; + try { + path = this.worker().getConnectorsManager().getSinkArchive(archive); + } catch (Exception e) { + throw new IllegalArgumentException(String.format("No Sink archive %s found", archive)); + } + try { + return NarClassLoader.getFromArchive(path.toFile(), + Collections.emptySet()); + } catch (IOException e) { + throw new IllegalArgumentException(String.format("The sink %s is corrupted", archive)); + } + } + } + if (!StringUtils.isEmpty(pkgUrl)) { + try { + return Utils.extractNarClassloader(pkgUrl, workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory()); + } catch (Exception e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + if (uploadedInputStreamFileName != null) { + try { + return NarClassLoader.getFromArchive(uploadedInputStreamFileName, + Collections.emptySet()); + } catch (IOException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + return null; + } + private void validateFunctionClassTypes(ClassLoader classLoader, FunctionDetails.Builder functionDetailsBuilder) { // validate only if classLoader is provided diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java index 175e2eb..e36b069 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java @@ -48,11 +48,11 @@ public class SinkApiV2Resource extends FunctionApiResource { final @PathParam("sinkName") String sinkName, final @FormDataParam("data") InputStream uploadedInputStream, final @FormDataParam("data") FormDataContentDisposition fileDetail, - final @FormDataParam("url") String sourcePkgUrl, + final @FormDataParam("url") String functionPkgUrl, final @FormDataParam("sinkConfig") String sinkConfigJson) { return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail, - sourcePkgUrl, null, null, null, sinkConfigJson, clientAppId()); + functionPkgUrl, null, null, null, sinkConfigJson, clientAppId()); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java index 9d25bf9..cfb6baa 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java @@ -44,15 +44,15 @@ public class SourceApiV2Resource extends FunctionApiResource { @Path("/{tenant}/{namespace}/{sourceName}") @Consumes(MediaType.MULTIPART_FORM_DATA) public Response registerSource(final @PathParam("tenant") String tenant, - final @PathParam("namespace") String namespace, - final @PathParam("sourceName") String sourceName, - final @FormDataParam("data") InputStream uploadedInputStream, - final @FormDataParam("data") FormDataContentDisposition fileDetail, - final @FormDataParam("url") String sourcePkgUrl, - final @FormDataParam("sourceConfig") String sourceConfigJson) { + final @PathParam("namespace") String namespace, + final @PathParam("sourceName") String sourceName, + final @FormDataParam("data") InputStream uploadedInputStream, + final @FormDataParam("data") FormDataContentDisposition fileDetail, + final @FormDataParam("url") String functionPkgUrl, + final @FormDataParam("sourceConfig") String sourceConfigJson) { return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail, - sourcePkgUrl, null, null, sourceConfigJson, null, clientAppId()); + functionPkgUrl, null, null, sourceConfigJson, null, clientAppId()); }