This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit d8a221078e88fe082a411f19ff7ad3f66e028324
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Thu May 28 15:20:47 2020 +0800

    [FLINK-17875] [core] Update FunctionJsonEntity to recognize V2 state format
---
 .../flink/core/jsonmodule/FunctionJsonEntity.java  | 62 +++++++++++++++++++---
 1 file changed, 55 insertions(+), 7 deletions(-)

diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
index 9b11f00..78bc18d 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -26,6 +26,7 @@ import static 
org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -34,6 +35,7 @@ import java.util.OptionalInt;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collector;
+import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
@@ -44,6 +46,7 @@ import 
org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider;
 import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec;
+import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder;
@@ -68,12 +71,17 @@ final class FunctionJsonEntity implements JsonEntity {
         JsonPointer.compile("/function/spec/maxNumBatchRequests");
   }
 
+  private static final class StateSpecPointers {
+    private static final JsonPointer NAME = JsonPointer.compile("/name");
+    private static final JsonPointer EXPIRE_DURATION = 
JsonPointer.compile("/expireAfter");
+  }
+
   @Override
   public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion 
formatVersion) {
     final Iterable<? extends JsonNode> functionSpecNodes = 
functionSpecNodes(moduleSpecRootNode);
 
     for (Map.Entry<Kind, Map<FunctionType, FunctionSpec>> entry :
-        parse(functionSpecNodes).entrySet()) {
+        parse(functionSpecNodes, formatVersion).entrySet()) {
       StatefulFunctionProvider provider = functionProvider(entry.getKey(), 
entry.getValue());
       Set<FunctionType> functionTypes = entry.getValue().keySet();
       for (FunctionType type : functionTypes) {
@@ -83,9 +91,9 @@ final class FunctionJsonEntity implements JsonEntity {
   }
 
   private Map<Kind, Map<FunctionType, FunctionSpec>> parse(
-      Iterable<? extends JsonNode> functionSpecNodes) {
+      Iterable<? extends JsonNode> functionSpecNodes, FormatVersion 
formatVersion) {
     return StreamSupport.stream(functionSpecNodes.spliterator(), false)
-        .map(FunctionJsonEntity::parseFunctionSpec)
+        .map(functionSpecNode -> parseFunctionSpec(functionSpecNode, 
formatVersion))
         .collect(groupingBy(FunctionSpec::kind, groupByFunctionType()));
   }
 
@@ -93,7 +101,8 @@ final class FunctionJsonEntity implements JsonEntity {
     return Selectors.listAt(moduleSpecRootNode, FUNCTION_SPECS_POINTER);
   }
 
-  private static FunctionSpec parseFunctionSpec(JsonNode functionNode) {
+  private static FunctionSpec parseFunctionSpec(
+      JsonNode functionNode, FormatVersion formatVersion) {
     String functionKind = Selectors.textAt(functionNode, MetaPointers.KIND);
     FunctionSpec.Kind kind =
         
FunctionSpec.Kind.valueOf(functionKind.toUpperCase(Locale.getDefault()));
@@ -103,7 +112,9 @@ final class FunctionJsonEntity implements JsonEntity {
         final HttpFunctionSpec.Builder specBuilder =
             HttpFunctionSpec.builder(functionType, functionUri(functionNode));
 
-        for (String state : functionStates(functionNode)) {
+        final Function<JsonNode, List<StateSpec>> stateSpecParser =
+            functionStateParserOf(formatVersion);
+        for (StateSpec state : stateSpecParser.apply(functionNode)) {
           specBuilder.withState(state);
         }
         
optionalMaxNumBatchRequests(functionNode).ifPresent(specBuilder::withMaxNumBatchRequests);
@@ -117,8 +128,40 @@ final class FunctionJsonEntity implements JsonEntity {
     }
   }
 
-  private static List<String> functionStates(JsonNode functionNode) {
-    return Selectors.textListAt(functionNode, SpecPointers.STATES);
+  private static Function<JsonNode, List<StateSpec>> functionStateParserOf(
+      FormatVersion formatVersion) {
+    switch (formatVersion) {
+      case v1_0:
+        return FunctionJsonEntity::functionStateSpecParserV1;
+      case v2_0:
+        return FunctionJsonEntity::functionStateSpecParserV2;
+      default:
+        throw new IllegalStateException("Unrecognized format version: " + 
formatVersion);
+    }
+  }
+
+  private static List<StateSpec> functionStateSpecParserV1(JsonNode 
functionNode) {
+    final List<String> stateNames = Selectors.textListAt(functionNode, 
SpecPointers.STATES);
+    return 
stateNames.stream().map(StateSpec::new).collect(Collectors.toList());
+  }
+
+  private static List<StateSpec> functionStateSpecParserV2(JsonNode 
functionNode) {
+    final Iterable<? extends JsonNode> stateSpecNodes =
+        Selectors.listAt(functionNode, SpecPointers.STATES);
+    final List<StateSpec> stateSpecs = new ArrayList<>();
+
+    stateSpecNodes.forEach(
+        stateSpecNode -> {
+          final String name = Selectors.textAt(stateSpecNode, 
StateSpecPointers.NAME);
+          final Optional<Duration> optionalStateExpireDuration =
+              optionalStateExpireDuration(stateSpecNode);
+          if (optionalStateExpireDuration.isPresent()) {
+            stateSpecs.add(new StateSpec(name, 
optionalStateExpireDuration.get()));
+          } else {
+            stateSpecs.add(new StateSpec(name));
+          }
+        });
+    return stateSpecs;
   }
 
   private static OptionalInt optionalMaxNumBatchRequests(JsonNode 
functionNode) {
@@ -130,6 +173,11 @@ final class FunctionJsonEntity implements JsonEntity {
         .map(TimeUtils::parseDuration);
   }
 
+  private static Optional<Duration> optionalStateExpireDuration(JsonNode 
stateSpecNode) {
+    return Selectors.optionalTextAt(stateSpecNode, 
StateSpecPointers.EXPIRE_DURATION)
+        .map(TimeUtils::parseDuration);
+  }
+
   private static FunctionType functionType(JsonNode functionNode) {
     String namespaceName = Selectors.textAt(functionNode, MetaPointers.TYPE);
     NamespaceNamePair nn = NamespaceNamePair.from(namespaceName);

Reply via email to