This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dfc0e0e  fix: NPE when stats manager not initialized (#3891)
dfc0e0e is described below

commit dfc0e0e9c521a212f11cdf8b3384005008c9c605
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Sat Mar 23 22:54:39 2019 -0500

    fix: NPE when stats manager not initialized (#3891)
    
    * fix: NPE when stats manager not initialized
    
    * remove unnecessary imports
    
    * add test
---
 .../functions/instance/JavaInstanceRunnable.java   |  65 +++---
 .../instance/JavaInstanceRunnableTest.java         |  18 +-
 .../worker/rest/api/FunctionsImplTest.java         | 230 +++++++++++++++++++++
 3 files changed, 279 insertions(+), 34 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 8076e0f..667e449 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -532,43 +532,46 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     private Builder createMetricsDataBuilder() {
         InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
-
-        bldr.setProcessedSuccessfullyTotal((long) stats.getTotalProcessedSuccessfully());
-        bldr.setSystemExceptionsTotal((long) stats.getTotalSysExceptions());
-        bldr.setUserExceptionsTotal((long) stats.getTotalUserExceptions());
-        bldr.setReceivedTotal((long) stats.getTotalRecordsReceived());
-        bldr.setAvgProcessLatency(stats.getAvgProcessLatency());
-        bldr.setLastInvocation((long) stats.getLastInvocation());
-
-        bldr.setProcessedSuccessfullyTotal1Min((long) stats.getTotalProcessedSuccessfully1min());
-        bldr.setSystemExceptionsTotal1Min((long) stats.getTotalSysExceptions1min());
-        bldr.setUserExceptionsTotal1Min((long) stats.getTotalUserExceptions1min());
-        bldr.setReceivedTotal1Min((long) stats.getTotalRecordsReceived1min());
-        bldr.setAvgProcessLatency1Min(stats.getAvgProcessLatency1min());
+        if (stats != null) {
+            bldr.setProcessedSuccessfullyTotal((long) stats.getTotalProcessedSuccessfully());
+            bldr.setSystemExceptionsTotal((long) stats.getTotalSysExceptions());
+            bldr.setUserExceptionsTotal((long) stats.getTotalUserExceptions());
+            bldr.setReceivedTotal((long) stats.getTotalRecordsReceived());
+            bldr.setAvgProcessLatency(stats.getAvgProcessLatency());
+            bldr.setLastInvocation((long) stats.getLastInvocation());
+
+            bldr.setProcessedSuccessfullyTotal1Min((long) stats.getTotalProcessedSuccessfully1min());
+            bldr.setSystemExceptionsTotal1Min((long) stats.getTotalSysExceptions1min());
+            bldr.setUserExceptionsTotal1Min((long) stats.getTotalUserExceptions1min());
+            bldr.setReceivedTotal1Min((long) stats.getTotalRecordsReceived1min());
+            bldr.setAvgProcessLatency1Min(stats.getAvgProcessLatency1min());
+        }
 
         return bldr;
     }
 
     public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
         InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
-        functionStatusBuilder.setNumReceived((long)stats.getTotalRecordsReceived());
-        functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully());
-        functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions());
-        stats.getLatestUserExceptions().forEach(ex -> {
-            functionStatusBuilder.addLatestUserExceptions(ex);
-        });
-        functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions());
-        stats.getLatestSystemExceptions().forEach(ex -> {
-            functionStatusBuilder.addLatestSystemExceptions(ex);
-        });
-        stats.getLatestSourceExceptions().forEach(ex -> {
-            functionStatusBuilder.addLatestSourceExceptions(ex);
-        });
-        stats.getLatestSinkExceptions().forEach(ex -> {
-            functionStatusBuilder.addLatestSinkExceptions(ex);
-        });
-        functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency());
-        functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation());
+        if (stats != null) {
+            functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived());
+            functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully());
+            functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions());
+            stats.getLatestUserExceptions().forEach(ex -> {
+                functionStatusBuilder.addLatestUserExceptions(ex);
+            });
+            functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions());
+            stats.getLatestSystemExceptions().forEach(ex -> {
+                functionStatusBuilder.addLatestSystemExceptions(ex);
+            });
+            stats.getLatestSourceExceptions().forEach(ex -> {
+                functionStatusBuilder.addLatestSourceExceptions(ex);
+            });
+            stats.getLatestSinkExceptions().forEach(ex -> {
+                functionStatusBuilder.addLatestSinkExceptions(ex);
+            });
+            functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency());
+            functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation());
+        }
         return functionStatusBuilder;
     }
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index b927c49..56d80ae 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -25,6 +25,9 @@ import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
 import java.lang.reflect.Method;
 
@@ -42,7 +45,7 @@ public class JavaInstanceRunnableTest {
         }
     }
 
-    private static InstanceConfig createInstanceConfig(boolean addCustom, String outputSerde) {
+    private static InstanceConfig createInstanceConfig(String outputSerde) {
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
         if (outputSerde != null) {
             functionDetailsBuilder.setSink(SinkSpec.newBuilder().setSerDeClassName(outputSerde).build());
@@ -53,8 +56,8 @@ public class JavaInstanceRunnableTest {
         return instanceConfig;
     }
 
-    private JavaInstanceRunnable createRunnable(boolean addCustom, String outputSerde) throws Exception {
-        InstanceConfig config = createInstanceConfig(addCustom, outputSerde);
+    private JavaInstanceRunnable createRunnable(String outputSerde) throws Exception {
+        InstanceConfig config = createInstanceConfig(outputSerde);
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
                 config, null, null, null, null, null, null);
         return javaInstanceRunnable;
@@ -105,4 +108,13 @@ public class JavaInstanceRunnableTest {
             return null;
         }
     }
+
+    @Test
+    public void testStatsManagerNull() throws Exception {
+        JavaInstanceRunnable javaInstanceRunnable = createRunnable(null);
+
+        Assert.assertEquals(javaInstanceRunnable.getFunctionStatus().build(), InstanceCommunication.FunctionStatus.newBuilder().build());
+
+        Assert.assertEquals(javaInstanceRunnable.getMetrics(), InstanceCommunication.MetricsData.newBuilder().build());
+    }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
new file mode 100644
index 0000000..660775d
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api;
+
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.policies.data.FunctionStats;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+
+public class FunctionsImplTest {
+
+    private static final class TestFunction implements 
org.apache.pulsar.functions.api.Function<String, String> {
+
+        @Override
+        public String process(String input, Context context) {
+            return input;
+        }
+    }
+
+    private static final String tenant = "test-tenant";
+    private static final String namespace = "test-namespace";
+    private static final String function = "test-function";
+    private static final String outputTopic = "test-output-topic";
+    private static final String outputSerdeClassName = 
TopicSchema.DEFAULT_SERDE;
+    private static final String className = TestFunction.class.getName();
+    private Function.SubscriptionType subscriptionType = 
Function.SubscriptionType.FAILOVER;
+    private static final Map<String, String> topicsToSerDeClassName = new 
HashMap<>();
+    static {
+        
topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", 
TopicSchema.DEFAULT_SERDE);
+    }
+    private static final int parallelism = 1;
+    private static final String workerId = "worker-0";
+
+    private WorkerService mockedWorkerService;
+    private PulsarAdmin mockedPulsarAdmin;
+    private Tenants mockedTenants;
+    private Namespaces mockedNamespaces;
+    private TenantInfo mockedTenantInfo;
+    private List<String> namespaceList = new LinkedList<>();
+    private FunctionMetaDataManager mockedManager;
+    private FunctionRuntimeManager mockedFunctionRunTimeManager;
+    private RuntimeFactory mockedRuntimeFactory;
+    private Namespace mockedNamespace;
+    private FunctionsImpl resource;
+    private InputStream mockedInputStream;
+    private FormDataContentDisposition mockedFormData;
+    private Function.FunctionMetaData mockedFunctionMetadata;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        this.mockedManager = mock(FunctionMetaDataManager.class);
+        this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class);
+        this.mockedTenantInfo = mock(TenantInfo.class);
+        this.mockedRuntimeFactory = mock(RuntimeFactory.class);
+        this.mockedInputStream = mock(InputStream.class);
+        this.mockedNamespace = mock(Namespace.class);
+        this.mockedFormData = mock(FormDataContentDisposition.class);
+        when(mockedFormData.getFileName()).thenReturn("test");
+        this.mockedPulsarAdmin = mock(PulsarAdmin.class);
+        this.mockedTenants = mock(Tenants.class);
+        this.mockedNamespaces = mock(Namespaces.class);
+        this.mockedFunctionMetadata = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        namespaceList.add(tenant + "/" + namespace);
+
+        this.mockedWorkerService = mock(WorkerService.class);
+        
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
+        
when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
+        
when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
+        
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
+        when(mockedWorkerService.isInitialized()).thenReturn(true);
+        
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+        when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
+        when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+        when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
+        when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
+        when(mockedManager.getFunctionMetaData(any(), any(), 
any())).thenReturn(mockedFunctionMetadata);
+        when(mockedManager.containsFunction(tenant, namespace, 
function)).thenReturn(true);
+        when(mockedFunctionRunTimeManager.findFunctionAssignment(eq(tenant), 
eq(namespace), eq(function), anyInt()))
+                .thenReturn(Function.Assignment.newBuilder()
+                        .setWorkerId(workerId)
+                        .build());
+
+        Function.FunctionDetails.Builder functionDetailsBuilder =  
createDefaultFunctionDetails().toBuilder();
+        InstanceConfig instanceConfig = new InstanceConfig();
+        instanceConfig.setFunctionDetails(functionDetailsBuilder.build());
+        instanceConfig.setMaxBufferedTuples(1024);
+
+        JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
+                instanceConfig, null, null, null, null, null, null);
+        CompletableFuture<InstanceCommunication.MetricsData> 
metricsDataCompletableFuture = new 
CompletableFuture<InstanceCommunication.MetricsData>();
+        
metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics());
+        Runtime runtime = mock(Runtime.class);
+        
doReturn(metricsDataCompletableFuture).when(runtime).getMetrics(anyInt());
+
+        CompletableFuture<InstanceCommunication.FunctionStatus> 
functionStatusCompletableFuture = new CompletableFuture<>();
+        
functionStatusCompletableFuture.complete(javaInstanceRunnable.getFunctionStatus().build());
+
+        RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class);
+        
when(runtimeSpawner.getFunctionStatus(anyInt())).thenReturn(functionStatusCompletableFuture);
+        doReturn(runtime).when(runtimeSpawner).getRuntime();
+
+        FunctionRuntimeInfo functionRuntimeInfo = 
mock(FunctionRuntimeInfo.class);
+        doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner();
+
+        
when(mockedFunctionRunTimeManager.getFunctionRuntimeInfo(any())).thenReturn(functionRuntimeInfo);
+
+        // worker config
+        WorkerConfig workerConfig = new WorkerConfig()
+                .setWorkerId(workerId)
+                .setWorkerPort(8080)
+                .setDownloadDirectory("/tmp/pulsar/functions")
+                .setFunctionMetadataTopicName("pulsar/functions")
+                .setNumFunctionPackageReplicas(3)
+                .setPulsarServiceUrl("pulsar://localhost:6650/");
+        when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
+
+        this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
+        doReturn(FUNCTION).when(this.resource).calculateSubjectType(any());
+    }
+
+    @Test
+    public void testStatusEmpty() {
+        Assert.assertTrue(this.resource.getFunctionInstanceStatus(tenant, 
namespace, function, "0", null) !=null);
+    }
+
+    @Test
+    public void testMetricsEmpty() {
+        Function.FunctionDetails.Builder functionDetailsBuilder =  
createDefaultFunctionDetails().toBuilder();
+        InstanceConfig instanceConfig = new InstanceConfig();
+        instanceConfig.setFunctionDetails(functionDetailsBuilder.build());
+        instanceConfig.setMaxBufferedTuples(1024);
+
+        JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
+                instanceConfig, null, null, null, null, null, null);
+        CompletableFuture<InstanceCommunication.MetricsData> completableFuture 
= new CompletableFuture<InstanceCommunication.MetricsData>();
+        completableFuture.complete(javaInstanceRunnable.getMetrics());
+        Runtime runtime = mock(Runtime.class);
+        doReturn(completableFuture).when(runtime).getMetrics(anyInt());
+        RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class);
+        doReturn(runtime).when(runtimeSpawner).getRuntime();
+
+        FunctionRuntimeInfo functionRuntimeInfo = 
mock(FunctionRuntimeInfo.class);
+        doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner();
+
+        FunctionStats.FunctionInstanceStats instanceStats1 = Utils
+                .getFunctionInstanceStats("public/default/test", 
functionRuntimeInfo, 0);
+        FunctionStats.FunctionInstanceStats instanceStats2 = Utils
+                .getFunctionInstanceStats("public/default/test", 
functionRuntimeInfo, 1);
+
+        FunctionStats functionStats = new FunctionStats();
+        functionStats.addInstance(instanceStats1);
+        functionStats.addInstance(instanceStats2);
+
+        Assert.assertTrue(functionStats.calculateOverall() != null);
+    }
+
+    public static FunctionConfig createDefaultFunctionConfig() {
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(function);
+        functionConfig.setClassName(className);
+        functionConfig.setParallelism(parallelism);
+        functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
+        functionConfig.setOutput(outputTopic);
+        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        return functionConfig;
+    }
+
+    public static Function.FunctionDetails createDefaultFunctionDetails() {
+        FunctionConfig functionConfig = createDefaultFunctionConfig();
+        return FunctionConfigUtils.convert(functionConfig, null);
+    }
+}

Reply via email to