You should try to find out why everything is happening on just one node. Have you looked at your Kafka partitions, i.e. is the data evenly distributed across the partitions of your Kafka topic, or is all of it pushed to a single partition? That would explain why processing happens on only one node, namely the node that reads the partition holding all the data.
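A quick way to check is to compare the beginning and end offsets of each partition (as returned by `KafkaConsumer#beginningOffsets` / `#endOffsets`) — the difference is roughly how many records each partition holds. A minimal sketch; the offset numbers below are made-up placeholders to keep it self-contained, in a real check you would fetch them from the broker:

```java
import java.util.Map;
import java.util.TreeMap;

// Rough partition-skew check: given beginning and end offsets per partition,
// report how many records each partition holds. If one partition dominates,
// only the subtask consuming that partition will have work to do.
public class PartitionSkewCheck {

    static Map<Integer, Long> messageCounts(Map<Integer, Long> begin, Map<Integer, Long> end) {
        Map<Integer, Long> counts = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : end.entrySet()) {
            counts.put(e.getKey(), e.getValue() - begin.getOrDefault(e.getKey(), 0L));
        }
        return counts;
    }

    public static void main(String[] args) {
        // Placeholder offsets for a 4-partition topic; partition 0 holds nearly everything.
        Map<Integer, Long> begin = Map.of(0, 0L, 1, 0L, 2, 0L, 3, 0L);
        Map<Integer, Long> end   = Map.of(0, 1_000_000L, 1, 12L, 2, 7L, 3, 3L);
        messageCounts(begin, end).forEach((p, n) ->
                System.out.println("partition " + p + ": " + n + " records"));
    }
}
```

If the counts are skewed like this, the fix is usually on the producer side: either omit the record key (so the producer spreads records round-robin) or choose a key with enough distinct values to cover all partitions.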
On Thu, 3 Nov 2016 at 20:53 amir bahmanyari <[email protected]> wrote:

> Thanks Aljoscha.
> I have been tuning Flink memory, network buffers, etc.
> This occurred in THAT ONE NODE where I see the *.out logs get created by Flink.
> I lowered the memory that Flink allocates, i.e. from the 70% default to 50%,
> and this exception was thrown in that one node only. The other nodes stayed
> up and didn't crash.
> There is SOMETHING different about THAT ONE NODE :-) I cannot figure it out.
> At every ./start-cluster, THAT ONE NODE may or may not change, seemingly at
> random, so I cannot just tune THAT ONE NODE. Next time, another node may
> become THAT ONE NODE.
>
> I have the following set in flink-conf.yaml on each node:
>
> akka.ask.timeout: 300s
> jobmanager.heap.mb: 256            // Could this be too small?
> taskmanager.heap.mb: 102400
> taskmanager.memory.fraction: 0.6   // Changing this to a lower value causes the
>                                    // exception below. Am testing with 0.6, below the 0.7 default.
> taskmanager.numberOfTaskSlots: 512
> taskmanager.memory.preallocate: false
> parallelism.default: 2048
> taskmanager.network.numberOfBuffers: 131072
>
> Appreciate any feedback.
> Amir-
> ------------------------------
> *From:* Aljoscha Krettek <[email protected]>
> *To:* amir bahmanyari <[email protected]>; "[email protected]" <[email protected]>
> *Sent:* Thursday, November 3, 2016 12:45 AM
> *Subject:* Re: What does this exception mean to you?
>
> That looks like a Flink problem. The TaskManager on beam4 seems to have
> crashed for some reason. You might be able to find that reason by looking
> at the logs on that machine.
>
> On Thu, 3 Nov 2016 at 04:53 amir bahmanyari <[email protected]> wrote:
>
> Thanks+regards,
> beam4 and beam1 are hostnames.
> BenchBeamRunners.java is my Beam app running in a four-server Flink cluster.
> The other nodes are still running except the one that failed, beam4.
> beam1 has the JM running.
>
> Amir-
>
> java.lang.RuntimeException: Pipeline execution failed
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183)
>     at benchmark.flinkspark.flink.BenchBeamRunners.main(BenchBeamRunners.java:622)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:118)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110)
>     ... 14 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager 06dff71ba6ab965ec323c8ee6bf3d7d1 @ beam4 - 512 slots - URL: akka.tcp://[email protected]:44399/user/taskmanager
>     at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
>     at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
>     at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
>     at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
>     at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>     at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
>     at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:486)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     ... 2 more
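A side note on the flink-conf.yaml quoted above: 512 slots per TaskManager and a default parallelism of 2048 are unusually high for a four-node cluster, and a 256 MB JobManager heap is small for that much coordination work. The commonly recommended starting point is one task slot per CPU core. A sketch of more conservative values, assuming 16-core machines (all numbers are illustrative, not a prescription):

```
# Illustrative flink-conf.yaml fragment for a 4-node cluster of 16-core machines
jobmanager.heap.mb: 1024               # give the JM more headroom than 256 MB
taskmanager.heap.mb: 102400
taskmanager.memory.fraction: 0.7       # the default
taskmanager.numberOfTaskSlots: 16      # one slot per core is the usual starting point
parallelism.default: 64                # 4 nodes x 16 slots
taskmanager.memory.preallocate: false
akka.ask.timeout: 300s
```

Fewer, larger slots also make it easier to see whether work is actually spread across the cluster or pinned to one TaskManager.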
