This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f718128 All grpc calls should have timeouts (#2747) f718128 is described below commit f71812803f961140a8f26630f40a9b6ff9afe0bb Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Mon Oct 8 12:35:06 2018 -0700 All grpc calls should have timeouts (#2747) --- .../apache/pulsar/functions/runtime/KubernetesRuntime.java | 4 +++- .../org/apache/pulsar/functions/runtime/ProcessRuntime.java | 11 ++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 7303d8f..a9e8c75 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -41,6 +41,7 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -74,6 +75,7 @@ class KubernetesRuntime implements Runtime { "node.alpha.kubernetes.io/unreachable" ) ); + private static final long GRPC_TIMEOUT_SECS = 5; // The thread that invokes the function @Getter @@ -190,7 +192,7 @@ class KubernetesRuntime implements Runtime { retval.completeExceptionally(new RuntimeException("Not alive")); return retval; } - ListenableFuture<FunctionStatus> response = stub[instanceId].getFunctionStatus(Empty.newBuilder().build()); + ListenableFuture<FunctionStatus> response = stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getFunctionStatus(Empty.newBuilder().build()); Futures.addCallback(response, new FutureCallback<FunctionStatus>() { @Override public void onFailure(Throwable throwable) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 3534915..bb77768 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -60,6 +60,7 @@ class ProcessRuntime implements Runtime { private ScheduledExecutorService timer; private InstanceConfig instanceConfig; private final Long expectedHealthCheckInterval; + private static final long GRPC_TIMEOUT_SECS = 5; ProcessRuntime(InstanceConfig instanceConfig, String instanceFile, @@ -135,7 +136,7 @@ class ProcessRuntime implements Runtime { retval.completeExceptionally(new RuntimeException("Not alive")); return retval; } - ListenableFuture<FunctionStatus> response = stub.getFunctionStatus(Empty.newBuilder().build()); + ListenableFuture<FunctionStatus> response = stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getFunctionStatus(Empty.newBuilder().build()); Futures.addCallback(response, new FutureCallback<FunctionStatus>() { @Override public void onFailure(Throwable throwable) { @@ -164,7 +165,7 @@ class ProcessRuntime implements Runtime { retval.completeExceptionally(new RuntimeException("Not alive")); return retval; } - ListenableFuture<InstanceCommunication.MetricsData> response = stub.getAndResetMetrics(Empty.newBuilder().build()); + ListenableFuture<InstanceCommunication.MetricsData> response = stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getAndResetMetrics(Empty.newBuilder().build()); Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() { @Override public void onFailure(Throwable throwable) { @@ -186,7 +187,7 @@ class ProcessRuntime implements Runtime { retval.completeExceptionally(new RuntimeException("Not alive")); return retval; } - ListenableFuture<Empty> response = stub.resetMetrics(Empty.newBuilder().build()); + ListenableFuture<Empty> response = stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).resetMetrics(Empty.newBuilder().build()); Futures.addCallback(response, new FutureCallback<Empty>() { @Override public void onFailure(Throwable throwable) { @@ -208,7 +209,7 @@ class ProcessRuntime implements Runtime { retval.completeExceptionally(new RuntimeException("Not alive")); return retval; } - ListenableFuture<InstanceCommunication.MetricsData> response = stub.getMetrics(Empty.newBuilder().build()); + ListenableFuture<InstanceCommunication.MetricsData> response = stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build()); Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() { @Override public void onFailure(Throwable throwable) { @@ -229,7 +230,7 @@ class ProcessRuntime implements Runtime { retval.completeExceptionally(new RuntimeException("Not alive")); return retval; } - ListenableFuture<InstanceCommunication.HealthCheckResult> response = stub.healthCheck(Empty.newBuilder().build()); + ListenableFuture<InstanceCommunication.HealthCheckResult> response = stub.withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).healthCheck(Empty.newBuilder().build()); Futures.addCallback(response, new FutureCallback<InstanceCommunication.HealthCheckResult>() { @Override public void onFailure(Throwable throwable) {