This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f718128  All grpc calls should have timeouts (#2747)
f718128 is described below

commit f71812803f961140a8f26630f40a9b6ff9afe0bb
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Mon Oct 8 12:35:06 2018 -0700

    All grpc calls should have timeouts (#2747)
---
 .../apache/pulsar/functions/runtime/KubernetesRuntime.java    |  4 +++-
 .../org/apache/pulsar/functions/runtime/ProcessRuntime.java   | 11 ++++++-----
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 7303d8f..a9e8c75 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -74,6 +75,7 @@ class KubernetesRuntime implements Runtime {
                     "node.alpha.kubernetes.io/unreachable"
             )
     );
+    private static final long GRPC_TIMEOUT_SECS = 5;
 
     // The thread that invokes the function
     @Getter
@@ -190,7 +192,7 @@ class KubernetesRuntime implements Runtime {
             retval.completeExceptionally(new RuntimeException("Not alive"));
             return retval;
         }
-        ListenableFuture<FunctionStatus> response = 
stub[instanceId].getFunctionStatus(Empty.newBuilder().build());
+        ListenableFuture<FunctionStatus> response = 
stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS, 
TimeUnit.SECONDS).getFunctionStatus(Empty.newBuilder().build());
         Futures.addCallback(response, new FutureCallback<FunctionStatus>() {
             @Override
             public void onFailure(Throwable throwable) {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 3534915..bb77768 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -60,6 +60,7 @@ class ProcessRuntime implements Runtime {
     private ScheduledExecutorService timer;
     private InstanceConfig instanceConfig;
     private final Long expectedHealthCheckInterval;
+    private static final long GRPC_TIMEOUT_SECS = 5;
 
     ProcessRuntime(InstanceConfig instanceConfig,
                    String instanceFile,
@@ -135,7 +136,7 @@ class ProcessRuntime implements Runtime {
             retval.completeExceptionally(new RuntimeException("Not alive"));
             return retval;
         }
-        ListenableFuture<FunctionStatus> response = 
stub.getFunctionStatus(Empty.newBuilder().build());
+        ListenableFuture<FunctionStatus> response = 
stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, 
TimeUnit.SECONDS).getFunctionStatus(Empty.newBuilder().build());
         Futures.addCallback(response, new FutureCallback<FunctionStatus>() {
             @Override
             public void onFailure(Throwable throwable) {
@@ -164,7 +165,7 @@ class ProcessRuntime implements Runtime {
             retval.completeExceptionally(new RuntimeException("Not alive"));
             return retval;
         }
-        ListenableFuture<InstanceCommunication.MetricsData> response = 
stub.getAndResetMetrics(Empty.newBuilder().build());
+        ListenableFuture<InstanceCommunication.MetricsData> response = 
stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, 
TimeUnit.SECONDS).getAndResetMetrics(Empty.newBuilder().build());
         Futures.addCallback(response, new 
FutureCallback<InstanceCommunication.MetricsData>() {
             @Override
             public void onFailure(Throwable throwable) {
@@ -186,7 +187,7 @@ class ProcessRuntime implements Runtime {
             retval.completeExceptionally(new RuntimeException("Not alive"));
             return retval;
         }
-        ListenableFuture<Empty> response = 
stub.resetMetrics(Empty.newBuilder().build());
+        ListenableFuture<Empty> response = 
stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, 
TimeUnit.SECONDS).resetMetrics(Empty.newBuilder().build());
         Futures.addCallback(response, new FutureCallback<Empty>() {
             @Override
             public void onFailure(Throwable throwable) {
@@ -208,7 +209,7 @@ class ProcessRuntime implements Runtime {
             retval.completeExceptionally(new RuntimeException("Not alive"));
             return retval;
         }
-        ListenableFuture<InstanceCommunication.MetricsData> response = 
stub.getMetrics(Empty.newBuilder().build());
+        ListenableFuture<InstanceCommunication.MetricsData> response = 
stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, 
TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build());
         Futures.addCallback(response, new 
FutureCallback<InstanceCommunication.MetricsData>() {
             @Override
             public void onFailure(Throwable throwable) {
@@ -229,7 +230,7 @@ class ProcessRuntime implements Runtime {
             retval.completeExceptionally(new RuntimeException("Not alive"));
             return retval;
         }
-        ListenableFuture<InstanceCommunication.HealthCheckResult> response = 
stub.healthCheck(Empty.newBuilder().build());
+        ListenableFuture<InstanceCommunication.HealthCheckResult> response = 
stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, 
TimeUnit.SECONDS).healthCheck(Empty.newBuilder().build());
         Futures.addCallback(response, new 
FutureCallback<InstanceCommunication.HealthCheckResult>() {
             @Override
             public void onFailure(Throwable throwable) {

Reply via email to