Dear List,
I am trying to call a sample stateful function written in Python with the
Stateful Functions Python SDK from a Flink pipeline. I am building upon
the examples provided for the SDK's Flink DataStream integration, but I
am stuck on a type-cast issue that I have not been able to overcome, even
after looking at the flink-statefun sources. I am probably doing
something wrong.
In the Flink pipeline (an excerpt is reported below), I load a set of
users from a CSV file and create a DataStream<User>, where User is a
protobuf v3 generated class. The basic idea is to forward this stream to
a remote function (written in Python using the SDK) that unpacks the user
object, extracts the user id and returns it as a String.
val REMOTE_GREET = FunctionType("com.me.try", "echo_user_id")
val GREETINGS = EgressIdentifier<String>("com.me.try", "out", String::class.java)

@JvmStatic
fun main(args: Array<String>) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val usersCsv = env.readTextFile("input/users.csv")
    val users = createUsersStream(usersCsv).shuffle()

    val statefunConfig = StatefulFunctionsConfig.fromEnvironment(env)
    statefunConfig.factoryType = MessageFactoryType.WITH_PROTOBUF_PAYLOADS

    val usersIngress: DataStream<RoutableMessage> = users.map { user ->
        RoutableMessageBuilder.builder()
            .withTargetAddress(REMOTE_GREET, user.userId.toString())
            .withMessageBody(user)
            .build()
    }

    val predictEgress = StatefulFunctionDataStreamBuilder.builder("test")
        .withDataStreamAsIngress(usersIngress)
        .withRequestReplyRemoteFunction(
            RequestReplyFunctionBuilder
                .requestReplyFunctionBuilder(REMOTE_GREET, URI.create("http://127.0.0.1:8000/statefun"))
                .withMaxRequestDuration(Duration.ofSeconds(15))
                .withMaxNumBatchRequests(500)
        )
        .withEgressId(GREETINGS)
        .withConfiguration(statefunConfig)
        .build(env)

    val output = predictEgress.getDataStreamForEgressId(GREETINGS)
    output.print()
    env.execute("Hello stateful!!")
}
Unfortunately, while the Python function seems to work (tests built by
following the Ververica workshop repository on Stateful Functions run
consistently) and is listening at the provided address
(http://127.0.0.1:8000/statefun), the Kotlin pipeline above fails with a
type-cast error. The error occurs before the remote function is actually
called, at line 90 of
org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction. The
reported exception is:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at jdk.internal.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException: An error occurred when attempting to invoke function FunctionType(com.me.try, echo_user_id).
    at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50)
    at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:73)
    at org.apache.flink.statefun.flink.core.functions.FunctionActivation.applyNextPendingEnvelope(FunctionActivation.java:50)
    at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:61)
    at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:161)
    at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:146)
    at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:186)
    at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassCastException: class com.lambda.User cannot be cast to class org.apache.flink.statefun.sdk.reqreply.generated.TypedValue (com.lambda.User and org.apache.flink.statefun.sdk.reqreply.generated.TypedValue are in unnamed module of loader 'app')
    at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:90)
    at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
    ... 24 more

Process finished with exit code 1
According to the exception above, the object being sent is a plain User,
which is not a TypedValue, whereas I expected the first map operator (the
one that defines the usersIngress stream) to be enough to set up the data
correctly for the stateful function. Can you spot what I am doing wrong?
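To make the mismatch concrete, here is a minimal, stdlib-only Kotlin model of what I assume happens. Every name in it (UserStub, TypedValueStub, asRequestReplyPayload, wrapUser) is hypothetical: the stubs only mimic the shape of com.lambda.User and of the generated TypedValue, which, as far as I can tell from the SDK's request-reply protocol, carries a typename, a has_value flag and the serialized value bytes. The typename "com.lambda/User" is also an assumption of mine. The unconditional cast stands in for what I believe happens at RequestReplyFunction.java:90.

```kotlin
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for the protobuf-generated com.lambda.User.
class UserStub(val userId: Long, val name: String) {
    // Stand-in for protobuf's toByteArray(): a trivial "id,name" UTF-8 encoding.
    fun toByteArray(): ByteArray = "$userId,$name".toByteArray(StandardCharsets.UTF_8)
}

// Hypothetical stand-in mimicking the assumed shape of the generated TypedValue.
class TypedValueStub(val typename: String, val hasValue: Boolean, val value: ByteArray)

// Models the runtime's assumption: the message body is cast as-is, with no
// conversion, so any other object fails with a ClassCastException.
fun asRequestReplyPayload(body: Any): TypedValueStub = body as TypedValueStub

// Hypothetical helper: wrap a user in the envelope before routing it.
// "com.lambda/User" is an assumed "namespace/name" typename.
fun wrapUser(user: UserStub): TypedValueStub =
    TypedValueStub(typename = "com.lambda/User", hasValue = true, value = user.toByteArray())

fun main() {
    val user = UserStub(42L, "dario")

    // What my pipeline effectively does: the raw object is the message body.
    val failure = runCatching { asRequestReplyPayload(user) }.exceptionOrNull()
    println("raw body -> ${failure?.javaClass?.simpleName}")

    // Wrapping first hands the runtime the envelope it expects.
    val payload = asRequestReplyPayload(wrapUser(user))
    println("wrapped body -> ${payload.typename}, ${String(payload.value, StandardCharsets.UTF_8)}")
}
```

If this model is right, the raw User can never get past the cast, so the conversion would have to happen in the map that builds usersIngress (presumably by building the generated TypedValue through its protobuf builder); I have not verified this against the SDK.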
Looking forward to a kind reply,
Best regards
Dario Bonino
--
Ing. Dario Bonino, Ph.D
www: https://www.linkedin.com/in/dariobonino
<foaf:Person>
<foaf:firstName>Dario</foaf:firstName>
<foaf:surname>Bonino</foaf:surname>
</foaf:Person>