Thanks, Till. The `taskmanager.network.request-backoff.max` option helped in my case. We tried this on 1.5.0 and the jobs are running fine.
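
For anyone hitting the same `PartitionNotFoundException`: the change boils down to raising the backoff ceiling in flink-conf.yaml. A minimal sketch follows (the value below is illustrative, not our exact production setting, and the commented-out HA keys are only relevant if you follow Till's suggestion 3 further down the thread):

    # flink-conf.yaml -- illustrative values, tune for your cluster
    # Give slow task deployments more time before a consumer gives up on
    # its input result partition (milliseconds; the 1.5 default is 10000).
    taskmanager.network.request-backoff.max: 30000

    # Optional (Till's suggestion 3): in HA mode, blobs are also stored
    # under high-availability.storageDir, so the TaskManagers need not all
    # fetch them from the single BlobServer. Host/path are placeholders.
    # high-availability: zookeeper
    # high-availability.storageDir: s3a://<your-bucket>/flink/ha
    # high-availability.zookeeper.quorum: <zk-host>:2181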
--
Thanks,
Amit

On Thu 24 May, 2018, 4:58 PM Amit Jain, <aj201...@gmail.com> wrote:
> Thanks! Till. I'll give your suggestions a try and update the thread.
> 
> On Wed, May 23, 2018 at 4:43 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> > Hi Amit,
> > 
> > it looks as if the current cancellation cause is not the same as the
> > initially reported one. In the current case, it looks as if the
> > deployment of your tasks takes so long that the maximum
> > `taskmanager.network.request-backoff.max` value has been reached. When
> > this happens, a task gives up asking for the input result partition and
> > fails with a `PartitionNotFoundException`.
> > 
> > More concretely, the `CHAIN Reduce (GroupReduce at
> > first(SortedGrouping.java:210)) -> Map (Key Extractor) (2/14)` task
> > cannot retrieve the result partition of the `CHAIN DataSource (at
> > createInput(ExecutionEnvironment.java:548)
> > (org.apache.flink.api.java.io.TextInputFormat)) -> Map (Data Source
> > org.apache.flink.api.java.io.TextInputFormat
> > [s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/14/,
> > s3a://limeroad-logs/mysqlbin_lradmin_s2d_order_data2/redshift_logs/2018/5/20/15/0/])
> > -> Map (Key Extractor) -> Combine (GroupReduce at
> > first(SortedGrouping.java:210)) (8/14)` task. The producing task is in
> > the DEPLOYING state when the exception occurs. It seems to me that this
> > task takes quite some time to be deployed.
> > 
> > One reason why the deployment could take some time is that a UDF (for
> > example, its closure) of one of the operators is quite large. If this is
> > the case, then Flink offloads the corresponding data onto the BlobServer,
> > from where it is retrieved by the TaskManagers. Since you are running in
> > non-HA mode, the BlobServer won't store the blobs on HDFS from where they
> > could be retrieved. Instead, all the TaskManagers ask the single
> > BlobServer for the required TDD blobs. Depending on the size of the TDDs,
> > the BlobServer might become the bottleneck.
> > 
> > What you can try to do is the following:
> > 1) Try to reduce the closure objects of the UDFs in the above-mentioned
> > task.
> > 2) Increase `taskmanager.network.request-backoff.max` to give the system
> > more time to download the blobs.
> > 3) Run the cluster in HA mode, which will also store the blobs under
> > `high-availability.storageDir` (usually HDFS or S3). Before downloading
> > the blobs from the BlobServer, Flink will first try to download them from
> > the `high-availability.storageDir`.
> > 
> > Let me know if this solves your problem.
> > 
> > Cheers,
> > Till
> > 
> > On Tue, May 22, 2018 at 1:29 PM, Amit Jain <aj201...@gmail.com> wrote:
> > > Hi Nico,
> > > 
> > > Please find the attachment for more logs.
> > > 
> > > --
> > > Thanks,
> > > Amit
> > > 
> > > On Tue, May 22, 2018 at 4:09 PM, Nico Kruber <n...@data-artisans.com>
> > > wrote:
> > > > Hi Amit,
> > > > thanks for providing the logs, I'll look into it. We currently
> > > > suspect this is caused by
> > > > https://issues.apache.org/jira/browse/FLINK-9406, which we found by
> > > > looking over the surrounding code. RC4 has been cancelled since we
> > > > see this as a release blocker.
> > > > 
> > > > To rule out further errors, can you also provide the logs of the task
> > > > managers producing partitions d6946b39439f10e8189322becf1b8887,
> > > > 9aa35dd53561f9220b7b1bad84309d5f and 60909dec9134eb980e18064358dc2c81?
> > > > The task manager log you provided only covers the task manager asking
> > > > for this partition, for which the job manager produces the
> > > > PartitionProducerDisposedException that you see.
> > > > I'm looking for the logs of the task managers with the following
> > > > execution IDs in their logs:
> > > > - 2826f9d430e05e9defaa46f60292fa79
> > > > - 7ef992a067881a07409819e3aea32004
> > > > - ec923ce6d891d89cf6fecb5e3f5b7cc5
> > > > 
> > > > Regarding the operators being stuck: I'll have a further look into
> > > > the logs and state transitions and will come back to you.
> > > > 
> > > > Nico
> > > > 
> > > > On 21/05/18 09:27, Amit Jain wrote:
> > > > > Hi All,
> > > > > 
> > > > > I totally missed this thread. I've encountered the same issue in
> > > > > Flink 1.5.0 RC4. Please look over the attached logs of the JM and
> > > > > the impacted TM.
> > > > > 
> > > > > Job ID 390a96eaae733f8e2f12fc6c49b26b8b
> > > > > 
> > > > > --
> > > > > Thanks,
> > > > > Amit
> > > > > 
> > > > > On Thu, May 3, 2018 at 8:31 PM, Nico Kruber <n...@data-artisans.com>
> > > > > wrote:
> > > > > > Also, please have a look at the other TaskManagers' logs, in
> > > > > > particular the one that is running the operator that was
> > > > > > mentioned in the exception. You should look out for the ID
> > > > > > 98f5976716234236dc69fb0e82a0cc34.
> > > > > > 
> > > > > > Nico
> > > > > > 
> > > > > > PS: Flink log files should compress quite nicely if they grow
> > > > > > too big :)
> > > > > > 
> > > > > > On 03/05/18 14:07, Stephan Ewen wrote:
> > > > > > > Google Drive would be great.
> > > > > > > 
> > > > > > > Thanks!
> > > > > > > 
> > > > > > > On Thu, May 3, 2018 at 1:33 PM, Amit Jain <aj201...@gmail.com>
> > > > > > > wrote:
> > > > > > > > Hi Stephan,
> > > > > > > > 
> > > > > > > > The JM log file is 122 MB. Could you suggest another medium
> > > > > > > > to post it? We can use Google Drive if that's fine with you.
> > > > > > > > 
> > > > > > > > --
> > > > > > > > Thanks,
> > > > > > > > Amit
> > > > > > > > 
> > > > > > > > On Thu, May 3, 2018 at 12:58 PM, Stephan Ewen <se...@apache.org>
> > > > > > > > wrote:
> > > > > > > > > Hi Amit!
> > > > > > > > > 
> > > > > > > > > Thanks for sharing this; it looks like a regression with
> > > > > > > > > the network stack changes.
> > > > > > > > > 
> > > > > > > > > The log you shared from the TaskManager gives some hint,
> > > > > > > > > but that exception alone should not be a problem. That
> > > > > > > > > exception can occur under a race between the deployment of
> > > > > > > > > some tasks and the whole job entering a recovery phase
> > > > > > > > > (maybe we should not print it so prominently, to not
> > > > > > > > > confuse users). There must be something else happening on
> > > > > > > > > the JobManager. Can you share the JM logs as well?
> > > > > > > > > 
> > > > > > > > > Thanks a lot,
> > > > > > > > > Stephan
> > > > > > > > > 
> > > > > > > > > On Wed, May 2, 2018 at 12:21 PM, Amit Jain <aj201...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > Thanks! Fabian
> > > > > > > > > > 
> > > > > > > > > > I will try building the current release-1.5 branch and
> > > > > > > > > > update this thread.
> > > > > > > > > > 
> > > > > > > > > > --
> > > > > > > > > > Thanks,
> > > > > > > > > > Amit
> > > > > > > > > > 
> > > > > > > > > > On Wed, May 2, 2018 at 3:42 PM, Fabian Hueske <fhue...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > > Hi Amit,
> > > > > > > > > > > 
> > > > > > > > > > > We recently fixed a bug in the network stack that
> > > > > > > > > > > affected batch jobs (FLINK-9144). The fix was added
> > > > > > > > > > > after your commit.
> > > > > > > > > > > 
> > > > > > > > > > > Do you have a chance to build the current release-1.5
> > > > > > > > > > > branch and check if the fix also resolves your problem?
> > > > > > > > > > > 
> > > > > > > > > > > Otherwise, it would be great if you could open a
> > > > > > > > > > > blocker issue for the 1.5 release to ensure that this
> > > > > > > > > > > is fixed.
> > > > > > > > > > > 
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Fabian
> > > > > > > > > > > 
> > > > > > > > > > > 2018-04-29 18:30 GMT+02:00 Amit Jain <aj201...@gmail.com>:
> > > > > > > > > > > > Cluster is running on commit 2af481a
> > > > > > > > > > > > 
> > > > > > > > > > > > On Sun, Apr 29, 2018 at 9:59 PM, Amit Jain <aj201...@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > 
> > > > > > > > > > > > > We are running a number of batch jobs on a Flink
> > > > > > > > > > > > > 1.5 cluster, and a few of them are getting stuck at
> > > > > > > > > > > > > random. These jobs hit the following failure, after
> > > > > > > > > > > > > which the operator status changes to CANCELED and
> > > > > > > > > > > > > stays there.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Please find the complete TM log at
> > > > > > > > > > > > > https://gist.github.com/imamitjain/066d0e99990ee24f2da1ddc83eba2012
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 2018-04-29 14:57:24,437 INFO
> > > > > > > > > > > > > org.apache.flink.runtime.taskmanager.Task
> > > > > > > > > > > > > - Producer 98f5976716234236dc69fb0e82a0cc34 of
> > > > > > > > > > > > > partition 5b8dd5ac2df14266080a8e049defbe38
> > > > > > > > > > > > > disposed. Cancelling execution.
> > > > > > > > > > > > > org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException:
> > > > > > > > > > > > > Execution 98f5976716234236dc69fb0e82a0cc34 producing
> > > > > > > > > > > > > partition 5b8dd5ac2df14266080a8e049defbe38 has
> > > > > > > > > > > > > already been disposed.
> > > > > > > > > > > > >   at org.apache.flink.runtime.jobmaster.JobMaster.requestPartitionState(JobMaster.java:610)
> > > > > > > > > > > > >   at sun.reflect.GeneratedMethodAccessor107.invoke(Unknown Source)
> > > > > > > > > > > > >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > > > > > > > > > >   at java.lang.reflect.Method.invoke(Method.java:498)
> > > > > > > > > > > > >   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> > > > > > > > > > > > >   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> > > > > > > > > > > > >   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:69)
> > > > > > > > > > > > >   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> > > > > > > > > > > > >   at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> > > > > > > > > > > > >   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> > > > > > > > > > > > >   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> > > > > > > > > > > > >   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> > > > > > > > > > > > >   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> > > > > > > > > > > > >   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> > > > > > > > > > > > >   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> > > > > > > > > > > > >   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> > > > > > > > > > > > >   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > > > > > > > > > >   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > > > > > > > > > > >   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > > > > > > > > > >   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Thanks
> > > > > > > > > > > > > Amit
> > > > > > 
> > > > > > --
> > > > > > Nico Kruber | Software Engineer
> > > > > > data Artisans
> > > > 
> > > > --
> > > > Nico Kruber | Software Engineer
> > > > data Artisans
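
PS: if anyone else needs to find which TaskManagers hold the execution IDs Nico asked about, a plain grep over the TM logs does the job. A sketch, assuming a standalone cluster with logs under /opt/flink/log (the path is an assumption; adjust for your deployment):

    cd /opt/flink/log
    # -l prints only the names of the log files that contain the ID,
    # i.e. the TaskManagers whose logs were requested
    for id in 2826f9d430e05e9defaa46f60292fa79 \
              7ef992a067881a07409819e3aea32004 \
              ec923ce6d891d89cf6fecb5e3f5b7cc5; do
      grep -l "$id" flink-*-taskmanager-*.log*
    done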