This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9b74e9d [functions][stats] NPE in FunctionStatsGenerator when worker service is not ready (#2723) 9b74e9d is described below commit 9b74e9dc734f3c24290b2c81d3f492928fec7484 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Thu Oct 4 23:18:58 2018 -0700 [functions][stats] NPE in FunctionStatsGenerator when worker service is not ready (#2723) * [functions][stats] NPE in FunctionStatsGenerator when worker service is not ready *Motivation* A NullPointerException was thrown when the function worker is running as part of the broker and metrics collection kicks in before the worker service completes initialization. *Changes* Only generate function stats when the worker service is ready. * Fix FunctionStatsGeneratorTest --- .../pulsar/functions/worker/FunctionsStatsGenerator.java | 3 ++- .../functions/worker/FunctionStatsGeneratorTest.java | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java index be7c88b..3e219d6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java @@ -38,7 +38,8 @@ public class FunctionsStatsGenerator { private static final Logger log = LoggerFactory.getLogger(FunctionsStatsGenerator.class); public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) { - if (workerService != null) { + // only when worker service is initialized, we generate the stats. otherwise we will get bunch of NPE. 
+ if (workerService != null && workerService.isInitialized()) { Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos(); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java index 9816822..849d05d 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; import lombok.ToString; import org.apache.pulsar.common.util.SimpleTextOutputStream; import org.apache.pulsar.functions.proto.Function; @@ -41,11 +42,24 @@ import java.util.regex.Pattern; import static com.google.common.base.Preconditions.checkArgument; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; public class FunctionStatsGeneratorTest { @Test + public void testGenerateFunctionStatsWhenWorkerServiceIsNotInitialized() { + WorkerService workerService = mock(WorkerService.class); + when(workerService.isInitialized()).thenReturn(false); + FunctionsStatsGenerator.generate( + workerService, "test-cluster", new SimpleTextOutputStream(Unpooled.buffer())); + verify(workerService, times(1)).isInitialized(); + verify(workerService, times(0)).getFunctionRuntimeManager(); + } + + @Test public void testFunctionsStatsGenerate() { FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class); Map<String, FunctionRuntimeInfo> 
functionRuntimeInfoMap = new HashMap<>(); @@ -53,6 +67,7 @@ public class FunctionStatsGeneratorTest { WorkerService workerService = mock(WorkerService.class); doReturn(functionRuntimeManager).when(workerService).getFunctionRuntimeManager(); doReturn(new WorkerConfig()).when(workerService).getWorkerConfig(); + when(workerService.isInitialized()).thenReturn(true); CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture = new CompletableFuture<>(); InstanceCommunication.MetricsData metricsData = InstanceCommunication.MetricsData.newBuilder()