This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 63a0491 Enable stats to be recovered by Kubernetes runtime (#3363) 63a0491 is described below commit 63a0491ec86a4123062eb2835d6f385bf79e2552 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Mon Jan 14 09:55:02 2019 -0800 Enable stats to be recovered by Kubernetes runtime (#3363) --- .../pulsar/functions/runtime/JavaInstanceMain.java | 2 +- .../functions/runtime/KubernetesRuntime.java | 26 +++++++++++++++++-- .../pulsar/functions/runtime/ProcessRuntime.java | 2 +- .../apache/pulsar/functions/runtime/Runtime.java | 2 +- .../pulsar/functions/runtime/ThreadRuntime.java | 2 +- .../functions/worker/FunctionRuntimeManager.java | 2 +- .../org/apache/pulsar/functions/worker/Utils.java | 17 +++--------- .../functions/worker/rest/api/WorkerImpl.java | 30 ++++++++++++++++------ 8 files changed, 55 insertions(+), 28 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index 3a0a404..43467ae 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -317,7 +317,7 @@ public class JavaInstanceMain implements AutoCloseable { Runtime runtime = runtimeSpawner.getRuntime(); if (runtime != null) { try { - InstanceCommunication.MetricsData metrics = runtime.getMetrics().get(); + InstanceCommunication.MetricsData metrics = runtime.getMetrics(instanceId).get(); responseObserver.onNext(metrics); responseObserver.onCompleted(); } catch (InterruptedException | ExecutionException e) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index a3a006a..aba473b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -295,9 +295,31 @@ class KubernetesRuntime implements Runtime { } @Override - public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() { + public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) { CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>(); - retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesnt support getting metrics via rest")); + if (instanceId < 0 || instanceId >= stub.length) { + if (stub == null) { + retval.completeExceptionally(new RuntimeException("Invalid InstanceId")); + return retval; + } + } + if (stub == null) { + retval.completeExceptionally(new RuntimeException("Not alive")); + return retval; + } + ListenableFuture<InstanceCommunication.MetricsData> response = stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build()); + Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() { + @Override + public void onFailure(Throwable throwable) { + InstanceCommunication.MetricsData.Builder builder = InstanceCommunication.MetricsData.newBuilder(); + retval.complete(builder.build()); + } + + @Override + public void onSuccess(InstanceCommunication.MetricsData t) { + retval.complete(t); + } + }); return retval; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 14e68cc..87017a6 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -288,7 +288,7 @@ class ProcessRuntime implements Runtime { } @Override - public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() { + public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) { CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>(); if (stub == null) { retval.completeExceptionally(new RuntimeException("Not alive")); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java index fafdca7..19a5fc9 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java @@ -45,7 +45,7 @@ public interface Runtime { CompletableFuture<Void> resetMetrics(); - CompletableFuture<InstanceCommunication.MetricsData> getMetrics(); + CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId); String getPrometheusMetrics() throws IOException; } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index 2bd4644..ad1002c 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -155,7 +155,7 @@ class ThreadRuntime implements Runtime { @Override - public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() { + public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) { return CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics()); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index fdd62d7..a5a6aa2 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -487,7 +487,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance())); RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); if (runtimeSpawner != null) { - return Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo).getMetrics(); + return Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo, instanceId).getMetrics(); } return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData(); } else { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index 4240506..3c1fa4c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -185,7 +185,9 @@ public final class Utils { } } - public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName, FunctionRuntimeInfo functionRuntimeInfo) { + public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName, + FunctionRuntimeInfo functionRuntimeInfo, + int instanceId) { RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats(); @@ -194,8 +196,7 @@ public final class Utils { if (functionRuntime != null) { try { - InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics().get(); - int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); + InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics(instanceId).get(); functionInstanceStats.setInstanceId(instanceId); FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData @@ -229,14 +230,4 @@ public final class Utils { } return functionInstanceStats; } - - public static FunctionStats getFunctionStats(Map<String, FunctionRuntimeInfo> functionRuntimes) { - FunctionStats functionStats = new FunctionStats(); - for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) { - String fullyQualifiedInstanceName = entry.getKey(); - FunctionRuntimeInfo functionRuntimeInfo = entry.getValue(); - functionStats.addInstance(Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo)); - } - return functionStats; - } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java index 6351272..79f4df2 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java @@ -158,14 +158,28 @@ public class WorkerImpl { String fullyQualifiedInstanceName = entry.getKey(); FunctionRuntimeInfo functionRuntimeInfo = entry.getValue(); - FunctionStats.FunctionInstanceStats functionInstanceStats = - Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo); - - WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats(); - workerFunctionInstanceStats.setName(fullyQualifiedInstanceName); - workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics()); - - metricsList.add(workerFunctionInstanceStats); + if (workerService.getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) { + Function.FunctionDetails functionDetails = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails(); + int parallelism = functionDetails.getParallelism(); + for (int i = 0; i < parallelism; ++i) { + FunctionStats.FunctionInstanceStats functionInstanceStats = + Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo, i); + WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats(); + workerFunctionInstanceStats.setName(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId( + functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), i + )); + workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics()); + metricsList.add(workerFunctionInstanceStats); + } + } else { + FunctionStats.FunctionInstanceStats functionInstanceStats = + Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo, + functionRuntimeInfo.getFunctionInstance().getInstanceId()); + WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats(); + workerFunctionInstanceStats.setName(fullyQualifiedInstanceName); + workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics()); + metricsList.add(workerFunctionInstanceStats); + } } return metricsList; }