This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 63a0491  Enable stats to be recovered by Kubernetes runtime (#3363)
63a0491 is described below

commit 63a0491ec86a4123062eb2835d6f385bf79e2552
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Mon Jan 14 09:55:02 2019 -0800

    Enable stats to be recovered by Kubernetes runtime (#3363)
---
 .../pulsar/functions/runtime/JavaInstanceMain.java |  2 +-
 .../functions/runtime/KubernetesRuntime.java       | 26 +++++++++++++++++--
 .../pulsar/functions/runtime/ProcessRuntime.java   |  2 +-
 .../apache/pulsar/functions/runtime/Runtime.java   |  2 +-
 .../pulsar/functions/runtime/ThreadRuntime.java    |  2 +-
 .../functions/worker/FunctionRuntimeManager.java   |  2 +-
 .../org/apache/pulsar/functions/worker/Utils.java  | 17 +++---------
 .../functions/worker/rest/api/WorkerImpl.java      | 30 ++++++++++++++++------
 8 files changed, 55 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 3a0a404..43467ae 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -317,7 +317,7 @@ public class JavaInstanceMain implements AutoCloseable {
             Runtime runtime = runtimeSpawner.getRuntime();
             if (runtime != null) {
                 try {
-                    InstanceCommunication.MetricsData metrics = 
runtime.getMetrics().get();
+                    InstanceCommunication.MetricsData metrics = 
runtime.getMetrics(instanceId).get();
                     responseObserver.onNext(metrics);
                     responseObserver.onCompleted();
                 } catch (InterruptedException | ExecutionException e) {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index a3a006a..aba473b 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -295,9 +295,31 @@ class KubernetesRuntime implements Runtime {
     }
 
     @Override
-    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int 
instanceId) {
         CompletableFuture<InstanceCommunication.MetricsData> retval = new 
CompletableFuture<>();
-        retval.completeExceptionally(new RuntimeException("Kubernetes Runtime 
doesnt support getting metrics via rest"));
+        // Check liveness before reading stub.length; avoids an NPE.
+        if (stub == null) {
+            retval.completeExceptionally(new RuntimeException("Not alive"));
+            return retval;
+        }
+        if (instanceId < 0 || instanceId >= stub.length) {
+            retval.completeExceptionally(new RuntimeException("Invalid 
InstanceId"));
+            return retval;
+        }
+        ListenableFuture<InstanceCommunication.MetricsData> response = 
stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS, 
TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build());
+        Futures.addCallback(response, new 
FutureCallback<InstanceCommunication.MetricsData>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                // RPC failed: complete with empty metrics, not an error.
+                InstanceCommunication.MetricsData.Builder builder = 
InstanceCommunication.MetricsData.newBuilder();
+                retval.complete(builder.build());
+            }
+
+            @Override
+            public void onSuccess(InstanceCommunication.MetricsData t) {
+                retval.complete(t);
+            }
+        });
         return retval;
     }
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 14e68cc..87017a6 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -288,7 +288,7 @@ class ProcessRuntime implements Runtime {
     }
 
     @Override
-    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int 
instanceId) {
         CompletableFuture<InstanceCommunication.MetricsData> retval = new 
CompletableFuture<>();
         if (stub == null) {
             retval.completeExceptionally(new RuntimeException("Not alive"));
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index fafdca7..19a5fc9 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -45,7 +45,7 @@ public interface Runtime {
     
     CompletableFuture<Void> resetMetrics();
     
-    CompletableFuture<InstanceCommunication.MetricsData> getMetrics();
+    CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int 
instanceId);
 
     String getPrometheusMetrics() throws IOException;
 }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 2bd4644..ad1002c 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -155,7 +155,7 @@ class ThreadRuntime implements Runtime {
     
     
     @Override
-    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int 
instanceId) {
         return 
CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics());
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index fdd62d7..a5a6aa2 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -487,7 +487,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                     
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
             RuntimeSpawner runtimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
             if (runtimeSpawner != null) {
-                return 
Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
 functionRuntimeInfo).getMetrics();
+                return 
Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
 functionRuntimeInfo, instanceId).getMetrics();
             }
             return new 
FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
         } else {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 4240506..3c1fa4c 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -185,7 +185,9 @@ public final class Utils {
         }
     }
 
-    public static FunctionStats.FunctionInstanceStats 
getFunctionInstanceStats(String fullyQualifiedInstanceName, FunctionRuntimeInfo 
functionRuntimeInfo) {
+    public static FunctionStats.FunctionInstanceStats 
getFunctionInstanceStats(String fullyQualifiedInstanceName,
+                                                                               
FunctionRuntimeInfo functionRuntimeInfo,
+                                                                               
int instanceId) {
         RuntimeSpawner functionRuntimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
 
         FunctionStats.FunctionInstanceStats functionInstanceStats = new 
FunctionStats.FunctionInstanceStats();
@@ -194,8 +196,7 @@ public final class Utils {
             if (functionRuntime != null) {
                 try {
 
-                    InstanceCommunication.MetricsData metricsData = 
functionRuntime.getMetrics().get();
-                    int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
+                    InstanceCommunication.MetricsData metricsData = 
functionRuntime.getMetrics(instanceId).get();
                     functionInstanceStats.setInstanceId(instanceId);
 
                     
FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData 
functionInstanceStatsData
@@ -229,14 +230,4 @@ public final class Utils {
         }
         return functionInstanceStats;
     }
-
-    public static FunctionStats getFunctionStats(Map<String, 
FunctionRuntimeInfo> functionRuntimes) {
-        FunctionStats functionStats = new FunctionStats();
-        for (Map.Entry<String, FunctionRuntimeInfo> entry : 
functionRuntimes.entrySet()) {
-            String fullyQualifiedInstanceName = entry.getKey();
-            FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
-            
functionStats.addInstance(Utils.getFunctionInstanceStats(fullyQualifiedInstanceName,
 functionRuntimeInfo));
-        }
-        return functionStats;
-    }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 6351272..79f4df2 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -158,14 +158,28 @@ public class WorkerImpl {
             String fullyQualifiedInstanceName = entry.getKey();
             FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
 
-            FunctionStats.FunctionInstanceStats functionInstanceStats =
-                    Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, 
functionRuntimeInfo);
-
-            WorkerFunctionInstanceStats workerFunctionInstanceStats = new 
WorkerFunctionInstanceStats();
-            workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
-            
workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
-
-            metricsList.add(workerFunctionInstanceStats);
+            if 
(workerService.getFunctionRuntimeManager().getRuntimeFactory().externallyManaged())
 {
+                Function.FunctionDetails functionDetails = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
+                int parallelism = functionDetails.getParallelism();
+                for (int i = 0; i < parallelism; ++i) {
+                    FunctionStats.FunctionInstanceStats functionInstanceStats =
+                            
Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo, 
i);
+                    WorkerFunctionInstanceStats workerFunctionInstanceStats = 
new WorkerFunctionInstanceStats();
+                    
workerFunctionInstanceStats.setName(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(
+                            functionDetails.getTenant(), 
functionDetails.getNamespace(), functionDetails.getName(), i
+                    ));
+                    
workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
+                    metricsList.add(workerFunctionInstanceStats);
+                }
+            } else {
+                FunctionStats.FunctionInstanceStats functionInstanceStats =
+                        
Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo,
+                                
functionRuntimeInfo.getFunctionInstance().getInstanceId());
+                WorkerFunctionInstanceStats workerFunctionInstanceStats = new 
WorkerFunctionInstanceStats();
+                
workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
+                
workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
+                metricsList.add(workerFunctionInstanceStats);
+            }
         }
         return metricsList;
     }

Reply via email to