This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 4a920f913eb652183021be727203033529570d90
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Thu May 28 14:27:52 2020 +0800

    [FLINK-17875] [core] Move default function config values to HttpFunctionSpec
---
 .../flink/core/httpfn/HttpFunctionSpec.java        | 46 +++++++++++++++++++++-
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 31 ++++++++-------
 2 files changed, 61 insertions(+), 16 deletions(-)

diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
index 61945f4..0e03591 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionSpec.java
@@ -19,19 +19,24 @@ package org.apache.flink.statefun.flink.core.httpfn;
 
 import java.net.URI;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
 
 public final class HttpFunctionSpec implements FunctionSpec {
+
+  private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
+  private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
+
   private final FunctionType functionType;
   private final URI endpoint;
   private final List<String> states;
   private final Duration maxRequestDuration;
   private final int maxNumBatchRequests;
 
-  public HttpFunctionSpec(
+  private HttpFunctionSpec(
       FunctionType functionType,
       URI endpoint,
       List<String> states,
@@ -44,6 +49,10 @@ public final class HttpFunctionSpec implements FunctionSpec {
     this.maxNumBatchRequests = maxNumBatchRequests;
   }
 
+  public static Builder builder(FunctionType functionType, URI endpoint) {
+    return new Builder(functionType, endpoint);
+  }
+
   @Override
   public FunctionType functionType() {
     return functionType;
@@ -74,4 +83,39 @@ public final class HttpFunctionSpec implements FunctionSpec {
   public int maxNumBatchRequests() {
     return maxNumBatchRequests;
   }
+
+  public static final class Builder {
+
+    private final FunctionType functionType;
+    private final URI endpoint;
+
+    private final List<String> states = new ArrayList<>();
+    private Duration maxRequestDuration = DEFAULT_HTTP_TIMEOUT;
+    private int maxNumBatchRequests = DEFAULT_MAX_NUM_BATCH_REQUESTS;
+
+    private Builder(FunctionType functionType, URI endpoint) {
+      this.functionType = Objects.requireNonNull(functionType);
+      this.endpoint = Objects.requireNonNull(endpoint);
+    }
+
+    public Builder withState(String stateName) {
+      this.states.add(stateName);
+      return this;
+    }
+
+    public Builder withMaxRequestDuration(Duration duration) {
+      this.maxRequestDuration = Objects.requireNonNull(duration);
+      return this;
+    }
+
+    public Builder withMaxNumBatchRequests(int maxNumBatchRequests) {
+      this.maxNumBatchRequests = maxNumBatchRequests;
+      return this;
+    }
+
+    public HttpFunctionSpec build() {
+      return new HttpFunctionSpec(
+          functionType, endpoint, states, maxRequestDuration, 
maxNumBatchRequests);
+    }
+  }
 }
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
index 25f1981..7dc5173 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -29,6 +29,8 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collector;
@@ -48,9 +50,6 @@ import org.apache.flink.util.TimeUtils;
 
 final class FunctionJsonEntity implements JsonEntity {
 
-  private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
-  private static final Integer DEFAULT_MAX_NUM_BATCH_REQUESTS = 1000;
-
   @Override
   public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion 
formatVersion) {
     final Iterable<? extends JsonNode> functionSpecNodes = 
functionSpecNodes(moduleSpecRootNode);
@@ -83,12 +82,16 @@ final class FunctionJsonEntity implements JsonEntity {
     FunctionType functionType = functionType(functionNode);
     switch (kind) {
       case HTTP:
-        return new HttpFunctionSpec(
-            functionType,
-            functionUri(functionNode),
-            functionStates(functionNode),
-            maxRequestDuration(functionNode),
-            maxNumBatchRequests(functionNode));
+        final HttpFunctionSpec.Builder specBuilder =
+            HttpFunctionSpec.builder(functionType, functionUri(functionNode));
+
+        for (String state : functionStates(functionNode)) {
+          specBuilder.withState(state);
+        }
+        
optionalMaxNumBatchRequests(functionNode).ifPresent(specBuilder::withMaxNumBatchRequests);
+        
optionalMaxRequestDuration(functionNode).ifPresent(specBuilder::withMaxRequestDuration);
+
+        return specBuilder.build();
       case GRPC:
         return new GrpcFunctionSpec(functionType, 
functionAddress(functionNode));
       default:
@@ -100,16 +103,14 @@ final class FunctionJsonEntity implements JsonEntity {
     return Selectors.textListAt(functionNode, 
Pointers.Functions.FUNCTION_STATES);
   }
 
-  private static int maxNumBatchRequests(JsonNode functionNode) {
+  private static OptionalInt optionalMaxNumBatchRequests(JsonNode 
functionNode) {
     return Selectors.optionalIntegerAt(
-            functionNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS)
-        .orElse(DEFAULT_MAX_NUM_BATCH_REQUESTS);
+        functionNode, Pointers.Functions.FUNCTION_MAX_NUM_BATCH_REQUESTS);
   }
 
-  private static Duration maxRequestDuration(JsonNode functionNode) {
+  private static Optional<Duration> optionalMaxRequestDuration(JsonNode 
functionNode) {
     return Selectors.optionalTextAt(functionNode, 
Pointers.Functions.FUNCTION_TIMEOUT)
-        .map(TimeUtils::parseDuration)
-        .orElse(DEFAULT_HTTP_TIMEOUT);
+        .map(TimeUtils::parseDuration);
   }
 
   private static FunctionType functionType(JsonNode functionNode) {

Reply via email to