This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 752f1b4 Derive source/sink arg-class name from function-class for file-url (#2258) 752f1b4 is described below commit 752f1b44359bc83f0f11ddd2f0cb18f4f70b712c Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Tue Jul 31 11:32:49 2018 -0700 Derive source/sink arg-class name from function-class for file-url (#2258) * Derive source/sink arg-class name from function-class for file-url archive * fix set type-arg if src/sink arg-class is not set * add unit-test --- .../org/apache/pulsar/io/PulsarSinkE2ETest.java | 99 ++++++++++++++++------ .../org/apache/pulsar/functions/utils/Utils.java | 15 ++-- .../functions/worker/rest/api/FunctionsImpl.java | 42 ++++++--- 3 files changed, 113 insertions(+), 43 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 16f1a76..5398bc9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -21,6 +21,10 @@ package org.apache.pulsar.io; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; import java.io.File; import java.lang.reflect.Method; @@ -62,7 +66,6 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList; -import 
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData; import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.DataDigest; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.utils.Reflections; @@ -70,11 +73,9 @@ import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; -import org.apache.pulsar.functions.worker.rest.WorkerServer; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -124,7 +125,7 @@ public class PulsarSinkE2ETest { public Object[][] validRoleName() { return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; } - + @BeforeMethod void setup(Method method) throws Exception { @@ -147,7 +148,6 @@ public class PulsarSinkE2ETest { config.setBrokerServicePortTls(brokerServiceTlsPort); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); - Set<String> providers = new HashSet<>(); providers.add(AuthenticationProviderTls.class.getName()); config.setAuthenticationEnabled(true); @@ -156,7 +156,6 @@ public class PulsarSinkE2ETest { config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsAllowInsecureConnection(true); - functionsWorkerService = createPulsarFunctionWorker(config); urlTls = new URL(brokerServiceUrl); @@ -190,12 +189,12 @@ public class PulsarSinkE2ETest { workerConfig.getClientAuthenticationParameters()); } pulsarClient = clientBuilder.build(); - + TenantInfo propAdmin = new TenantInfo(); propAdmin.getAdminRoles().add("superUser"); propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); 
admin.tenants().updateTenant(tenant, propAdmin); - + Thread.sleep(100); } @@ -237,7 +236,7 @@ public class PulsarSinkE2ETest { workerConfig.setUseTls(true); workerConfig.setTlsAllowInsecureConnection(true); workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH); - + workerConfig.setAuthenticationEnabled(true); workerConfig.setAuthorizationEnabled(true); @@ -285,7 +284,7 @@ public class PulsarSinkE2ETest { } }, 5, 150); // validate pulsar sink consumer has started on the topic - Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); int totalMsgs = 5; for (int i = 0; i < totalMsgs; i++) { @@ -303,17 +302,15 @@ public class PulsarSinkE2ETest { Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS); String receivedPropertyValue = msg.getProperty(propertyKey); - Assert.assertEquals(propertyValue, receivedPropertyValue); + assertEquals(propertyValue, receivedPropertyValue); // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages // due to publish failure - Assert.assertNotEquals( - admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, + assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, totalMsgs); } - @Test(timeOut = 20000) public void testPulsarSinkStats() throws Exception { @@ -349,7 +346,7 @@ public class PulsarSinkE2ETest { } }, 5, 150); // validate pulsar sink consumer has started on the topic - Assert.assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); int totalMsgs = 10; for (int i = 0; i < totalMsgs; i++) { @@ -371,7 +368,7 @@ public class PulsarSinkE2ETest { functionName); int numInstances = functionStats.getFunctionStatusListCount(); - Assert.assertEquals(numInstances, 1); + 
assertEquals(numInstances, 1); FunctionStatus stats = functionStats.getFunctionStatusListList().get(0); Map<String, DataDigest> metricsData = stats.getMetrics().getMetricsMap(); @@ -379,12 +376,13 @@ public class PulsarSinkE2ETest { double count = metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_PROCESSED).getCount(); double success = metricsData.get(JavaInstanceRunnable.METRICS_TOTAL_SUCCESS).getCount(); String ownerWorkerId = stats.getWorkerId(); - Assert.assertEquals((int) count, totalMsgs); - Assert.assertEquals((int) success, totalMsgs); - Assert.assertEquals(ownerWorkerId, workerId); + assertEquals((int) count, totalMsgs); + assertEquals((int) success, totalMsgs); + assertEquals(ownerWorkerId, workerId); } - protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic, String subscriptionName) { + protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, + String sinkTopic, String subscriptionName) { File file = new File(jarFile); try { @@ -407,7 +405,7 @@ public class PulsarSinkE2ETest { // source spec classname should be empty so that the default pulsar source will be used SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); - sourceSpecBuilder.setTypeClassName(byte[].class.getName()); + sourceSpecBuilder.setTypeClassName(typeArg.getName()); sourceSpecBuilder.setTopicsPattern(sourceTopicPattern); sourceSpecBuilder.setSubscriptionName(subscriptionName); sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, DefaultSerDe.class.getName()); @@ -425,7 +423,7 @@ public class PulsarSinkE2ETest { return functionDetailsBuilder.build(); } - + @Test(dataProvider = "validRoleName") public void testAuthorization(boolean validRoleName) throws Exception { @@ -450,9 +448,62 @@ public class PulsarSinkE2ETest { sinkTopic, subscriptionName); try { 
admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); - Assert.assertTrue(validRoleName); + assertTrue(validRoleName); } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) { - Assert.assertFalse(validRoleName); + assertFalse(validRoleName); } } + + /** + * Test to verify: function-server loads jar using file-url and derives type-args classes if not provided + * @throws Exception + */ + @Test(timeOut = 20000) + public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception { + + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sinkTopic = "persistent://" + replNamespace + "/output"; + final String functionName = "PulsarSink-test"; + admin.namespaces().createNamespace(replNamespace); + Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + String jarFilePathUrl = Utils.FILE + ":" + + IdentityFunction.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + functionDetailsBuilder.setTenant(tenant); + functionDetailsBuilder.setNamespace(namespacePortion); + functionDetailsBuilder.setName(functionName); + functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); + functionDetailsBuilder.setParallelism(1); + functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); + + Class<?>[] typeArgs = org.apache.pulsar.functions.utils.Utils.getFunctionTypes(new IdentityFunction(), false); + + // set source spec + // source spec classname should be empty so that the default pulsar source will be used + SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); + sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); + sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, DefaultSerDe.class.getName()); + 
functionDetailsBuilder.setAutoAck(true); + functionDetailsBuilder.setSource(sourceSpecBuilder); + + // set up sink spec + SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); + sinkSpecBuilder.setTopic(sinkTopic); + Map<String, Object> sinkConfigMap = Maps.newHashMap(); + sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap)); + functionDetailsBuilder.setSink(sinkSpecBuilder); + + FunctionDetails functionDetails = functionDetailsBuilder.build(); + admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); + + FunctionDetails functionMetadata = admin.functions().getFunction(tenant, namespacePortion, functionName); + + assertEquals(functionMetadata.getSource().getTypeClassName(), typeArgs[0].getName()); + assertEquals(functionMetadata.getSink().getTypeClassName(), typeArgs[1].getName()); + + } } \ No newline at end of file diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index 0c25be2..94c315d 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -102,16 +102,21 @@ public class Utils { } public static Class<?>[] getFunctionTypes(FunctionConfig functionConfig) { - - Object userClass = createInstance(functionConfig.getClassName(), Thread.currentThread().getContextClassLoader()); + Object userClass = createInstance(functionConfig.getClassName(), + Thread.currentThread().getContextClassLoader()); + boolean isWindowConfigPresent = functionConfig.getWindowConfig() != null; + return getFunctionTypes(userClass, isWindowConfigPresent); + } + + public static Class<?>[] getFunctionTypes(Object userClass, boolean isWindowConfigPresent) { Class<?>[] typeArgs; // if window function - if (functionConfig.getWindowConfig() != null) { + if (isWindowConfigPresent) { java.util.function.Function function 
= (java.util.function.Function) userClass; if (function == null) { - throw new IllegalArgumentException(String.format("The Java util function class %s could not be instantiated", - functionConfig.getClassName())); + throw new IllegalArgumentException( + String.format("The Java util function class %s could not be instantiated", userClass)); } typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass()); if (!typeArgs[0].equals(Collection.class)) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index b935bf5..9e3944b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -56,7 +56,10 @@ import javax.ws.rs.core.StreamingOutput; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.commons.lang3.StringUtils.join; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -139,7 +142,7 @@ public class FunctionsImpl { } FunctionDetails functionDetails; - boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl); + boolean isPkgUrlProvided = isNotBlank(functionPkgUrl); // validate parameters try { if (isPkgUrlProvided) { @@ -203,7 +206,7 @@ public class FunctionsImpl { } FunctionDetails functionDetails; - boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl); + boolean isPkgUrlProvided = isNotBlank(functionPkgUrl); 
// validate parameters try { if (isPkgUrlProvided) { @@ -738,14 +741,14 @@ public class FunctionsImpl { private boolean isFunctionCodeBuiltin(FunctionDetails functionDetails) { if (functionDetails.hasSource()) { SourceSpec sourceSpec = functionDetails.getSource(); - if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { + if (!isEmpty(sourceSpec.getBuiltin())) { return true; } } if (functionDetails.hasSink()) { SinkSpec sinkSpec = functionDetails.getSink(); - if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { + if (!isEmpty(sinkSpec.getBuiltin())) { return true; } } @@ -756,14 +759,14 @@ public class FunctionsImpl { private String getFunctionCodeBuiltin(FunctionDetails functionDetails) { if (functionDetails.hasSource()) { SourceSpec sourceSpec = functionDetails.getSource(); - if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) { + if (!isEmpty(sourceSpec.getBuiltin())) { return sourceSpec.getBuiltin(); } } if (functionDetails.hasSink()) { SinkSpec sinkSpec = functionDetails.getSink(); - if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) { + if (!isEmpty(sinkSpec.getBuiltin())) { return sinkSpec.getBuiltin(); } } @@ -815,7 +818,7 @@ public class FunctionsImpl { missingFields.add("Sink"); } if (!missingFields.isEmpty()) { - String errorMessage = StringUtils.join(missingFields, ","); + String errorMessage = join(missingFields, ","); throw new IllegalArgumentException(errorMessage + " is not provided"); } if (functionDetails.getParallelism() <= 0) { @@ -837,7 +840,7 @@ public class FunctionsImpl { return; } - if (StringUtils.isBlank(functionDetailsBuilder.getClassName())) { + if (isBlank(functionDetailsBuilder.getClassName())) { throw new IllegalArgumentException("function class-name can't be empty"); } @@ -847,13 +850,15 @@ public class FunctionsImpl { // validate function class-type Object functionObject = createInstance(functionDetailsBuilder.getClassName(), classLoader); + Class<?>[] typeArgs = org.apache.pulsar.functions.utils.Utils.getFunctionTypes(functionObject, 
false); + if (!(functionObject instanceof org.apache.pulsar.functions.api.Function) && !(functionObject instanceof java.util.function.Function)) { throw new RuntimeException("User class must either be Function or java.util.Function"); } - + if (functionDetailsBuilder.hasSource() && functionDetailsBuilder.getSource() != null - && StringUtils.isNotBlank(functionDetailsBuilder.getSource().getClassName())) { + && isNotBlank(functionDetailsBuilder.getSource().getClassName())) { try { String sourceClassName = functionDetailsBuilder.getSource().getClassName(); String argClassName = getTypeArg(sourceClassName, Source.class, classLoader).getName(); @@ -862,7 +867,7 @@ public class FunctionsImpl { // if sink-class not present then set same arg as source if (!functionDetailsBuilder.hasSink() - || StringUtils.isBlank(functionDetailsBuilder.getSink().getClassName())) { + || isBlank(functionDetailsBuilder.getSink().getClassName())) { functionDetailsBuilder .setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName)); } @@ -873,10 +878,14 @@ public class FunctionsImpl { log.error("Failed to validate source class", e); throw new IllegalArgumentException("Failed to validate source class-name", e); } + } else if (isBlank(functionDetailsBuilder.getSourceBuilder().getTypeClassName())) { + // if function-src-class is not present then set function-src type-class according to function class + functionDetailsBuilder + .setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(typeArgs[0].getName())); } if (functionDetailsBuilder.hasSink() && functionDetailsBuilder.getSink() != null - && StringUtils.isNotBlank(functionDetailsBuilder.getSink().getClassName())) { + && isNotBlank(functionDetailsBuilder.getSink().getClassName())) { try { String sinkClassName = functionDetailsBuilder.getSink().getClassName(); String argClassName = getTypeArg(sinkClassName, Sink.class, classLoader).getName(); @@ -884,7 +893,7 @@ public class FunctionsImpl { // if source-class not 
present then set same arg as sink if (!functionDetailsBuilder.hasSource() - || StringUtils.isBlank(functionDetailsBuilder.getSource().getClassName())) { + || isBlank(functionDetailsBuilder.getSource().getClassName())) { functionDetailsBuilder .setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName)); } @@ -895,7 +904,12 @@ public class FunctionsImpl { log.error("Failed to validate sink class", e); throw new IllegalArgumentException("Failed to validate sink class-name", e); } + } else if(isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())){ + // if function-sink-class is not present then set function-sink type-class according to function class + functionDetailsBuilder + .setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName())); } + } private Class<?> getTypeArg(String className, Class<?> funClass, URLClassLoader classLoader)