sijie closed pull request #2724: [functions][stats] don't generate function stats at worker service if runtime is k8s URL: https://github.com/apache/pulsar/pull/2724
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign (fork) pull request once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java index 3e219d6c40..80c1b775fc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.worker; import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory; import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.eclipse.jetty.util.ConcurrentHashSet; @@ -40,6 +41,11 @@ public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) { // only when worker service is initialized, we generate the stats. otherwise we will get bunch of NPE. 
if (workerService != null && workerService.isInitialized()) { + // kubernetes runtime factory doesn't support stats collection through worker service + if (workerService.getFunctionRuntimeManager().getRuntimeFactory() instanceof KubernetesRuntimeFactory) { + return; + } + Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos(); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java index 849d05d504..68a13b4264 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java @@ -25,6 +25,7 @@ import org.apache.pulsar.common.util.SimpleTextOutputStream; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory; import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.testng.Assert; @@ -59,6 +60,20 @@ public void testGenerateFunctionStatsWhenWorkerServiceIsNotInitialized() { verify(workerService, times(0)).getFunctionRuntimeManager(); } + @Test + public void testGenerateFunctionStatsOnK8SRuntimeFactory() { + WorkerService workerService = mock(WorkerService.class); + when(workerService.isInitialized()).thenReturn(true); + FunctionRuntimeManager frm = mock(FunctionRuntimeManager.class); + when(frm.getRuntimeFactory()).thenReturn(mock(KubernetesRuntimeFactory.class)); + when(workerService.getFunctionRuntimeManager()).thenReturn(frm); + FunctionsStatsGenerator.generate( + workerService, "test-cluster", new SimpleTextOutputStream(Unpooled.buffer())); + verify(workerService, 
times(1)).isInitialized(); + verify(workerService, times(1)).getFunctionRuntimeManager(); + verify(frm, times(0)).getFunctionRuntimeInfos(); + } + @Test public void testFunctionsStatsGenerate() { FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services