Hi,

Did you check the metrics for the garbage collector? Being stuck with high CPU consumption and lots of timers sounds like there could be a problem there, because timers are currently on-heap objects, but we are working on RocksDB-based timers right now.
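One way to look at GC activity directly is through the JDK's standard management API, which reports a collection count and an accumulated collection time for each collector. The sketch below uses a hypothetical class name (`GcStats`) and only reads the JVM it runs in. For a TaskManager you would read the same MXBeans remotely over JMX, or use Flink's JVM metrics instead.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Minimal sketch: per-collector GC statistics for the current JVM, using only JDK APIs.
final class GcStats {

    // Total collections across all collectors; undefined values (reported as -1) are skipped.
    static long totalCollections() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long count = gc.getCollectionCount();
            if (count > 0) {
                total += count;
            }
        }
        return total;
    }

    // Accumulated collection time in milliseconds; undefined values (-1) are skipped.
    static long totalCollectionTimeMs() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long time = gc.getCollectionTime();
            if (time > 0) {
                total += time;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.printf("%s: collections=%d, time=%d ms%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
        }
        System.out.printf("total: collections=%d, time=%d ms%n",
                totalCollections(), totalCollectionTimeMs());
    }
}
```

Sample these values twice, a known interval apart. If collection time grows by a large fraction of that interval while CPU sits near 100%, the JVM is probably spending much of its time in GC. That would fit the on-heap timer hypothesis.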
Best,
Stefan

> On 12.07.2018 at 14:54, shishal singh <shisha...@gmail.com> wrote:
>
> Thanks Stefan/Stephan/Nico,
>
> Indeed there are two problems. For the second problem, I am almost certain that Stephan's explanation is the right one, as in my case the number of timers is in the millions. (Each timer is for a different key, so I guess coalescing is not an option for me.)
>
> To simplify my problem: each day I receive millions of events (10-20M), and I have to schedule a timer for 8 AM the next day to check whether matching events have arrived; if not, I have to send an alert to the Elasticsearch sink. I suspected that having so many timers fire at the same time could cause my job to hang, so I now schedule the timers at random times between 8 AM and 10 AM. But my job still hangs after some time. One more thing I noticed: when my job hangs, CPU utilization shoots up to almost 100%.
>
> I tried to isolate the problem by removing the ES sink and just calling stream.print(), yet the problem persists.
>
> In my current setup, I am running a standalone cluster of 3 machines (all three servers run a TaskManager, a JobManager, and Hadoop). So I am not using EBS for RocksDB.
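The jittered scheduling described above can be sketched roughly as follows. This is an illustration, not shishal's code. The class and method names (`AlertTimes`, `alertTime`, `randomAlertTime`) are hypothetical, event timestamps are assumed to be epoch milliseconds, and the time zone is passed in explicitly because the thread does not say which zone the job uses.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper: timer timestamp at 08:00 on the day after the event,
// plus a random offset inside the 08:00-10:00 window.
final class AlertTimes {
    static final LocalTime WINDOW_START = LocalTime.of(8, 0);
    static final long WINDOW_MS = Duration.ofHours(2).toMillis();

    // Deterministic variant; jitterMs must lie in [0, WINDOW_MS).
    static long alertTime(long eventTsMs, ZoneId zone, long jitterMs) {
        if (jitterMs < 0 || jitterMs >= WINDOW_MS) {
            throw new IllegalArgumentException("jitter outside window: " + jitterMs);
        }
        LocalDate nextDay = Instant.ofEpochMilli(eventTsMs).atZone(zone).toLocalDate().plusDays(1);
        long windowStart = nextDay.atTime(WINDOW_START).atZone(zone).toInstant().toEpochMilli();
        return windowStart + jitterMs;
    }

    // Randomized variant. In a KeyedProcessFunction the result would be passed to
    // ctx.timerService().registerEventTimeTimer(...).
    static long randomAlertTime(long eventTsMs, ZoneId zone) {
        return alertTime(eventTsMs, zone, ThreadLocalRandom.current().nextLong(WINDOW_MS));
    }

    public static void main(String[] args) {
        long event = Instant.parse("2018-07-11T15:30:00Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(randomAlertTime(event, ZoneId.of("UTC"))));
    }
}
```

With event-time timers, spreading the timestamps only spreads the work if the watermark advances gradually through the window. When several days of Kafka data are replayed, a single watermark advance past 10:00 can still fire every timer in the window at once.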
>
> I also verified that when the job hangs, even the timers are not being called: I have debug statements in the timers, and the only logs I see at that time are the following:
>
> 2018-07-12 14:35:30,423 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2648355f7c6010f after 11ms
> 2018-07-12 14:35:31,957 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:35:36,946 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:35:41,963 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:35:43,775 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2648355f7c6010f after 10ms
> 2018-07-12 14:35:46,946 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:35:51,954 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:35:56,967 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:35:57,127 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2648355f7c6010f after 8ms
> 2018-07-12 14:36:01,944 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:36:06,955 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:36:08,287 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Receiver TriggerCheckpoint 155@1531398968248 for d9af2f1da87b7268cc03e152a6179eae.
> 2018-07-12 14:36:08,287 DEBUG org.apache.flink.runtime.taskmanager.Task - Invoking async call Checkpoint Trigger for Source: Event Source -> filter (1/1) (d9af2f1da87b7268cc03e152a6179eae).
> on task Source: Event Source -> filter (1/1)
> 2018-07-12 14:36:10,476 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2648355f7c6010f after 10ms
> 2018-07-12 14:36:11,957 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
>
> As I expected, checkpoints also start to fail during this time.
>
> My job graph is pretty simple: Source<Kafka> --> filter --> process (with timers) --> Sink
>
>
> Regards,
> Shishal
>
>
> On Thu, Jul 12, 2018 at 9:54 AM Stefan Richter <s.rich...@data-artisans.com> wrote:
> Hi,
>
> adding to what has already been said, I think there can be two orthogonal problems here: i) why is your job slowing down or getting stuck, and ii) why is cancellation blocked? As for ii), I think Stephan already gave the right reason: shutdown can take longer, and that is what gets the TM killed.
>
> A more interesting question is still i): why does your job slow down until shutdown in the first place? I have two questions here. First, are you running RocksDB on EBS volumes? If so, please have a look at this thread [1], because there can be some performance pitfalls. Second, how many timers are you expecting, and how are they firing? For example, if you have a huge number of timers and the watermark makes a big jump, it may take a while until the job makes progress, because it has to handle all those timer callbacks first. Metrics on event throughput and on your I/O subsystem could help show whether something is stuck or underperforming, or whether there is just a lot of timer processing going on.
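Stefan's point about a big watermark jump can be illustrated with a simplified, self-contained model of an event-time timer queue. Timers are kept ordered by timestamp, and advancing the watermark fires every timer at or below it before the call returns. The class name `TimerQueueModel` is hypothetical, and this is not Flink's `HeapInternalTimerService`: real timers are also scoped by key and namespace, and each firing runs the user's `onTimer()`.

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

// Simplified model (not Flink's implementation) of an event-time timer queue.
final class TimerQueueModel {
    // On-heap queue ordered by timestamp; every registered timer is one boxed Long here.
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void register(long timestampMs) {
        timers.add(timestampMs);
    }

    // Fires all timers with timestamp <= watermarkMs, in timestamp order, and returns how many fired.
    // Nothing else runs on the calling thread until this loop finishes.
    int advanceWatermark(long watermarkMs, LongConsumer onTimer) {
        int fired = 0;
        while (!timers.isEmpty() && timers.peek() <= watermarkMs) {
            onTimer.accept(timers.poll());
            fired++;
        }
        return fired;
    }

    int pending() {
        return timers.size();
    }

    public static void main(String[] args) {
        TimerQueueModel q = new TimerQueueModel();
        for (long ts = 0; ts < 1_000_000; ts++) {
            q.register(ts);
        }
        // One big jump drains the whole backlog in a single call.
        int fired = q.advanceWatermark(Long.MAX_VALUE, ts -> { });
        System.out.println(fired + " timers fired, " + q.pending() + " pending");
    }
}
```

In the stack traces quoted further down the thread, each fired timer calls `RtpeProcessFunction.onTimer()`, which reads RocksDB map state. So a backlog of millions of timers means millions of RocksDB reads on the task thread within one watermark advance.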
>
> Best,
> Stefan
>
> [1] https://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3ccakhqddzamdqewiz5b1qndqv4+-mtvefhbhewrpxftlu7dv9...@mail.gmail.com%3E
>
>> On 11.07.2018 at 19:31, Nico Kruber <n...@data-artisans.com> wrote:
>>
>> If this is about too many timers and your application allows it, you may also try to reduce the timer resolution, and thus their frequency, by coalescing them [1].
>>
>>
>> Nico
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html#timer-coalescing
>>
>> On 11/07/18 18:27, Stephan Ewen wrote:
>>> Hi shishal!
>>>
>>> I think there is an issue with cancellation when many timers fire at the same time. These timers have to finish before shutdown happens, and this seems to take a while in your case.
>>>
>>> Did the TM process actually kill itself in the end (and get restarted)?
>>>
>>>
>>>
>>> On Wed, Jul 11, 2018 at 9:29 AM, shishal <shisha...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am using Flink 1.4.2 with RocksDB as the state backend. I am using a process function with event-time timers. For checkpointing I am using HDFS.
>>>
>>> I am doing load testing, so I am reading Kafka from the beginning (approx. 7 days of data with 50M events).
>>>
>>> My job gets stuck after approx. 20 minutes with no error. Thereafter the watermark does not progress and all checkpoints fail.
>>>
>>> Also, when I try to cancel my job (using the web UI), it takes several minutes until it finally gets cancelled, and it brings the TaskManager down as well.
>>>
>>> There are no logs while my job is hung, but while cancelling I get the following error:
>>>
>>> 2018-07-11 09:10:39,385 ERROR org.apache.flink.runtime.taskmanager.TaskManager -
>>> ==============================================================
>>> ====================== FATAL =======================
>>> ==============================================================
>>>
>>> A fatal error occurred, forcing the TaskManager to shut down: Task 'process (3/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
>>> org.rocksdb.RocksDB.get(Native Method)
>>> org.rocksdb.RocksDB.get(RocksDB.java:810)
>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
>>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>> nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> java.lang.Thread.run(Thread.java:748)
>>>
>>> 2018-07-11 09:10:39,390 DEBUG org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy - Actor was killed. Stopping it now.
>>> akka.actor.ActorKilledException: Kill
>>> 2018-07-11 09:10:39,407 INFO org.apache.flink.runtime.taskmanager.TaskManager - Stopping TaskManager akka://flink/user/taskmanager#-1231617791.
>>> 2018-07-11 09:10:39,408 INFO org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
>>> 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (3/6) (432fd129f3eea363334521f8c8de5198).
>>> 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (3/6) is already in state CANCELING
>>> 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (4/6) (7c6b96c9f32b067bdf8fa7c283eca2e0).
>>> 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (4/6) is already in state CANCELING
>>> 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (2/6) (a4f731797a7ea210fd0b512b0263bcd9).
>>> 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (2/6) is already in state CANCELING
>>> 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (1/6) (cd8a113779a4c00a051d78ad63bc7963).
>>> 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.Task - Task process (1/6) is already in state CANCELING
>>> 2018-07-11 09:10:39,409 INFO org.apache.flink.runtime.taskmanager.TaskManager - Disassociating from JobManager
>>> 2018-07-11 09:10:39,412 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
>>> 2018-07-11 09:10:39,431 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
>>> 2018-07-11 09:10:39,444 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService.
>>> 2018-07-11 09:10:39,444 DEBUG org.apache.flink.runtime.io.disk.iomanager.IOManager - Shutting down I/O manager.
>>> 2018-07-11 09:10:39,451 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /tmp/flink-io-989505e5-ac33-4d56-add5-04b2ad3067b4
>>> 2018-07-11 09:10:39,461 INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the network environment and its components.
>>> 2018-07-11 09:10:39,461 DEBUG org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down network connection manager
>>> 2018-07-11 09:10:39,462 INFO org.apache.flink.runtime.io.network.netty.NettyClient - Successful shutdown (took 1 ms).
>>> 2018-07-11 09:10:39,472 INFO org.apache.flink.runtime.io.network.netty.NettyServer - Successful shutdown (took 10 ms).
>>> 2018-07-11 09:10:39,472 DEBUG org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down intermediate result partition manager
>>> 2018-07-11 09:10:39,473 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Releasing 0 partitions because of shutdown.
>>> 2018-07-11 09:10:39,474 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Successful shutdown.
>>> 2018-07-11 09:10:39,498 INFO org.apache.flink.runtime.taskmanager.TaskManager - Task manager akka://flink/user/taskmanager is completely shut down.
>>> 2018-07-11 09:10:39,504 ERROR org.apache.flink.runtime.taskmanager.TaskManager - Actor akka://flink/user/taskmanager#-1231617791 terminated, stopping process...
>>> 2018-07-11 09:10:39,563 INFO org.apache.flink.runtime.taskmanager.Task - Notifying TaskManager about fatal error. Task 'process (2/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
>>> org.rocksdb.RocksDB.get(Native Method)
>>> org.rocksdb.RocksDB.get(RocksDB.java:810)
>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
>>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>> nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> java.lang.Thread.run(Thread.java:748)
>>> .
>>> 2018-07-11 09:10:39,575 INFO org.apache.flink.runtime.taskmanager.Task - Notifying TaskManager about fatal error. Task 'process (1/6)' did not react to cancelling signal in the last 30 seconds, but is stuck in method:
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> java.lang.Class.newInstance(Class.java:442)
>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:196)
>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:399)
>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:304)
>>> org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:104)
>>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>> nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> java.lang.Thread.run(Thread.java:748)
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>> --
>> Nico Kruber | Software Engineer
>> data Artisans
>>
>> --
>> Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
>> data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
>> --
>> Data Artisans GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>
>
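Nico's coalescing suggestion (see the process function documentation he linked) comes down to rounding timer timestamps to a coarser resolution. The helper below is hypothetical, not Flink API. It relies on Flink keeping at most one timer per key and timestamp. Rounding therefore only lowers the timer count when a single key registers several timers, which is consistent with shishal's remark that coalescing would not help much with one timer per key.

```java
import java.util.Arrays;

// Sketch of timer coalescing: round timer timestamps down to a coarser resolution
// so that several timers for the same key collapse onto one timestamp.
final class TimerCoalescing {

    // Rounds down to a multiple of resolutionMs (assumes non-negative timestamps).
    static long coalesce(long timestampMs, long resolutionMs) {
        return (timestampMs / resolutionMs) * resolutionMs;
    }

    // Distinct timers one key would keep after coalescing,
    // given that only one timer per key and timestamp is stored.
    static long distinctTimers(long[] timestampsMs, long resolutionMs) {
        return Arrays.stream(timestampsMs)
                .map(ts -> coalesce(ts, resolutionMs))
                .distinct()
                .count();
    }

    public static void main(String[] args) {
        long[] oneKey = {1_001, 1_500, 1_999, 2_001};
        // In a KeyedProcessFunction the coalesced value would be passed to
        // ctx.timerService().registerEventTimeTimer(...).
        System.out.println(distinctTimers(oneKey, 1_000));
    }
}
```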