This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit d8a221078e88fe082a411f19ff7ad3f66e028324 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Thu May 28 15:20:47 2020 +0800 [FLINK-17875] [core] Update FunctionJsonEntity to recognize V2 state format --- .../flink/core/jsonmodule/FunctionJsonEntity.java | 62 +++++++++++++++++++--- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java index 9b11f00..78bc18d 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java @@ -26,6 +26,7 @@ import static org.apache.flink.statefun.flink.core.jsonmodule.FunctionSpec.Kind; import java.net.InetSocketAddress; import java.net.URI; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.Map; @@ -34,6 +35,7 @@ import java.util.OptionalInt; import java.util.Set; import java.util.function.Function; import java.util.stream.Collector; +import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer; @@ -44,6 +46,7 @@ import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionProvider; import org.apache.flink.statefun.flink.core.grpcfn.GrpcFunctionSpec; import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionProvider; import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionSpec; +import org.apache.flink.statefun.flink.core.httpfn.StateSpec; import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.StatefulFunctionProvider; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule.Binder; @@ -68,12 +71,17 @@ final class FunctionJsonEntity implements JsonEntity { JsonPointer.compile("/function/spec/maxNumBatchRequests"); } + private static final class StateSpecPointers { + private static final JsonPointer NAME = JsonPointer.compile("/name"); + private static final JsonPointer EXPIRE_DURATION = JsonPointer.compile("/expireAfter"); + } + @Override public void bind(Binder binder, JsonNode moduleSpecRootNode, FormatVersion formatVersion) { final Iterable<? extends JsonNode> functionSpecNodes = functionSpecNodes(moduleSpecRootNode); for (Map.Entry<Kind, Map<FunctionType, FunctionSpec>> entry : - parse(functionSpecNodes).entrySet()) { + parse(functionSpecNodes, formatVersion).entrySet()) { StatefulFunctionProvider provider = functionProvider(entry.getKey(), entry.getValue()); Set<FunctionType> functionTypes = entry.getValue().keySet(); for (FunctionType type : functionTypes) { @@ -83,9 +91,9 @@ final class FunctionJsonEntity implements JsonEntity { } private Map<Kind, Map<FunctionType, FunctionSpec>> parse( - Iterable<? extends JsonNode> functionSpecNodes) { + Iterable<? extends JsonNode> functionSpecNodes, FormatVersion formatVersion) { return StreamSupport.stream(functionSpecNodes.spliterator(), false) - .map(FunctionJsonEntity::parseFunctionSpec) + .map(functionSpecNode -> parseFunctionSpec(functionSpecNode, formatVersion)) .collect(groupingBy(FunctionSpec::kind, groupByFunctionType())); } @@ -93,7 +101,8 @@ final class FunctionJsonEntity implements JsonEntity { return Selectors.listAt(moduleSpecRootNode, FUNCTION_SPECS_POINTER); } - private static FunctionSpec parseFunctionSpec(JsonNode functionNode) { + private static FunctionSpec parseFunctionSpec( + JsonNode functionNode, FormatVersion formatVersion) { String functionKind = Selectors.textAt(functionNode, MetaPointers.KIND); FunctionSpec.Kind kind = FunctionSpec.Kind.valueOf(functionKind.toUpperCase(Locale.getDefault())); @@ -103,7 +112,9 @@ final class FunctionJsonEntity implements JsonEntity { final HttpFunctionSpec.Builder specBuilder = HttpFunctionSpec.builder(functionType, functionUri(functionNode)); - for (String state : functionStates(functionNode)) { + final Function<JsonNode, List<StateSpec>> stateSpecParser = + functionStateParserOf(formatVersion); + for (StateSpec state : stateSpecParser.apply(functionNode)) { specBuilder.withState(state); } optionalMaxNumBatchRequests(functionNode).ifPresent(specBuilder::withMaxNumBatchRequests); @@ -117,8 +128,40 @@ final class FunctionJsonEntity implements JsonEntity { } } - private static List<String> functionStates(JsonNode functionNode) { - return Selectors.textListAt(functionNode, SpecPointers.STATES); + private static Function<JsonNode, List<StateSpec>> functionStateParserOf( + FormatVersion formatVersion) { + switch (formatVersion) { + case v1_0: + return FunctionJsonEntity::functionStateSpecParserV1; + case v2_0: + return FunctionJsonEntity::functionStateSpecParserV2; + default: + throw new IllegalStateException("Unrecognized format version: " + formatVersion); + } + } + + private static List<StateSpec> functionStateSpecParserV1(JsonNode functionNode) { + final List<String> stateNames = Selectors.textListAt(functionNode, SpecPointers.STATES); + return stateNames.stream().map(StateSpec::new).collect(Collectors.toList()); + } + + private static List<StateSpec> functionStateSpecParserV2(JsonNode functionNode) { + final Iterable<? extends JsonNode> stateSpecNodes = + Selectors.listAt(functionNode, SpecPointers.STATES); + final List<StateSpec> stateSpecs = new ArrayList<>(); + + stateSpecNodes.forEach( + stateSpecNode -> { + final String name = Selectors.textAt(stateSpecNode, StateSpecPointers.NAME); + final Optional<Duration> optionalStateExpireDuration = + optionalStateExpireDuration(stateSpecNode); + if (optionalStateExpireDuration.isPresent()) { + stateSpecs.add(new StateSpec(name, optionalStateExpireDuration.get())); + } else { + stateSpecs.add(new StateSpec(name)); + } + }); + return stateSpecs; } private static OptionalInt optionalMaxNumBatchRequests(JsonNode functionNode) { @@ -130,6 +173,11 @@ final class FunctionJsonEntity implements JsonEntity { .map(TimeUtils::parseDuration); } + private static Optional<Duration> optionalStateExpireDuration(JsonNode stateSpecNode) { + return Selectors.optionalTextAt(stateSpecNode, StateSpecPointers.EXPIRE_DURATION) + .map(TimeUtils::parseDuration); + } + private static FunctionType functionType(JsonNode functionNode) { String namespaceName = Selectors.textAt(functionNode, MetaPointers.TYPE); NamespaceNamePair nn = NamespaceNamePair.from(namespaceName);
