This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 179f123e1a9accb93ac9097081185ae9c0b6dd3a Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Tue Sep 1 13:47:33 2020 +0800 [FLINK-19106] [core] Extend HttpFunctionSpec with more timeout configs This adds new configuration options for connectTimeout / readTimeout / writeTimeout to HttpFunctionSpec. By default, these timeouts are all set to 10 seconds to be coherent with the previous default behaviour applied by OkHttp. --- .../flink/core/httpfn/HttpFunctionSpec.java | 79 +++++++++++++++++++++- 1 file changed, 77 insertions(+), 2 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java index d29084c..3a8f653 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java @@ -31,12 +31,18 @@ public final class HttpFunctionSpec implements FunctionSpec, Serializable { private static final long serialVersionUID = 1; private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1); + private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10); + private static final Duration DEFAULT_HTTP_READ_TIMEOUT = Duration.ofSeconds(10); + private static final Duration DEFAULT_HTTP_WRITE_TIMEOUT = Duration.ofSeconds(10); private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000; private final FunctionType functionType; private final URI endpoint; private final List<StateSpec> states; private final Duration maxRequestDuration; + private final Duration connectTimeout; + private final Duration readTimeout; + private final Duration writeTimeout; private final int maxNumBatchRequests; private HttpFunctionSpec( @@ -44,11 +50,17 @@ public final 
class HttpFunctionSpec implements FunctionSpec, Serializable { URI endpoint, List<StateSpec> states, Duration maxRequestDuration, + Duration connectTimeout, + Duration readTimeout, + Duration writeTimeout, int maxNumBatchRequests) { this.functionType = Objects.requireNonNull(functionType); this.endpoint = Objects.requireNonNull(endpoint); this.states = Objects.requireNonNull(states); this.maxRequestDuration = Objects.requireNonNull(maxRequestDuration); + this.connectTimeout = Objects.requireNonNull(connectTimeout); + this.readTimeout = Objects.requireNonNull(readTimeout); + this.writeTimeout = Objects.requireNonNull(writeTimeout); this.maxNumBatchRequests = maxNumBatchRequests; } @@ -83,6 +95,18 @@ public final class HttpFunctionSpec implements FunctionSpec, Serializable { return maxRequestDuration; } + public Duration connectTimeout() { + return connectTimeout; + } + + public Duration readTimeout() { + return readTimeout; + } + + public Duration writeTimeout() { + return writeTimeout; + } + public int maxNumBatchRequests() { return maxNumBatchRequests; } @@ -94,6 +118,9 @@ public final class HttpFunctionSpec implements FunctionSpec, Serializable { private final List<StateSpec> states = new ArrayList<>(); private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT; + private Duration connectTimeout = DEFAULT_HTTP_CONNECT_TIMEOUT; + private Duration readTimeout = DEFAULT_HTTP_READ_TIMEOUT; + private Duration writeTimeout = DEFAULT_HTTP_WRITE_TIMEOUT; private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS; private Builder(FunctionType functionType, URI endpoint) { @@ -107,7 +134,22 @@ public final class HttpFunctionSpec implements FunctionSpec, Serializable { } public Builder withMaxRequestDuration(Duration duration) { - this.maxRequestDuration = Objects.requireNonNull(duration); + this.maxRequestDuration = requireNonZeroDuration(duration); + return this; + } + + public Builder withConnectTimeoutDuration(Duration duration) { + this.connectTimeout = 
requireNonZeroDuration(duration); + return this; + } + + public Builder withReadTimeoutDuration(Duration duration) { + this.readTimeout = requireNonZeroDuration(duration); + return this; + } + + public Builder withWriteTimeoutDuration(Duration duration) { + this.writeTimeout = requireNonZeroDuration(duration); return this; } @@ -117,8 +159,41 @@ public final class HttpFunctionSpec implements FunctionSpec, Serializable { } public HttpFunctionSpec build() { + validateTimeouts(); + return new HttpFunctionSpec( - functionType, endpoint, states, maxRequestDuration, maxNumBatchRequests); + functionType, + endpoint, + states, + maxRequestDuration, + connectTimeout, + readTimeout, + writeTimeout, + maxNumBatchRequests); + } + + private Duration requireNonZeroDuration(Duration duration) { + Objects.requireNonNull(duration); + if (duration.equals(Duration.ZERO)) { + throw new IllegalArgumentException("Timeout durations must be larger than 0."); + } + + return duration; + } + + private void validateTimeouts() { + if (connectTimeout.compareTo(maxRequestDuration) > 0) { + throw new IllegalArgumentException( + "Connect timeout cannot be larger than request timeout."); + } + + if (readTimeout.compareTo(maxRequestDuration) > 0) { + throw new IllegalArgumentException("Read timeout cannot be larger than request timeout."); + } + + if (writeTimeout.compareTo(maxRequestDuration) > 0) { + throw new IllegalArgumentException("Write timeout cannot be larger than request timeout."); + } } } }