igalshilman commented on a change in pull request #177:
URL: https://github.com/apache/flink-statefun/pull/177#discussion_r529000191
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
##########
@@ -34,25 +41,96 @@
private final Map<String, PersistedValue<byte[]>> managedStates;
+ /**
+ * @deprecated {@link PersistedRemoteFunctionValues} should no longer be
instantiated with eagerly
+ * declared state specs. State can now be dynamically registered with
{@link
+ * #registerStates(List)}. This constructor will be removed once old
module specification
+ * formats, which supports eager state declarations, are removed.
+ */
+ @Deprecated
public PersistedRemoteFunctionValues(List<StateSpec> stateSpecs) {
Objects.requireNonNull(stateSpecs);
this.managedStates = new HashMap<>(stateSpecs.size());
- stateSpecs.forEach(this::createAndRegisterValueState);
+ stateSpecs.forEach(this::createAndRegisterEagerValueState);
}
- void forEach(BiConsumer<String, byte[]> consumer) {
- managedStates.forEach((stateName, handle) -> consumer.accept(stateName,
handle.get()));
+ void attachStateValues(InvocationBatchRequest.Builder batchBuilder) {
+ managedStates.forEach(
+ (stateName, stateHandle) -> {
+ ToFunction.PersistedValue.Builder valueBuilder =
+ ToFunction.PersistedValue.newBuilder().setStateName(stateName);
+
+ byte[] stateValue = stateHandle.get();
+ if (stateValue != null) {
+ valueBuilder.setStateValue(ByteString.copyFrom(stateValue));
+ }
+ batchBuilder.addState(valueBuilder);
+ });
+ }
+
+ void updateStateValues(List<PersistedValueMutation> valueMutations) {
+ for (PersistedValueMutation mutate : valueMutations) {
+ final String stateName = mutate.getStateName();
+ switch (mutate.getMutationType()) {
+ case DELETE:
Review comment:
nit: can the cases be wrapped with `{` and `}`?
If it was me who did it like that originally than I apologize 🙈
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
##########
@@ -125,8 +126,44 @@ private void onAsyncResult(
sendToFunction(context, batch);
return;
}
- InvocationResponse invocationResult =
unpackInvocationOrThrow(context.self(), asyncResult);
- handleInvocationResponse(context, invocationResult);
+ if (asyncResult.failure()) {
+ throw new IllegalStateException(
+ "Failure forwarding a message to a remote function " +
context.self(),
+ asyncResult.throwable());
+ }
+
+ final Either<InvocationResponse, IncompleteInvocationContext> response =
+ unpackResponse(asyncResult.value());
+ if (response.isRight()) {
+ handleIncompleteInvocationContextResponse(context, response.right(),
asyncResult.metadata());
+ } else {
+ handleInvocationResultResponse(context, response.left());
+ }
+ }
+
+ private static Either<InvocationResponse, IncompleteInvocationContext>
unpackResponse(
Review comment:
I believe that there is a bug here:
If the remote function does not produce any side effects, the resulting
`FromFunction` will be effectively empty. The reason is that Protobuf simply
doesn't set any empty values. (it omits them when serializing to bytes).
In this case unpack response would either throw or incorrectly return an
empty `IncompleteInvocationContext` (but I believe it would throw).
That is the reason that previously `unpackInvocationOrThrow()` would also
return an `InvocationResponse`.
(I should've added a comment in the method 😅)
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
##########
@@ -34,25 +41,96 @@
private final Map<String, PersistedValue<byte[]>> managedStates;
+ /**
+ * @deprecated {@link PersistedRemoteFunctionValues} should no longer be
instantiated with eagerly
+ * declared state specs. State can now be dynamically registered with
{@link
+ * #registerStates(List)}. This constructor will be removed once old
module specification
+ * formats, which supports eager state declarations, are removed.
+ */
+ @Deprecated
Review comment:
👍
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
##########
@@ -34,25 +41,96 @@
private final Map<String, PersistedValue<byte[]>> managedStates;
+ /**
+ * @deprecated {@link PersistedRemoteFunctionValues} should no longer be
instantiated with eagerly
+ * declared state specs. State can now be dynamically registered with
{@link
+ * #registerStates(List)}. This constructor will be removed once old
module specification
+ * formats, which supports eager state declarations, are removed.
+ */
+ @Deprecated
public PersistedRemoteFunctionValues(List<StateSpec> stateSpecs) {
Objects.requireNonNull(stateSpecs);
this.managedStates = new HashMap<>(stateSpecs.size());
- stateSpecs.forEach(this::createAndRegisterValueState);
+ stateSpecs.forEach(this::createAndRegisterEagerValueState);
}
- void forEach(BiConsumer<String, byte[]> consumer) {
- managedStates.forEach((stateName, handle) -> consumer.accept(stateName,
handle.get()));
+ void attachStateValues(InvocationBatchRequest.Builder batchBuilder) {
+ managedStates.forEach(
Review comment:
Since `attachStateValues` is on the hot path, can you please change the
`.forEach()` to a regular iteration?This will avoid a capturing lambda.
##########
File path:
statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.reqreply;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import com.google.protobuf.ByteString;
+import java.util.Arrays;
+import java.util.Collections;
+import
org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation;
+import
org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec;
+import
org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest;
+import
org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.PersistedValue;
+import org.junit.Test;
+
+public class PersistedRemoteFunctionValuesTest {
+
+ @Test
+ public void exampleUsage() {
+ final PersistedRemoteFunctionValues values =
+ new PersistedRemoteFunctionValues(Collections.emptyList());
+
+ // --- register persisted states
+ values.registerStates(
+ Arrays.asList(
+ protocolPersistedValueSpec("state-1"),
protocolPersistedValueSpec("state-2")));
+
+ // --- update state values
+ values.updateStateValues(
+ Arrays.asList(
+ protocolPersistedValueModifyMutation("state-1",
ByteString.copyFromUtf8("data-1")),
+ protocolPersistedValueModifyMutation("state-2",
ByteString.copyFromUtf8("data-2"))));
+
+ final InvocationBatchRequest.Builder builder =
InvocationBatchRequest.newBuilder();
+ values.attachStateValues(builder);
+
+ // --- registered state names and their values should be attached
+ assertThat(builder.getStateList().size(), is(2));
+ assertThat(
+ builder.getStateList(),
+ hasItems(
+ protocolPersistedValue("state-1",
ByteString.copyFromUtf8("data-1")),
+ protocolPersistedValue("state-2",
ByteString.copyFromUtf8("data-2"))));
+ }
+
+ @Test
+ public void zeroRegisteredStates() {
+ final PersistedRemoteFunctionValues values =
+ new PersistedRemoteFunctionValues(Collections.emptyList());
+
+ final InvocationBatchRequest.Builder builder =
InvocationBatchRequest.newBuilder();
+ values.attachStateValues(builder);
+
+ assertThat(builder.getStateList().size(), is(0));
+ }
+
+ @Test(expected = IllegalStateException.class)
Review comment:
I'm wondering: This might be actually a common error, especially when
folks are getting started with the SDK.
should we make this exception a bit more specific and as descriptive as
possible?
Something a long the lines: "... this can happen if you forgot to define
your state in the SDK..." ?
##########
File path:
statefun-flink/statefun-flink-core/src/main/protobuf/http-function.proto
##########
@@ -137,8 +137,45 @@ message FromFunction {
repeated EgressMessage outgoing_egresses = 4;
}
+ //
----------------------------------------------------------------------------------------------------------------
Review comment:
nit: can you stick to the same comment style as other top level messages
in this file?
no new line, no `----` line separators.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]