This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f3095d8 Classloader choice for validating Source/Sink (#3865) f3095d8 is described below commit f3095d8697ccbe62e4676f94e671047a82bebe40 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu Mar 28 21:22:37 2019 -0700 Classloader choice for validating Source/Sink (#3865) * Try both regular classloader as well as nar class loader for validating source/sinks * Fixed test * Fix unittest * Added more comments to the code * rename variables * Wait for the create to succeed before updating. Otherwise there might be some remnant producers --- .../apache/pulsar/io/PulsarFunctionE2ETest.java | 8 +++ .../pulsar/functions/utils/SinkConfigUtils.java | 34 +++++++++-- .../pulsar/functions/utils/SourceConfigUtils.java | 32 ++++++++-- .../functions/worker/rest/api/ComponentImpl.java | 3 + .../worker/rest/api/v3/SinkApiV3ResourceTest.java | 2 +- .../rest/api/v3/SourceApiV3ResourceTest.java | 2 +- .../integration/functions/PulsarFunctionsTest.java | 68 ++++++++++++++++++++++ 7 files changed, 137 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 422e9ed..042a952 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -599,6 +599,14 @@ public class PulsarFunctionE2ETest { SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, functionName, sinkTopic); admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl); + retryStrategically((test) -> { + try { + return (admin.topics().getStats(sinkTopic).publishers.size() == 1); + } catch (PulsarAdminException e) { + return false; + } + }, 10, 150); + admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl); retryStrategically((test) -> { diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index 42538a2..81554c4 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -24,6 +24,7 @@ import com.google.gson.reflect.TypeToken; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; @@ -46,6 +47,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.Utils.getSinkType; +@Slf4j public class SinkConfigUtils { @Getter @@ -296,14 +298,37 @@ public class SinkConfigUtils { } String sinkClassName; - ClassLoader classLoader; + final Class<?> typeArg; + final ClassLoader classLoader; if (!isEmpty(sinkConfig.getClassName())) { sinkClassName = sinkConfig.getClassName(); + // We really don't know if we should use nar class loader or regular classloader + ClassLoader jarClassLoader = null; + ClassLoader narClassLoader = null; try { - classLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile); + jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile); } catch (Exception e) { - throw new IllegalArgumentException("Invalid Sink Jar"); } + try { + narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile); + } catch (Exception e) { + } + if (jarClassLoader == null && narClassLoader == null) { + throw new IllegalArgumentException("Invalid Sink Package"); + } + // We use 
typeArg and classLoader as arguments for lambda functions that require them to be final + // Thus we use these tmp vars + Class<?> tmptypeArg; + ClassLoader tmpclassLoader; + try { + tmptypeArg = getSinkType(sinkClassName, narClassLoader); + tmpclassLoader = narClassLoader; + } catch (Exception e) { + tmptypeArg = getSinkType(sinkClassName, jarClassLoader); + tmpclassLoader = jarClassLoader; + } + typeArg = tmptypeArg; + classLoader = tmpclassLoader; } else if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { throw new IllegalArgumentException("Class-name must be present for archive with file-url"); } else { @@ -316,10 +341,9 @@ public class SinkConfigUtils { } catch (IOException e1) { throw new IllegalArgumentException("Failed to extract sink class from archive", e1); } + typeArg = getSinkType(sinkClassName, classLoader); } - Class<?> typeArg = getSinkType(sinkClassName, classLoader); - if (sinkConfig.getTopicToSerdeClassName() != null) { sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> { ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index f721112..9a32085 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -209,14 +209,37 @@ public class SourceConfigUtils { } String sourceClassName; - ClassLoader classLoader; + final Class<?> typeArg; + final ClassLoader classLoader; if (!isEmpty(sourceConfig.getClassName())) { sourceClassName = sourceConfig.getClassName(); + // We really don't know if we should use nar class loader or regular classloader + 
ClassLoader jarClassLoader = null; + ClassLoader narClassLoader = null; try { - classLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile); + jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile); } catch (Exception e) { - throw new IllegalArgumentException("Invalid Source Jar"); } + try { + narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile); + } catch (Exception e) { + } + if (jarClassLoader == null && narClassLoader == null) { + throw new IllegalArgumentException("Invalid Source Package"); + } + // We use typeArg and classLoader as arguments for lambda functions that require them to be final + // Thus we use these tmp vars + Class<?> tmptypeArg; + ClassLoader tmpclassLoader; + try { + tmptypeArg = getSourceType(sourceClassName, narClassLoader); + tmpclassLoader = narClassLoader; + } catch (Exception e) { + tmptypeArg = getSourceType(sourceClassName, jarClassLoader); + tmpclassLoader = jarClassLoader; + } + typeArg = tmptypeArg; + classLoader = tmpclassLoader; } else if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) { throw new IllegalArgumentException("Class-name must be present for archive with file-url"); } else { @@ -229,10 +252,9 @@ public class SourceConfigUtils { } catch (IOException e1) { throw new IllegalArgumentException("Failed to extract source class from archive", e1); } + typeArg = getSourceType(sourceClassName, classLoader); } - Class<?> typeArg = getSourceType(sourceClassName, classLoader); - // Only one of serdeClassName or schemaType should be set if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) { throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set"); diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 0ed9fd1..fd75da3 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -588,6 +588,9 @@ public abstract class ComponentImpl { } else if (uploadedInputStream != null) { functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile, fileDetail, functionDetailsJson, mergedComponentConfigJson, componentType); + } else if (existingComponent.getPackageLocation().getPackagePath().startsWith("builtin://")) { + functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, null, + null, functionDetailsJson, mergedComponentConfigJson, componentType); } else { functionDetails = validateUpdateRequestParamsWithExistingMetadata( tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index 192fd1a..b723411 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -361,7 +361,7 @@ public class SinkApiV3ResourceTest { } } - @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Sink Jar") + @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Sink Package") public void testRegisterSinkHttpUrl() 
{ try { testRegisterSinkMissingArguments( diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java index 25a0845..adaa606 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java @@ -325,7 +325,7 @@ public class SourceApiV3ResourceTest { } } - @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Source Jar") + @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Source Package") public void testRegisterSourceHttpUrl() { try { testRegisterSourceMissingArguments( diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index d8e5f50..a60eb47 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -172,6 +172,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { // validate the sink result tester.validateSinkResult(kvs); + // update the sink + updateSinkConnector(tester, tenant, namespace, sinkName, inputTopicName); + // delete the sink deleteSink(tenant, namespace, sinkName); @@ -220,6 +223,45 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { result.getStdout()); } + protected void updateSinkConnector(SinkTester tester, + String tenant, + String namespace, + String sinkName, + String inputTopicName) throws Exception { + String[] commands; 
+ if (tester.getSinkType() != SinkTester.SinkType.UNDEFINED) { + commands = new String[] { + PulsarCluster.ADMIN_SCRIPT, + "sink", "update", + "--tenant", tenant, + "--namespace", namespace, + "--name", sinkName, + "--sink-type", tester.sinkType().name().toLowerCase(), + "--sinkConfig", new Gson().toJson(tester.sinkConfig()), + "--inputs", inputTopicName, + "--parallelism", "2" + }; + } else { + commands = new String[] { + PulsarCluster.ADMIN_SCRIPT, + "sink", "create", + "--tenant", tenant, + "--namespace", namespace, + "--name", sinkName, + "--archive", tester.getSinkArchive(), + "--classname", tester.getSinkClassName(), + "--sinkConfig", new Gson().toJson(tester.sinkConfig()), + "--inputs", inputTopicName, + "--parallelism", "2" + }; + } + log.info("Run command : {}", StringUtils.join(commands, ' ')); + ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands); + assertTrue( + result.getStdout().contains("\"Updated successfully\""), + result.getStdout()); + } + protected void getSinkInfoSuccess(SinkTester tester, String tenant, String namespace, @@ -422,6 +464,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { // validate the source result validateSourceResult(consumer, kvs); + // update the source connector + updateSourceConnector(tester, tenant, namespace, sourceName, outputTopicName); + // delete the source deleteSource(tenant, namespace, sourceName); @@ -455,6 +500,29 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { result.getStdout()); } + protected void updateSourceConnector(SourceTester tester, + String tenant, + String namespace, + String sourceName, + String outputTopicName) throws Exception { + String[] commands = { + PulsarCluster.ADMIN_SCRIPT, + "source", "update", + "--tenant", tenant, + "--namespace", namespace, + "--name", sourceName, + "--source-type", tester.sourceType(), + "--sourceConfig", new Gson().toJson(tester.sourceConfig()), + "--destinationTopicName", 
outputTopicName, + "--parallelism", "2" + }; + log.info("Run command : {}", StringUtils.join(commands, ' ')); + ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands); + assertTrue( + result.getStdout().contains("\"Updated successfully\""), + result.getStdout()); + } + protected void getSourceInfoSuccess(SourceTester tester, String tenant, String namespace,