This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3742483  Fix querystate key not found and add tests for function state 
(#4014)
3742483 is described below

commit 374248374e512118c935bfa43c8c58e424c8f241
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Wed Apr 10 13:48:36 2019 -0700

    Fix querystate key not found and add tests for function state (#4014)
    
    * fix return code when querying function state for a key that doesn't exist
    
    * add tests
    
    * fix formatting
---
 .../functions/worker/PulsarFunctionStateTest.java  | 407 +++++++++++++++++++++
 .../functions/worker/rest/api/ComponentImpl.java   |   2 +
 .../pulsar/zookeeper/LocalBookkeeperEnsemble.java  |  10 +-
 3 files changed, 418 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
new file mode 100644
index 0000000..69f19a0
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpServer;
+import lombok.ToString;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FunctionStats;
+import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.Response;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static 
org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Test Pulsar function state
+ *
+ */
+public class PulsarFunctionStateTest {
+    LocalBookkeeperEnsemble bkEnsemble;
+
+    ServiceConfiguration config;
+    WorkerConfig workerConfig;
+    URL urlTls;
+    PulsarService pulsar;
+    PulsarAdmin admin;
+    PulsarClient pulsarClient;
+    BrokerStats brokerStatsClient;
+    WorkerService functionsWorkerService;
+    final String tenant = "external-repl-prop";
+    String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    String primaryHost;
+    String workerId;
+
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final List<Integer> bookiePorts = new LinkedList<>();
+    private final int brokerWebServicePort = PortManager.nextFreePort();
+    private final int brokerWebServiceTlsPort = PortManager.nextFreePort();
+    private final int brokerServicePort = PortManager.nextFreePort();
+    private final int brokerServiceTlsPort = PortManager.nextFreePort();
+    private final int workerServicePort = PortManager.nextFreePort();
+
+    private final String TLS_SERVER_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-cert.pem";
+    private final String TLS_SERVER_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/broker-key.pem";
+    private final String TLS_CLIENT_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/client-cert.pem";
+    private final String TLS_CLIENT_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/client-key.pem";
+    private final String TLS_TRUST_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/cacert.pem";
+
+    private static final Logger log = 
LoggerFactory.getLogger(PulsarFunctionStateTest.class);
+
+    @DataProvider(name = "validRoleName")
+    public Object[][] validRoleName() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+    @BeforeMethod
+    void setup(Method method) throws Exception {
+
+        // delete all function temp files
+        File dir = new File(System.getProperty("java.io.tmpdir"));
+        File[] foundFiles = dir.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                return name.startsWith("function");
+            }
+        });
+
+        for (File file : foundFiles) {
+            file.delete();
+        }
+
+        log.info("--- Setting up method {} ---", method.getName());
+
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> {
+            int port = PortManager.nextFreePort();
+            bookiePorts.add(port);
+            return port;
+        });
+        bkEnsemble.start(true);
+
+        String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
+
+        config = spy(new ServiceConfiguration());
+        config.setClusterName("use");
+        Set<String> superUsers = Sets.newHashSet("superUser");
+        config.setSuperUserRoles(superUsers);
+        config.setWebServicePort(brokerWebServicePort);
+        config.setWebServicePortTls(brokerWebServiceTlsPort);
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config.setBrokerServicePort(brokerServicePort);
+        config.setBrokerServicePortTls(brokerServiceTlsPort);
+        config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+        config.setTlsAllowInsecureConnection(true);
+        config.setAdvertisedAddress("localhost");
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        config.setAuthenticationEnabled(true);
+        config.setAuthenticationProviders(providers);
+
+        config.setAuthorizationEnabled(true);
+        
config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+
+        config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+        
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        config.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + 
"tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+        config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        config.setBrokerClientTlsEnabled(true);
+
+
+
+        functionsWorkerService = createPulsarFunctionWorker(config);
+        urlTls = new URL(brokerServiceUrl);
+        Optional<WorkerService> functionWorkerService = 
Optional.of(functionsWorkerService);
+        pulsar = new PulsarService(config, functionWorkerService);
+        pulsar.start();
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+
+        admin = spy(
+                
PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                        
.allowTlsInsecureConnection(true).authentication(authTls).build());
+
+        brokerStatsClient = admin.brokerStats();
+        primaryHost = String.format("http://%s:%d", "localhost", brokerWebServicePort);
+
+        // update cluster metadata
+        ClusterData clusterData = new ClusterData(urlTls.toString());
+        admin.clusters().updateCluster(config.getClusterName(), clusterData);
+
+        ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
+        if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
+                && 
isNotBlank(workerConfig.getClientAuthenticationParameters())) {
+            clientBuilder.enableTls(workerConfig.isUseTls());
+            
clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
+            
clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
+                    workerConfig.getClientAuthenticationParameters());
+        }
+        pulsarClient = clientBuilder.build();
+
+        TenantInfo propAdmin = new TenantInfo();
+        propAdmin.getAdminRoles().add("superUser");
+        
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+        admin.tenants().updateTenant(tenant, propAdmin);
+
+        System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");
+
+    }
+
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        pulsarClient.close();
+        admin.close();
+        functionsWorkerService.stop();
+        pulsar.close();
+        bkEnsemble.stop();
+    }
+
+    private WorkerService createPulsarFunctionWorker(ServiceConfiguration 
config) {
+
+        workerConfig = new WorkerConfig();
+        workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+        workerConfig.setSchedulerClassName(
+                
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+        workerConfig.setThreadContainerFactory(new 
WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+        // worker talks to local broker
+        workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + 
config.getBrokerServicePortTls().get());
+        workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());
+        workerConfig.setFailureCheckFreqMs(100);
+        workerConfig.setNumFunctionPackageReplicas(1);
+        workerConfig.setClusterCoordinationTopicName("coordinate");
+        workerConfig.setFunctionAssignmentTopicName("assignment");
+        workerConfig.setFunctionMetadataTopicName("metadata");
+        workerConfig.setInstanceLivenessCheckFreqMs(100);
+        workerConfig.setWorkerPort(workerServicePort);
+        workerConfig.setPulsarFunctionsCluster(config.getClusterName());
+        String hostname = 
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + 
"-" + workerConfig.getWorkerPort();
+        workerConfig.setWorkerHostname(hostname);
+        workerConfig.setWorkerId(workerId);
+
+        
workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        workerConfig.setClientAuthenticationParameters(
+                String.format("tlsCertFile:%s,tlsKeyFile:%s", 
TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
+        workerConfig.setUseTls(true);
+        workerConfig.setTlsAllowInsecureConnection(true);
+        workerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+        workerConfig.setAuthenticationEnabled(true);
+        workerConfig.setAuthorizationEnabled(true);
+
+        workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:4181");
+
+        return new WorkerService(workerConfig);
+    }
+
+    protected static FunctionConfig createFunctionConfig(String tenant, String 
namespace, String functionName, String sourceTopic, String sinkTopic, String 
subscriptionName) {
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(functionName);
+        functionConfig.setParallelism(1);
+        
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+        functionConfig.setSubName(subscriptionName);
+        functionConfig.setInputs(Collections.singleton(sourceTopic));
+        functionConfig.setAutoAck(true);
+        
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.WordCountFunction");
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setOutput(sinkTopic);
+        functionConfig.setCleanupSubscription(true);
+        return functionConfig;
+    }
+
+    @Test(timeOut = 20000)
+    public void testPulsarFunctionState() throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/input";
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String functionName = "PulsarFunction-test";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, 
clusters);
+
+        // create a producer that creates a topic at broker
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+
+        FunctionConfig functionConfig = createFunctionConfig(tenant, 
namespacePortion, functionName,
+                sourceTopic, sinkTopic, subscriptionName);
+
+        String jarFilePathUrl = Utils.FILE + ":" + 
getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+        admin.functions().createFunctionWithUrl(functionConfig, 
jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate pulsar sink consumer has started on the topic
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+        int totalMsgs = 5;
+        for (int i = 0; i < totalMsgs; i++) {
+            String data = "foo";
+            producer.newMessage().property(propertyKey, 
propertyValue).value(data).send();
+        }
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStats.unackedMessages == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        retryStrategically((test) -> {
+            try {
+                FunctionStats functionStat = 
admin.functions().getFunctionStats(tenant, namespacePortion, functionName);
+                return functionStat.getProcessedSuccessfullyTotal() == 5;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        FunctionState state = admin.functions().getFunctionState(tenant, 
namespacePortion, functionName, "foo");
+        Assert.assertEquals(state.getNumberValue().intValue(), 5);
+
+        try {
+            admin.functions().getFunctionState(tenant, namespacePortion, 
functionName, "bar");
+            Assert.fail("Should have failed since key shouldn't exist");
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 
Response.Status.NOT_FOUND.getStatusCode());
+        }
+
+        // validate pulsar-sink consumer has consumed all messages and 
delivered to Pulsar sink but unacked messages
+        // due to publish failure
+        
assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+                totalMsgs);
+
+        // delete functions
+        admin.functions().deleteFunction(tenant, namespacePortion, 
functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return 
admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // make sure subscriptions are cleanup
+        
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+
+        // make sure all temp files are deleted
+        File dir = new File(System.getProperty("java.io.tmpdir"));
+        File[] foundFiles = dir.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                return name.startsWith("function");
+            }
+        });
+
+        Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: 
" + Arrays.asList(foundFiles));
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 0695bb4..b6652cf 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -1435,6 +1435,8 @@ public abstract class ComponentImpl {
                     }
                 }
             }
+        } catch (RestException e) {
+            throw e;
         } catch (Exception e) {
             log.error("Error while getFunctionState request @ /{}/{}/{}/{}",
                     tenant, namespace, functionName, key, e);
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index be39319..373f8bc 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -349,7 +349,7 @@ public class LocalBookkeeperEnsemble {
         }
     }
 
-    public void start() throws Exception {
+    public void start(boolean enableStreamStorage) throws  Exception {
         LOG.debug("Local ZK/BK starting ...");
         ServerConfiguration conf = new ServerConfiguration();
         // Use minimal configuration requiring less memory for unit tests
@@ -373,6 +373,14 @@ public class LocalBookkeeperEnsemble {
         runZookeeper(1000);
         initializeZookeper();
         runBookies(conf);
+
+        if (enableStreamStorage) {
+            runStreamStorage(new CompositeConfiguration());
+        }
+    }
+
+    public void start() throws Exception {
+        start(false);
     }
 
     public void startStandalone() throws Exception {

Reply via email to