Hi Igal:

Thanks for the suggestion -- I changed the implementation based on your
suggestion by attaching the second function right after the first one using
the same builder. The only difference is that except the first function
send to egress -- it now sends to the the second function and then the
second function sends to egress. However I'm getting a new error saying
that "java.lang.String cannot be cast to com.google.protobuf.Any". The full
trace is at the end. The strange things is that it tries to cast java
String to protobuf.Any. But in no where in the program that I used
protobuf  (I believe Kryo is the default option). Therefore I'm doubt that
I did not use context.send in the right way. I'm attaching the my code here
<https://gist.github.com/flint-stone/b896ea3422245cdca9bc7cc324be152a#file-example-java-L169>
(https://gist.github.com/flint-stone/b896ea3422245cdca9bc7cc324be152a ,
line 169).

Thanks!

Le

Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at
org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:590)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:384)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by:
org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException:
An error occurred when attempting to invoke function FunctionType(example,
remote-greet2).
at
org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50)
at
org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:73)
at
org.apache.flink.statefun.flink.core.functions.FunctionActivation.applyNextPendingEnvelope(FunctionActivation.java:50)
at
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:61)
at
org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:161)
at
org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:146)
at
org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:186)
at
org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to
com.google.protobuf.Any
at
org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:90)
at
org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
... 24 more

On Sat, Dec 26, 2020 at 2:22 PM Igal Shilman <i...@ververica.com> wrote:

> Hi Le,
>
> You can attach many different functions in a single StateFun builder, and
> let them message each other.
> In your example, you can make the "Greet" function message Greet2 directly
> (in addition to emitting a message as an egress).
> Embedding multiple copies of StateFun within a Datastream application is
> currently not supported.
>
> Thanks,
> Igal.
>
>
> On Sat, Dec 26, 2020 at 6:22 AM Le Xu <sharonx...@gmail.com> wrote:
>
>> Hello!
>>
>> I'm trying to modify the DataStream API example
>> <https://github.com/apache/flink-statefun/blob/4fe04ea351145f989e144f160425424987050f68/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java>
>> provided by Flink Stateful Function by attaching another function and
>> creating a function chain. However, I'm getting the following error
>>
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Hash
>> collision on user-specified ID "feedback_union_uid1". Most likely cause is
>> a non-unique ID. Please check that all IDs specified via uid(String) are
>> unique. at
>> org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:178)
>> at
>> org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:109)
>> at
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:165)
>> at
>> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:113)
>> at
>> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:850)
>> at
>> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
>> at
>> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
>> at
>> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:55)
>> at
>> org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:98)
>> at
>> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:79)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1818)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1714)
>> at
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1682)
>> at
>> org.apache.flink.statefun.examples.datastream.Example.main(Example.java:161)
>>
>>
>> Here
>> <https://gist.github.com/flint-stone/b896ea3422245cdca9bc7cc324be152a>
>> is my modified example:
>>
>> https://gist.github.com/flint-stone/b896ea3422245cdca9bc7cc324be152a
>>
>>
>> I realize that I can only modify uid of the stage through DataStream API
>> but not StateFun API -- what is the best practice to avoid such error (or
>> there is a better way to chain stateful function in Flink)?
>>
>>
>> Thanks!
>>
>>
>> Le
>>
>>
>>
>>
>>

Reply via email to