This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 179f123e1a9accb93ac9097081185ae9c0b6dd3a
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Tue Sep 1 13:47:33 2020 +0800

    [FLINK-19106] [core] Extend HttpFunctionSpec with more timeout configs
    
    This adds new configuration options for connectTimeout / readTimeout /
    writeTimeout to HttpFunctionSpec. By default, these timeout are all set
    to 10 seconds to be conherent with with the previous default behaviour
    applied by OkHttp.
---
 .../flink/core/httpfn/HttpFunctionSpec.java        | 79 +++++++++++++++++++++-
 1 file changed, 77 insertions(+), 2 deletions(-)

diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
index d29084c..3a8f653 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
@@ -31,12 +31,18 @@ public final class HttpFunctionSpec implements 
FunctionSpec, Serializable {
   private static final long serialVersionUID = 1;
 
   private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
+  private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = 
Duration.ofSeconds(10);
+  private static final Duration DEFAULT_HTTP_READ_TIMEOUT = 
Duration.ofSeconds(10);
+  private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = 
Duration.ofSeconds(10);
   private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
 
   private final FunctionType functionType;
   private final URI endpoint;
   private final List<StateSpec> states;
   private final Duration maxRequestDuration;
+  private final Duration connectTimeout;
+  private final Duration readTimeout;
+  private final Duration writeTimeout;
   private final int maxNumBatchRequests;
 
   private HttpFunctionSpec(
@@ -44,11 +50,17 @@ public final class HttpFunctionSpec implements 
FunctionSpec, Serializable {
       URI endpoint,
       List<StateSpec> states,
       Duration maxRequestDuration,
+      Duration connectTimeout,
+      Duration readTimeout,
+      Duration writeTimeout,
       int maxNumBatchRequests) {
     this.functionType = Objects.requireNonNull(functionType);
     this.endpoint = Objects.requireNonNull(endpoint);
     this.states = Objects.requireNonNull(states);
     this.maxRequestDuration = Objects.requireNonNull(maxRequestDuration);
+    this.connectTimeout = Objects.requireNonNull(connectTimeout);
+    this.readTimeout = Objects.requireNonNull(readTimeout);
+    this.writeTimeout = Objects.requireNonNull(writeTimeout);
     this.maxNumBatchRequests = maxNumBatchRequests;
   }
 
@@ -83,6 +95,18 @@ public final class HttpFunctionSpec implements FunctionSpec, 
Serializable {
     return maxRequestDuration;
   }
 
+  public Duration connectTimeout() {
+    return connectTimeout;
+  }
+
+  public Duration readTimeout() {
+    return readTimeout;
+  }
+
+  public Duration writeTimeout() {
+    return writeTimeout;
+  }
+
   public int maxNumBatchRequests() {
     return maxNumBatchRequests;
   }
@@ -94,6 +118,9 @@ public final class HttpFunctionSpec implements FunctionSpec, 
Serializable {
 
     private final List<StateSpec> states = new ArrayList<>();
     private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT;
+    private Duration connectTimeout = DEFAULT_HTTP_CONNECT_TIMEOUT;
+    private Duration readTimeout = DEFAULT_HTTP_READ_TIMEOUT;
+    private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT;
     private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS;
 
     private Builder(FunctionType functionType, URI endpoint) {
@@ -107,7 +134,22 @@ public final class HttpFunctionSpec implements 
FunctionSpec, Serializable {
     }
 
     public Builder withMaxRequestDuration(Duration duration) {
-      this.maxRequestDuration = Objects.requireNonNull(duration);
+      this.maxRequestDuration = requireNonZeroDuration(duration);
+      return this;
+    }
+
+    public Builder withConnectTimeoutDuration(Duration duration) {
+      this.connectTimeout = requireNonZeroDuration(duration);
+      return this;
+    }
+
+    public Builder withReadTimeoutDuration(Duration duration) {
+      this.readTimeout = requireNonZeroDuration(duration);
+      return this;
+    }
+
+    public Builder withWriteTimeoutDuration(Duration duration) {
+      this.writeTimeout = requireNonZeroDuration(duration);
       return this;
     }
 
@@ -117,8 +159,41 @@ public final class HttpFunctionSpec implements 
FunctionSpec, Serializable {
     }
 
     public HttpFunctionSpec build() {
+      validateTimeouts();
+
       return new HttpFunctionSpec(
-          functionType, endpoint, states, maxRequestDuration, 
maxNumBatchRequests);
+          functionType,
+          endpoint,
+          states,
+          maxRequestDuration,
+          connectTimeout,
+          readTimeout,
+          writeTimeout,
+          maxNumBatchRequests);
+    }
+
+    private Duration requireNonZeroDuration(Duration duration) {
+      Objects.requireNonNull(duration);
+      if (duration.equals(Duration.ZERO)) {
+        throw new IllegalArgumentException("Timeout durations must be larger 
than 0.");
+      }
+
+      return duration;
+    }
+
+    private void validateTimeouts() {
+      if (connectTimeout.compareTo(maxRequestDuration) > 0) {
+        throw new IllegalArgumentException(
+            "Connect timeout cannot be larger than request timeout.");
+      }
+
+      if (readTimeout.compareTo(maxRequestDuration) > 0) {
+        throw new IllegalArgumentException("Read timeout cannot be larger than 
request timeout.");
+      }
+
+      if (writeTimeout.compareTo(maxRequestDuration) > 0) {
+        throw new IllegalArgumentException("Write timeout cannot be larger 
than request timeout.");
+      }
     }
   }
 }

Reply via email to