Re: Failure Restart Strategy leads to error
Hi Yun and Oran,

Thanks for your time. Much appreciated! Below are my configs:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(2000)
//env.setDefaultSavepointDirectory("file:home/siddhesh/Desktop/savepoints/")
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.setStateBackend(new FsStateBackend(("file:home/siddhesh/Desktop/flink/")))
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) // Gap after which the next checkpoint can be written
env.getCheckpointConfig.setCheckpointTimeout(4000) // Checkpoints have to complete within 4 secs
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // Only 1 checkpoint can be executed at a time
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // Checkpoints are retained if the job is cancelled explicitly
/*env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per unit
  org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
  org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delay
))*/

Consumer properties:

val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), getProperties())

// Setting up the consumer properties
def getProperties(): Properties = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("zookeeper.connect", "localhost:2181")
  properties.setProperty("isolation.level", "read_committed")
  return properties
}

Also, as you can see, I have commented out setting my own savepoint directory. It was also leading to an error and causing the job to end abruptly. Do I need to set it when running via the CLI, or is there something I am missing for the failure restart strategy and the savepoint directory?
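The commented-out `failureRateRestart(3, 5 minutes, 10 seconds)` call is easier to reason about with a toy model. The sketch below is plain Scala, not Flink's implementation: `FailureRateModel` and `onFailure` are hypothetical names, and it assumes the job gives up once `maxFailures` failures fall within one `intervalMs` window (Flink's exact boundary handling may differ by one). It may also help to know that, per the Flink documentation, a job with checkpointing enabled and no explicit restart strategy falls back to fixed-delay restarts with effectively unlimited attempts, so an error that recurs on every attempt can stay hidden until a bounded strategy like this one is configured; whether that is what happened here is an assumption, not something confirmed in this thread.

```scala
import scala.collection.mutable

/** Hypothetical, simplified model of a failure-rate restart policy.
  * Assumption: the job fails once `maxFailures` failures fall inside
  * one `intervalMs` window; Flink's own boundary handling may differ.
  */
final class FailureRateModel(maxFailures: Int, intervalMs: Long, delayMs: Long) {
  require(maxFailures > 0, "maxFailures must be positive")

  // Timestamps (ms) of recent failures, oldest first
  private val failures = mutable.Queue.empty[Long]

  /** Records a failure at `nowMs`. Returns Some(restartAtMs) if the job
    * would be restarted, or None if the failure rate is exceeded. */
  def onFailure(nowMs: Long): Option[Long] = {
    // Forget failures that have dropped out of the measuring window
    while (failures.nonEmpty && nowMs - failures.head > intervalMs) failures.dequeue()
    failures.enqueue(nowMs)
    if (failures.size >= maxFailures) None // too many failures in one window: job fails
    else Some(nowMs + delayMs)             // otherwise restart after the fixed delay
  }
}

object FailureRateDemo {
  def main(args: Array[String]): Unit = {
    val minute = 60000L
    // Mirrors failureRateRestart(3, 5 minutes, 10 seconds) from the config above
    val policy = new FailureRateModel(maxFailures = 3, intervalMs = 5 * minute, delayMs = 10000L)
    println(policy.onFailure(0L))         // Some(10000): restart
    println(policy.onFailure(1 * minute)) // Some(70000): restart
    println(policy.onFailure(2 * minute)) // None: third failure within 5 minutes
  }
}
```

Under this model, failures spaced more than five minutes apart never exhaust the budget, while three failures in quick succession end the job and surface the underlying exception.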
Thanks,
Sid

On Wed, Jan 26, 2022 at 1:52 PM Yun Tang wrote:
> Hi Siddhesh,
>
> The root cause is that the group.id configuration is missing from the
> Flink program. The restart strategy configuration has nothing to do
> with this.
>
> I think you should pay attention to the Kafka-related configurations.
>
> Best,
> Yun Tang
> --
> From: Siddhesh Kalgaonkar
> Sent: Wednesday, January 26, 2022 3:17
> To: user
> Subject: Failure Restart Strategy leads to error
>
> I have a Flink Kafka consumer in place, which works fine until I add the
> lines below:
>
> env.setRestartStrategy(RestartStrategies.failureRateRestart(
>   3, // max failures per unit
>   Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
>   Time.of(10, TimeUnit.SECONDS) // delay
> ))
>
> It gives me the stack trace below:
>
> DEBUG [flink-akka.actor.default-dispatcher-14] (JobMaster.java:1119) - Close ResourceManager connection 05d80aa9f3aca06faf7be80bbc8a0642.
> org.apache.flink.util.FlinkException: Stopping JobMaster for job Flink Kafka Example(b425ae91bfb0e81980b878b3e4392137).
>     at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:400)
>     at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> DEBUG [flink-akka.actor.default-dispatcher-12] (DefaultJobLeaderIdService.java:148) - Remove job b425ae91bfb0e81980b878b3e4392137 from job leader id monitoring.
> INFO [flink-akka.actor.default-dispatcher-12] (ResourceManager.java:1047) - Disconnect job manager a95c280817468866d08c3230ecd0462f@akka://flink/user/rpc/jobmanager_3
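Following Yun's diagnosis that `group.id` is missing, here is a minimal sketch of the same consumer properties with a consumer group added. The group name `flink-kafka-example` is a hypothetical placeholder, and the other settings are copied unchanged from the thread. The `FlinkKafkaConsumer` line is left as a comment so the block compiles without the Kafka connector on the classpath.

```scala
import java.util.Properties

object KafkaConsumerProps {
  // Hypothetical placeholder: choose a stable, job-specific consumer group name
  val GroupId = "flink-kafka-example"

  // Same settings as getProperties() in the thread, plus the missing group.id
  def getProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("isolation.level", "read_committed")
    properties.setProperty("group.id", GroupId) // the setting Yun pointed out
    properties
  }

  // Wiring (needs flink-connector-kafka on the classpath, so left as a comment):
  // val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), getProperties())

  def main(args: Array[String]): Unit = {
    println(getProperties().getProperty("group.id")) // prints flink-kafka-example
  }
}
```

A stable group name per job is a common choice, since offsets that Flink commits back to Kafka are stored under that consumer group.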
Re: Failure Restart Strategy leads to error
Hi Siddhesh,

The root cause is that the group.id configuration is missing from the Flink program. The restart strategy configuration has nothing to do with this.

I think you should pay attention to the Kafka-related configurations.

Best,
Yun Tang

From: Siddhesh Kalgaonkar
Sent: Wednesday, January 26, 2022 3:17
To: user
Subject: Failure Restart Strategy leads to error

I have a Flink Kafka consumer in place, which works fine until I add the lines below:

env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per unit
  Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay
))

It gives me the stack trace below:

DEBUG [flink-akka.actor.default-dispatcher-14] (JobMaster.java:1119) - Close ResourceManager connection 05d80aa9f3aca06faf7be80bbc8a0642.
org.apache.flink.util.FlinkException: Stopping JobMaster for job Flink Kafka Example(b425ae91bfb0e81980b878b3e4392137).
    at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:400)
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
DEBUG [flink-akka.actor.default-dispatcher-12] (DefaultJobLeaderIdService.java:148) - Remove job b425ae91bfb0e81980b878b3e4392137 from job leader id monitoring.
INFO [flink-akka.actor.default-dispatcher-12] (ResourceManager.java:1047) - Disconnect job manager a95c280817468866d08c3230ecd0462f@akka://flink/user/rpc/jobmanager_3 for job b425ae91bfb0e81980b878b3e4392137 from the resource manager.
DEBUG [flink-akka.actor.default-dispatcher-12] (DefaultResourceTracker.java:80) - Initiating tracking of resources for job b425ae91bfb0e81980b878b3e4392137.
DEBUG [flink-akka.actor.default-dispatcher-12] (DefaultResourceTracker.java:60) - Stopping tracking of resources for job b425ae91bfb0e81980b878b3e4392137.
DEBUG [flink-akka.actor.default-dispatcher-14] (AkkaRpcActor.java:131) - The RpcEndpoint jobmanager_3 terminated successfully.
INFO [flink-akka.actor.default-dispatcher-8] (DefaultJobLeaderService.java:136) - Stop job leader service.
INFO [flink-akka.actor.default-dispatcher-8] (TaskExecutorLocalStateStoresManager.java:231) - Shutting down TaskExecutorLocalStateStoresManager.
DEBUG [flink-akka.actor.default-dispatcher-8] (IOManagerAsync.java:121) - Shutting down I/O manager.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at