Hello, I’m trying to send a message from DataStream API to a remote Stateful
Function (Java SDK). The DataStream job:
public static final FunctionType DEVICE = new
FunctionType("com.github.f1xman.era.anomalydetection.device", "DeviceFunction");
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StatefulFunctionsConfig statefunConfig =
StatefulFunctionsConfig.fromEnvironment(env);
statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
DataStreamSource<String> names = env.addSource(new
NamesSourceFunction());
DataStream<RoutableMessage> namesIngress = names.map(name ->
RoutableMessageBuilder.builder()
.withTargetAddress(DEVICE, name)
.withMessageBody(name)
.build());
StatefulFunctionDataStreamBuilder.builder("example")
.withDataStreamAsIngress(namesIngress)
.withRequestReplyRemoteFunction(
requestReplyFunctionBuilder(DEVICE,
URI.create("http://localhost:8080/statefun"))
)
.withConfiguration(statefunConfig)
.build(env);
env.execute("Flink Streaming Java API Skeleton");
}
When String value passed to .withMessageBody(...) the following exception
occurred:
java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.flink.statefun.sdk.reqreply.generated.TypedValue
at
org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:118)
at
org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
... 25 more
Though, sending a String value to an embedded function works well. The
workaround I've found is to wrap the value with TypedValue:
.withMessageBody(TypedValue.newBuilder()
.setValue(ByteString.copyFrom(name,
StandardCharsets.UTF_8))
.setHasValue(true)
.setTypename("example/Name")
.build()
)
This approach requires the receiver function to unwrap the TypedValue and
deserialize the ByteString. It looks too low-level for this kind of API. I
believe this is the wrong usage of Stateful Function's SDK for Flink DataStream
Integration. What is the correct way to implement Flink DataStream and remote
Stateful Functions interoperability?
The job is inspired by the official examples
https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/sdk/flink-datastream/
Best,
Tymur Yarosh