This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 3742483 Fix querystate key not found and add tests for function state (#4014) 3742483 is described below commit 374248374e512118c935bfa43c8c58e424c8f241 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Wed Apr 10 13:48:36 2019 -0700 Fix querystate key not found and add tests for function state (#4014) * fix return code when querying function state for a key that doesn't exist * add tests * fix formatting --- .../functions/worker/PulsarFunctionStateTest.java | 407 +++++++++++++++++++++ .../functions/worker/rest/api/ComponentImpl.java | 2 + .../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 10 +- 3 files changed, 418 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java new file mode 100644 index 0000000..69f19a0 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java @@ -0,0 +1,407 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpServer; +import lombok.ToString; +import org.apache.bookkeeper.test.PortManager; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; +import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; +import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import org.apache.pulsar.client.admin.BrokerStats; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.FunctionState; +import org.apache.pulsar.common.functions.Utils; +import org.apache.pulsar.common.io.SinkConfig; +import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.FunctionStats; +import org.apache.pulsar.common.policies.data.FunctionStatus; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.functions.instance.InstanceUtils; +import org.apache.pulsar.functions.utils.FunctionCommon; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import javax.ws.rs.core.Response; +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; + +/** + * Test Pulsar function state + * + */ +public class PulsarFunctionStateTest { + LocalBookkeeperEnsemble bkEnsemble; + + ServiceConfiguration config; + WorkerConfig workerConfig; + URL urlTls; + PulsarService pulsar; + PulsarAdmin admin; + PulsarClient pulsarClient; + BrokerStats brokerStatsClient; + WorkerService functionsWorkerService; + final String tenant = "external-repl-prop"; + String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; + String primaryHost; + String workerId; + + private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); + private final List<Integer> bookiePorts = new LinkedList<>(); + private final int brokerWebServicePort = PortManager.nextFreePort(); + private final int brokerWebServiceTlsPort = PortManager.nextFreePort(); + private final int brokerServicePort = PortManager.nextFreePort(); + private final int brokerServiceTlsPort = PortManager.nextFreePort(); + private final int workerServicePort = PortManager.nextFreePort(); + + private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem"; + private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem"; + private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem"; + private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem"; + private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem"; + + private static final Logger log = LoggerFactory.getLogger(PulsarFunctionStateTest.class); + + @DataProvider(name = "validRoleName") + public Object[][] validRoleName() { + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; + } + + @BeforeMethod + void setup(Method method) throws Exception { + + // delete all function temp files + File dir = new File(System.getProperty("java.io.tmpdir")); + File[] foundFiles = dir.listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + return name.startsWith("function"); + } + }); + + for (File file : foundFiles) { + file.delete(); + } + + log.info("--- Setting up method {} ---", method.getName()); + + // Start local bookkeeper ensemble + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> { + int port = PortManager.nextFreePort(); + bookiePorts.add(port); + return port; + }); + bkEnsemble.start(true); + + String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort; + + config = spy(new ServiceConfiguration()); + config.setClusterName("use"); + Set<String> superUsers = Sets.newHashSet("superUser"); + config.setSuperUserRoles(superUsers); + config.setWebServicePort(brokerWebServicePort); + config.setWebServicePortTls(brokerWebServiceTlsPort); + config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); + config.setBrokerServicePort(brokerServicePort); + config.setBrokerServicePortTls(brokerServiceTlsPort); + config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); + config.setTlsAllowInsecureConnection(true); + config.setAdvertisedAddress("localhost"); + + Set<String> providers = new HashSet<>(); + providers.add(AuthenticationProviderTls.class.getName()); + config.setAuthenticationEnabled(true); + config.setAuthenticationProviders(providers); + + config.setAuthorizationEnabled(true); + config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName()); + + config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); + config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); + config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); + + config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); + config.setBrokerClientAuthenticationParameters( + "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH); + config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); + config.setBrokerClientTlsEnabled(true); + + + + functionsWorkerService = createPulsarFunctionWorker(config); + urlTls = new URL(brokerServiceUrl); + Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService); + pulsar = new PulsarService(config, functionWorkerService); + pulsar.start(); + + Map<String, String> authParams = new HashMap<>(); + authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); + authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); + Authentication authTls = new AuthenticationTls(); + authTls.configure(authParams); + + admin = spy( + PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH) + .allowTlsInsecureConnection(true).authentication(authTls).build()); + + brokerStatsClient = admin.brokerStats(); + primaryHost = String.format("http://%s:%d", "localhost", brokerWebServicePort); + + // update cluster metadata + ClusterData clusterData = new ClusterData(urlTls.toString()); + admin.clusters().updateCluster(config.getClusterName(), clusterData); + + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl()); + if (isNotBlank(workerConfig.getClientAuthenticationPlugin()) + && isNotBlank(workerConfig.getClientAuthenticationParameters())) { + clientBuilder.enableTls(workerConfig.isUseTls()); + clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection()); + clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(), + workerConfig.getClientAuthenticationParameters()); + } + pulsarClient = clientBuilder.build(); + + TenantInfo propAdmin = new TenantInfo(); + propAdmin.getAdminRoles().add("superUser"); + propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); + admin.tenants().updateTenant(tenant, propAdmin); + + System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, ""); + + } + + @AfterMethod + void shutdown() throws Exception { + log.info("--- Shutting down ---"); + pulsarClient.close(); + admin.close(); + functionsWorkerService.stop(); + pulsar.close(); + bkEnsemble.stop(); + } + + private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) { + + workerConfig = new WorkerConfig(); + workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace); + workerConfig.setSchedulerClassName( + org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName()); + workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use")); + // worker talks to local broker + workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get()); + workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get()); + workerConfig.setFailureCheckFreqMs(100); + workerConfig.setNumFunctionPackageReplicas(1); + workerConfig.setClusterCoordinationTopicName("coordinate"); + workerConfig.setFunctionAssignmentTopicName("assignment"); + workerConfig.setFunctionMetadataTopicName("metadata"); + workerConfig.setInstanceLivenessCheckFreqMs(100); + workerConfig.setWorkerPort(workerServicePort); + workerConfig.setPulsarFunctionsCluster(config.getClusterName()); + String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress()); + this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort(); + workerConfig.setWorkerHostname(hostname); + workerConfig.setWorkerId(workerId); + + workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName()); + workerConfig.setClientAuthenticationParameters( + String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH)); + workerConfig.setUseTls(true); + workerConfig.setTlsAllowInsecureConnection(true); + workerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); + + workerConfig.setAuthenticationEnabled(true); + workerConfig.setAuthorizationEnabled(true); + + workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:4181"); + + return new WorkerService(workerConfig); + } + + protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) { + + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(functionName); + functionConfig.setParallelism(1); + functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE); + functionConfig.setSubName(subscriptionName); + functionConfig.setInputs(Collections.singleton(sourceTopic)); + functionConfig.setAutoAck(true); + functionConfig.setClassName("org.apache.pulsar.functions.api.examples.WordCountFunction"); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setOutput(sinkTopic); + functionConfig.setCleanupSubscription(true); + return functionConfig; + } + + @Test(timeOut = 20000) + public void testPulsarFunctionState() throws Exception { + + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sourceTopic = "persistent://" + replNamespace + "/input"; + final String sinkTopic = "persistent://" + replNamespace + "/output"; + final String propertyKey = "key"; + final String propertyValue = "value"; + final String functionName = "PulsarFunction-test"; + final String subscriptionName = "test-sub"; + admin.namespaces().createNamespace(replNamespace); + Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + // create a producer that creates a topic at broker + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); + + FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, + sourceTopic, sinkTopic, subscriptionName); + + String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile(); + admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + // validate pulsar sink consumer has started on the topic + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); + + int totalMsgs = 5; + for (int i = 0; i < totalMsgs; i++) { + String data = "foo"; + producer.newMessage().property(propertyKey, propertyValue).value(data).send(); + } + retryStrategically((test) -> { + try { + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + return subStats.unackedMessages == 0; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + retryStrategically((test) -> { + try { + FunctionStats functionStat = admin.functions().getFunctionStats(tenant, namespacePortion, functionName); + return functionStat.getProcessedSuccessfullyTotal() == 5; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + FunctionState state = admin.functions().getFunctionState(tenant, namespacePortion, functionName, "foo"); + Assert.assertEquals(state.getNumberValue().intValue(), 5); + + try { + admin.functions().getFunctionState(tenant, namespacePortion, functionName, "bar"); + Assert.fail("Should have failed since key shouldn't exist"); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode()); + } + + // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages + // due to publish failure + assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, + totalMsgs); + + // delete functions + admin.functions().deleteFunction(tenant, namespacePortion, functionName); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 0; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + // make sure subscriptions are cleanup + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0); + + // make sure all temp files are deleted + File dir = new File(System.getProperty("java.io.tmpdir")); + File[] foundFiles = dir.listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + return name.startsWith("function"); + } + }); + + Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles)); + } +} \ No newline at end of file diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 0695bb4..b6652cf 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -1435,6 +1435,8 @@ public abstract class ComponentImpl { } } } + } catch (RestException e) { + throw e; } catch (Exception e) { log.error("Error while getFunctionState request @ /{}/{}/{}/{}", tenant, namespace, functionName, key, e); diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index be39319..373f8bc 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -349,7 +349,7 @@ public class LocalBookkeeperEnsemble { } } - public void start() throws Exception { + public void start(boolean enableStreamStorage) throws Exception { LOG.debug("Local ZK/BK starting ..."); ServerConfiguration conf = new ServerConfiguration(); // Use minimal configuration requiring less memory for unit tests @@ -373,6 +373,14 @@ public class LocalBookkeeperEnsemble { runZookeeper(1000); initializeZookeper(); runBookies(conf); + + if (enableStreamStorage) { + runStreamStorage(new CompositeConfiguration()); + } + } + + public void start() throws Exception { + start(false); } public void startStandalone() throws Exception {