Hello!

I'm getting an exception running a modified version of datastream/statefun
example. (See exception details that follow.) The example was adapted from
the original datastream example provided in statefun repo. I was trying to
play with the example by chaining two functions (with the 1st function
simulating async IO function invocation). However I'm getting an exception
saying that the mailbox is closed before the operation finishes. I'm adding
the source code of the example here
<https://gist.github.com/flint-stone/cbc60f2d41507fdf33507ba998696134>
(Please ignore the printing statement and variable usage not making any
sense since I'm debugging). The example works without chaining the second
function or without using AsyncIO in any of the functions. Any ideas why
this happens? Any suggestions would be appreciated.


Thanks!

Le

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: 885ad02322104a6e36b011fadc704d2a)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: 885ad02322104a6e36b011fadc704d2a)
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1682)
at
org.apache.flink.statefun.examples.datastream.Example.main(Example.java:142)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: 885ad02322104a6e36b011fadc704d2a)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:114)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:590)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:384)
at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.RejectedExecutionException:
java.lang.IllegalStateException: Mailbox is in state CLOSED, but is
required to be in state OPEN for put operations.
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.execute(MailboxExecutorImpl.java:71)
at
org.apache.flink.streaming.api.operators.MailboxExecutor.execute(MailboxExecutor.java:94)
at
org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade.execute(MailboxExecutorFacade.java:35)
at
org.apache.flink.statefun.flink.core.feedback.FeedbackChannel$ConsumerTask.scheduleDrainAll(FeedbackChannel.java:114)
at
org.apache.flink.statefun.flink.core.feedback.FeedbackChannel.put(FeedbackChannel.java:64)
at
org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator.processElement(FeedbackSinkOperator.java:58)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Mailbox is in state CLOSED, but
is required to be in state OPEN for put operations.
at
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkPutStateConditions(TaskMailboxImpl.java:265)
at
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.put(TaskMailboxImpl.java:193)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.execute(MailboxExecutorImpl.java:69)
... 16 more

Reply via email to