This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit b852dcc1e02844a7fe123a61f84c37bc4619ed89 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Wed May 27 12:18:42 2020 +0800 [FLINK-17875] [core] Wire in StateSpec into RequestReplyFunction --- .../flink/core/reqreply/RequestReplyFunction.java | 39 +++++++++++++++++----- .../core/reqreply/RequestReplyFunctionTest.java | 3 +- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java index 6687ed0..8cde450 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter; +import org.apache.flink.statefun.flink.core.httpfn.StateSpec; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse; @@ -40,6 +41,7 @@ import org.apache.flink.statefun.sdk.Context; import org.apache.flink.statefun.sdk.StatefulFunction; import org.apache.flink.statefun.sdk.annotations.Persisted; import org.apache.flink.statefun.sdk.io.EgressIdentifier; +import org.apache.flink.statefun.sdk.state.Expiration; import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer; import org.apache.flink.statefun.sdk.state.PersistedTable; import org.apache.flink.statefun.sdk.state.PersistedValue; @@ -47,7 +49,7 @@ import org.apache.flink.statefun.sdk.state.PersistedValue; public final class RequestReplyFunction implements StatefulFunction { private final RequestReplyClient client; - private final List<String> registeredStateNames; + private final List<StateSpec> registeredStates; private final int maxNumBatchRequests; /** @@ -69,15 +71,17 @@ public final class RequestReplyFunction implements StatefulFunction { private final PersistedAppendingBuffer<ToFunction.Invocation> batch = PersistedAppendingBuffer.of("batch", ToFunction.Invocation.class); - @Persisted - private final PersistedTable<String, byte[]> managedStates = - PersistedTable.of("states", String.class, byte[].class); + @Persisted private final PersistedTable<String, byte[]> managedStates; public RequestReplyFunction( - List<String> registeredStateNames, int maxNumBatchRequests, RequestReplyClient client) { + List<StateSpec> registeredStates, int maxNumBatchRequests, RequestReplyClient client) { this.client = Objects.requireNonNull(client); - this.registeredStateNames = Objects.requireNonNull(registeredStateNames); + this.registeredStates = Objects.requireNonNull(registeredStates); this.maxNumBatchRequests = maxNumBatchRequests; + + this.managedStates = + PersistedTable.of( + "states", String.class, byte[].class, resolveStateTtlExpiration(registeredStates)); } @Override @@ -92,6 +96,23 @@ public final class RequestReplyFunction implements StatefulFunction { onAsyncResult(context, result); } + private static Expiration resolveStateTtlExpiration(List<StateSpec> stateSpecs) { + // TODO applying the below limitations due to state multiplexing (see FLINK-17954) + // TODO 1) use the max TTL duration across all state, 2) only allow AFTER_READ_AND_WRITE + + Duration maxDuration = Duration.ZERO; + for (StateSpec stateSpec : stateSpecs) { + if (stateSpec.ttlDuration().compareTo(maxDuration) > 0) { + maxDuration = stateSpec.ttlDuration(); + } + } + + if (maxDuration.equals(Duration.ZERO)) { + return Expiration.none(); + } + return Expiration.expireAfterReadingOrWriting(maxDuration); + } + private void onRequest(Context context, Any message) { Invocation.Builder invocationBuilder = singeInvocationBuilder(context, message); int inflightOrBatched = requestState.getOrDefault(-1); @@ -207,11 +228,11 @@ public final class RequestReplyFunction implements StatefulFunction { // -------------------------------------------------------------------------------- private void addStates(ToFunction.InvocationBatchRequest.Builder batchBuilder) { - for (String stateName : registeredStateNames) { + for (StateSpec stateSpec : registeredStates) { ToFunction.PersistedValue.Builder valueBuilder = - ToFunction.PersistedValue.newBuilder().setStateName(stateName); + ToFunction.PersistedValue.newBuilder().setStateName(stateSpec.name()); - byte[] stateValue = managedStates.get(stateName); + byte[] stateValue = managedStates.get(stateSpec.name()); if (stateValue != null) { valueBuilder.setStateValue(ByteString.copyFrom(stateValue)); } diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java index 04d389c..e934399 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java @@ -37,6 +37,7 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.apache.flink.statefun.flink.core.TestUtils; import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter; +import org.apache.flink.statefun.flink.core.httpfn.StateSpec; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.DelayedInvocation; import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage; @@ -58,7 +59,7 @@ public class RequestReplyFunctionTest { private final FakeClient client = new FakeClient(); private final FakeContext context = new FakeContext(); - private final List<String> states = Collections.singletonList("session"); + private final List<StateSpec> states = Collections.singletonList(new StateSpec("session")); private final RequestReplyFunction functionUnderTest = new RequestReplyFunction(states, 10, client);
