This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new dfc0e0e fix: NPE when stats manager not initialized (#3891) dfc0e0e is described below commit dfc0e0e9c521a212f11cdf8b3384005008c9c605 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Sat Mar 23 22:54:39 2019 -0500 fix: NPE when stats manager not initialized (#3891) * fix: NPE when stats manager not initialized * remove unnecessary imports * add test --- .../functions/instance/JavaInstanceRunnable.java | 65 +++--- .../instance/JavaInstanceRunnableTest.java | 18 +- .../worker/rest/api/FunctionsImplTest.java | 230 +++++++++++++++++++++ 3 files changed, 279 insertions(+), 34 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 8076e0f..667e449 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -532,43 +532,46 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Builder createMetricsDataBuilder() { InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder(); - - bldr.setProcessedSuccessfullyTotal((long) stats.getTotalProcessedSuccessfully()); - bldr.setSystemExceptionsTotal((long) stats.getTotalSysExceptions()); - bldr.setUserExceptionsTotal((long) stats.getTotalUserExceptions()); - bldr.setReceivedTotal((long) stats.getTotalRecordsReceived()); - bldr.setAvgProcessLatency(stats.getAvgProcessLatency()); - bldr.setLastInvocation((long) stats.getLastInvocation()); - - bldr.setProcessedSuccessfullyTotal1Min((long) stats.getTotalProcessedSuccessfully1min()); - bldr.setSystemExceptionsTotal1Min((long) stats.getTotalSysExceptions1min()); - 
bldr.setUserExceptionsTotal1Min((long) stats.getTotalUserExceptions1min()); - bldr.setReceivedTotal1Min((long) stats.getTotalRecordsReceived1min()); - bldr.setAvgProcessLatency1Min(stats.getAvgProcessLatency1min()); + if (stats != null) { + bldr.setProcessedSuccessfullyTotal((long) stats.getTotalProcessedSuccessfully()); + bldr.setSystemExceptionsTotal((long) stats.getTotalSysExceptions()); + bldr.setUserExceptionsTotal((long) stats.getTotalUserExceptions()); + bldr.setReceivedTotal((long) stats.getTotalRecordsReceived()); + bldr.setAvgProcessLatency(stats.getAvgProcessLatency()); + bldr.setLastInvocation((long) stats.getLastInvocation()); + + bldr.setProcessedSuccessfullyTotal1Min((long) stats.getTotalProcessedSuccessfully1min()); + bldr.setSystemExceptionsTotal1Min((long) stats.getTotalSysExceptions1min()); + bldr.setUserExceptionsTotal1Min((long) stats.getTotalUserExceptions1min()); + bldr.setReceivedTotal1Min((long) stats.getTotalRecordsReceived1min()); + bldr.setAvgProcessLatency1Min(stats.getAvgProcessLatency1min()); + } return bldr; } public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() { InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(); - functionStatusBuilder.setNumReceived((long)stats.getTotalRecordsReceived()); - functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully()); - functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions()); - stats.getLatestUserExceptions().forEach(ex -> { - functionStatusBuilder.addLatestUserExceptions(ex); - }); - functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions()); - stats.getLatestSystemExceptions().forEach(ex -> { - functionStatusBuilder.addLatestSystemExceptions(ex); - }); - stats.getLatestSourceExceptions().forEach(ex -> { - functionStatusBuilder.addLatestSourceExceptions(ex); - }); - stats.getLatestSinkExceptions().forEach(ex -> { - 
functionStatusBuilder.addLatestSinkExceptions(ex); - }); - functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency()); - functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation()); + if (stats != null) { + functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived()); + functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully()); + functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions()); + stats.getLatestUserExceptions().forEach(ex -> { + functionStatusBuilder.addLatestUserExceptions(ex); + }); + functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions()); + stats.getLatestSystemExceptions().forEach(ex -> { + functionStatusBuilder.addLatestSystemExceptions(ex); + }); + stats.getLatestSourceExceptions().forEach(ex -> { + functionStatusBuilder.addLatestSourceExceptions(ex); + }); + stats.getLatestSinkExceptions().forEach(ex -> { + functionStatusBuilder.addLatestSinkExceptions(ex); + }); + functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency()); + functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation()); + } return functionStatusBuilder; } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index b927c49..56d80ae 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -25,6 +25,9 @@ import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.SinkSpec; +import 
org.apache.pulsar.functions.proto.InstanceCommunication; +import org.testng.Assert; +import org.testng.annotations.Test; import java.lang.reflect.Method; @@ -42,7 +45,7 @@ public class JavaInstanceRunnableTest { } } - private static InstanceConfig createInstanceConfig(boolean addCustom, String outputSerde) { + private static InstanceConfig createInstanceConfig(String outputSerde) { FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); if (outputSerde != null) { functionDetailsBuilder.setSink(SinkSpec.newBuilder().setSerDeClassName(outputSerde).build()); @@ -53,8 +56,8 @@ public class JavaInstanceRunnableTest { return instanceConfig; } - private JavaInstanceRunnable createRunnable(boolean addCustom, String outputSerde) throws Exception { - InstanceConfig config = createInstanceConfig(addCustom, outputSerde); + private JavaInstanceRunnable createRunnable(String outputSerde) throws Exception { + InstanceConfig config = createInstanceConfig(outputSerde); JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( config, null, null, null, null, null, null); return javaInstanceRunnable; @@ -105,4 +108,13 @@ public class JavaInstanceRunnableTest { return null; } } + + @Test + public void testStatsManagerNull() throws Exception { + JavaInstanceRunnable javaInstanceRunnable = createRunnable(null); + + Assert.assertEquals(javaInstanceRunnable.getFunctionStatus().build(), InstanceCommunication.FunctionStatus.newBuilder().build()); + + Assert.assertEquals(javaInstanceRunnable.getMetrics(), InstanceCommunication.MetricsData.newBuilder().build()); + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java new file mode 100644 index 0000000..660775d --- /dev/null +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java @@ 
-0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker.rest.api; + +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.pulsar.client.admin.Namespaces; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.Tenants; +import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.policies.data.FunctionStats; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.instance.InstanceConfig; +import org.apache.pulsar.functions.instance.JavaInstanceRunnable; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.runtime.Runtime; +import org.apache.pulsar.functions.runtime.RuntimeFactory; +import org.apache.pulsar.functions.runtime.RuntimeSpawner; +import org.apache.pulsar.functions.source.TopicSchema; +import org.apache.pulsar.functions.utils.FunctionConfigUtils; +import org.apache.pulsar.functions.worker.FunctionMetaDataManager; +import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; 
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager; +import org.apache.pulsar.functions.worker.Utils; +import org.apache.pulsar.functions.worker.WorkerConfig; +import org.apache.pulsar.functions.worker.WorkerService; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.InputStream; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.doReturn; + +public class FunctionsImplTest { + + private static final class TestFunction implements org.apache.pulsar.functions.api.Function<String, String> { + + @Override + public String process(String input, Context context) { + return input; + } + } + + private static final String tenant = "test-tenant"; + private static final String namespace = "test-namespace"; + private static final String function = "test-function"; + private static final String outputTopic = "test-output-topic"; + private static final String outputSerdeClassName = TopicSchema.DEFAULT_SERDE; + private static final String className = TestFunction.class.getName(); + private Function.SubscriptionType subscriptionType = Function.SubscriptionType.FAILOVER; + private static final Map<String, String> topicsToSerDeClassName = new HashMap<>(); + static { + topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", TopicSchema.DEFAULT_SERDE); + } + private static final int parallelism = 1; + private static final String 
workerId = "worker-0"; + + private WorkerService mockedWorkerService; + private PulsarAdmin mockedPulsarAdmin; + private Tenants mockedTenants; + private Namespaces mockedNamespaces; + private TenantInfo mockedTenantInfo; + private List<String> namespaceList = new LinkedList<>(); + private FunctionMetaDataManager mockedManager; + private FunctionRuntimeManager mockedFunctionRunTimeManager; + private RuntimeFactory mockedRuntimeFactory; + private Namespace mockedNamespace; + private FunctionsImpl resource; + private InputStream mockedInputStream; + private FormDataContentDisposition mockedFormData; + private Function.FunctionMetaData mockedFunctionMetadata; + + @BeforeMethod + public void setup() throws Exception { + this.mockedManager = mock(FunctionMetaDataManager.class); + this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class); + this.mockedTenantInfo = mock(TenantInfo.class); + this.mockedRuntimeFactory = mock(RuntimeFactory.class); + this.mockedInputStream = mock(InputStream.class); + this.mockedNamespace = mock(Namespace.class); + this.mockedFormData = mock(FormDataContentDisposition.class); + when(mockedFormData.getFileName()).thenReturn("test"); + this.mockedPulsarAdmin = mock(PulsarAdmin.class); + this.mockedTenants = mock(Tenants.class); + this.mockedNamespaces = mock(Namespaces.class); + this.mockedFunctionMetadata = Function.FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build(); + namespaceList.add(tenant + "/" + namespace); + + this.mockedWorkerService = mock(WorkerService.class); + when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager); + when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager); + when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory); + when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); + when(mockedWorkerService.isInitialized()).thenReturn(true); + 
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin); + when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants); + when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces); + when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo); + when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList); + when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetadata); + when(mockedManager.containsFunction(tenant, namespace, function)).thenReturn(true); + when(mockedFunctionRunTimeManager.findFunctionAssignment(eq(tenant), eq(namespace), eq(function), anyInt())) + .thenReturn(Function.Assignment.newBuilder() + .setWorkerId(workerId) + .build()); + + Function.FunctionDetails.Builder functionDetailsBuilder = createDefaultFunctionDetails().toBuilder(); + InstanceConfig instanceConfig = new InstanceConfig(); + instanceConfig.setFunctionDetails(functionDetailsBuilder.build()); + instanceConfig.setMaxBufferedTuples(1024); + + JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( + instanceConfig, null, null, null, null, null, null); + CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<InstanceCommunication.MetricsData>(); + metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics()); + Runtime runtime = mock(Runtime.class); + doReturn(metricsDataCompletableFuture).when(runtime).getMetrics(anyInt()); + + CompletableFuture<InstanceCommunication.FunctionStatus> functionStatusCompletableFuture = new CompletableFuture<>(); + functionStatusCompletableFuture.complete(javaInstanceRunnable.getFunctionStatus().build()); + + RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class); + when(runtimeSpawner.getFunctionStatus(anyInt())).thenReturn(functionStatusCompletableFuture); + doReturn(runtime).when(runtimeSpawner).getRuntime(); + + FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class); + 
doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner(); + + when(mockedFunctionRunTimeManager.getFunctionRuntimeInfo(any())).thenReturn(functionRuntimeInfo); + + // worker config + WorkerConfig workerConfig = new WorkerConfig() + .setWorkerId(workerId) + .setWorkerPort(8080) + .setDownloadDirectory("/tmp/pulsar/functions") + .setFunctionMetadataTopicName("pulsar/functions") + .setNumFunctionPackageReplicas(3) + .setPulsarServiceUrl("pulsar://localhost:6650/"); + when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); + + this.resource = spy(new FunctionsImpl(() -> mockedWorkerService)); + doReturn(FUNCTION).when(this.resource).calculateSubjectType(any()); + } + + @Test + public void testStatusEmpty() { + Assert.assertTrue(this.resource.getFunctionInstanceStatus(tenant, namespace, function, "0", null) !=null); + } + + @Test + public void testMetricsEmpty() { + Function.FunctionDetails.Builder functionDetailsBuilder = createDefaultFunctionDetails().toBuilder(); + InstanceConfig instanceConfig = new InstanceConfig(); + instanceConfig.setFunctionDetails(functionDetailsBuilder.build()); + instanceConfig.setMaxBufferedTuples(1024); + + JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable( + instanceConfig, null, null, null, null, null, null); + CompletableFuture<InstanceCommunication.MetricsData> completableFuture = new CompletableFuture<InstanceCommunication.MetricsData>(); + completableFuture.complete(javaInstanceRunnable.getMetrics()); + Runtime runtime = mock(Runtime.class); + doReturn(completableFuture).when(runtime).getMetrics(anyInt()); + RuntimeSpawner runtimeSpawner = mock(RuntimeSpawner.class); + doReturn(runtime).when(runtimeSpawner).getRuntime(); + + FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class); + doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner(); + + FunctionStats.FunctionInstanceStats instanceStats1 = Utils + .getFunctionInstanceStats("public/default/test", 
functionRuntimeInfo, 0); + FunctionStats.FunctionInstanceStats instanceStats2 = Utils + .getFunctionInstanceStats("public/default/test", functionRuntimeInfo, 1); + + FunctionStats functionStats = new FunctionStats(); + functionStats.addInstance(instanceStats1); + functionStats.addInstance(instanceStats2); + + Assert.assertTrue(functionStats.calculateOverall() != null); + } + + public static FunctionConfig createDefaultFunctionConfig() { + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant(tenant); + functionConfig.setNamespace(namespace); + functionConfig.setName(function); + functionConfig.setClassName(className); + functionConfig.setParallelism(parallelism); + functionConfig.setCustomSerdeInputs(topicsToSerDeClassName); + functionConfig.setOutput(outputTopic); + functionConfig.setOutputSerdeClassName(outputSerdeClassName); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + return functionConfig; + } + + public static Function.FunctionDetails createDefaultFunctionDetails() { + FunctionConfig functionConfig = createDefaultFunctionConfig(); + return FunctionConfigUtils.convert(functionConfig, null); + } +}