Hi, sorry. This is the correct one.
Best Regards,
Tony Wei

2017-09-28 18:55 GMT+08:00 Tony Wei <tony19920...@gmail.com>:

> Hi Stefan,
>
> Sorry for providing partial information. The attachment contains the full
> logs for checkpoint #1577.
>
> The reason I said that the asynchronous part does not seem to have been
> executed immediately is that all synchronous parts had already finished at
> 2017-09-27 13:49.
> Does that mean the checkpoint barrier event had already arrived at the
> operator and that the checkpoint started as soon as the JM triggered it?
>
> Best Regards,
> Tony Wei
>
> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>
>> Hi,
>>
>> I agree that the memory consumption looks good. If there is only one TM,
>> it will run inside one JVM. As for the 7 minutes, do you mean the reported
>> end-to-end time? This time measurement starts when the checkpoint is
>> triggered on the job manager; the first contributor is then the time it
>> takes for the checkpoint barrier event to travel with the stream to the
>> operators. If there is back pressure and a lot of events are buffered, this
>> can introduce delay to this first part, because barriers must not overtake
>> data for correctness. After the barrier arrives at the operator, next comes
>> the synchronous part of the checkpoint, which is typically short running
>> and takes a snapshot of the state (think of creating an immutable version,
>> e.g. through copy on write). In the asynchronous part, this snapshot is
>> persisted to DFS. After that, the timing stops and is reported together
>> with the acknowledgement to the job manager.
>>
>> So, I would assume that if reporting took 7 minutes end-to-end, and the
>> async part took 4 minutes, it likely took around 3 minutes for the barrier
>> event to travel with the stream. About the debugging, I think it is hard to
>> figure out what is going on with the DFS if you don't have metrics on that.
>> Maybe you could attach a sampler to the TM's JVM and monitor where time is
>> spent during the snapshotting?
>>
>> I am also looping in Stephan, he might have more suggestions.
>>
>> Best,
>> Stefan
>>
>> Am 28.09.2017 um 11:25 schrieb Tony Wei <tony19920...@gmail.com>:
>>
>> Hi Stefan,
>>
>> Here is some telemetry information, but I don't have historical
>> information about GC.
>>
>> <Screenshot 2017-09-28 4.51.26 PM.png>
>> <Screenshot 2017-09-28 4.51.11 PM.png>
>>
>> 1) Yes, my state is not large.
>> 2) My DFS is S3, but my cluster is outside of AWS. That might be a
>> problem. Since this is a POC, we might move to AWS in the future or use
>> HDFS in the same cluster. However, how can I tell whether this is the
>> problem?
>> 3) It seems memory usage is bounded. I'm not sure if the status shown
>> above is fine.
>>
>> There is only one TM in my cluster for now, so all tasks are running on
>> that machine. I think that means they are in the same JVM, right?
>> Besides the asynchronous part taking so long, another question is that
>> the late message showed that this task was delayed for almost 7 minutes,
>> but the log showed it only took 4 minutes.
>> It seems that it was somehow waiting to be executed. Are there any
>> pointers to find out what happened?
>>
>> Regarding the log information, what I mean is that it is hard to
>> recognize which checkpoint id an asynchronous part belongs to if a
>> checkpoint takes more time and several concurrent checkpoints are taking
>> place.
>> Also, it seems that the asynchronous part might not be executed right
>> away if there is no free resource in the thread pool.
>> It would be better to measure the time between the creation time and the
>> processing time, and to log it, together with the checkpoint id, in the
>> original log line that shows how long the asynchronous part took.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>
>>> Hi,
>>>
>>> when the async part takes that long, I would look at 3 things:
>>>
>>> 1) Is your state that large? I don't think this applies in your case,
>>> right?
>>> 2) Is something wrong with writing to DFS (network, disks, etc.)?
>>> 3) Are we running low on memory on that task manager?
>>>
>>> Do you have telemetry information about used heap and GC pressure on the
>>> problematic task? However, what speaks against the memory problem
>>> hypothesis is that future checkpoints seem to go through again. What I
>>> find very strange is that within the reported 4 minutes of the async part
>>> the only thing that happens is: open a DFS output stream, iterate the
>>> in-memory state and write serialized state data to the DFS stream, then
>>> close the stream. There are no locks or waits in that section, so I would
>>> assume that, for one of the three reasons I gave, writing the state is
>>> terribly slow.
>>>
>>> Those snapshots should be able to run concurrently, for example so that
>>> users can also take savepoints even when a checkpoint was triggered and is
>>> still running, so there is no way to guarantee that the previous parts
>>> have finished; this is expected behaviour. Which waiting times are you
>>> missing in the log? I think the information about when a checkpoint is
>>> triggered, received by the TM, performing the sync and async parts, and
>>> the acknowledgement time should all be there.
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 28.09.2017 um 08:18 schrieb Tony Wei <tony19920...@gmail.com>:
>>>
>>> Hi Stefan,
>>>
>>> The checkpoint on my job has been subsumed again. There are some
>>> questions that I don't understand.
>>>
>>> Log in JM:
>>> 2017-09-27 13:45:15,686 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>>> 2017-09-27 13:49:42,795 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1577 @ 1506520182795
>>> 2017-09-27 13:54:42,795 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1578 @ 1506520482795
>>> 2017-09-27 13:55:13,105 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>>> 2017-09-27 13:56:37,103 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1577 from 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>> 2017-09-27 13:59:42,795 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1579 @ 1506520782795
>>>
>>> Log in TM:
>>> 2017-09-27 13:56:37,105 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2, asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads] took 240248 ms.
>>>
>>> I think the log in the TM might correspond to the late message for #1577
>>> in the JM, because #1576 and #1578 had been finished and #1579 hadn't been
>>> started at 13:56:37.
>>> If I am not mistaken, I am wondering why the time it took was 240248 ms
>>> (4 min).
>>> It seems that it started later than the asynchronous tasks of #1578.
>>> Is there any way to guarantee that the asynchronous parts of earlier
>>> checkpoints will be executed before those of later ones?
>>>
>>> Moreover, I think it would be better to have more information in the
>>> INFO log, such as the waiting time and the checkpoint id, in order to
>>> trace the progress of a checkpoint conveniently.
>>>
>>> What do you think? Do you have any suggestions for dealing with these
>>> problems? Thank you.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>>>
>>>> Hi Stefan,
>>>>
>>>> Here is the summary of my streaming job's checkpoints after the restart
>>>> last night.
>>>>
>>>> <Screenshot 2017-09-27 4.56.30 PM.png>
>>>>
>>>> This is the distribution of alignment buffered data over the last 12
>>>> hours.
>>>>
>>>> <Screenshot 2017-09-27 5.05.11 PM.png>
>>>>
>>>> And here is the output buffer pool usage during chk #1140 ~ #1142. For
>>>> chk #1245 and #1246, you can check the picture I sent before.
>>>>
>>>> <Screenshot 2017-09-27 5.01.24 PM.png>
>>>>
>>>> AFAIK, the back pressure rate is usually LOW, sometimes goes up to
>>>> HIGH, and is always OK during the night.
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>
>>>>> Hi Tony,
>>>>>
>>>>> are your checkpoints typically close to the timeout boundary? From
>>>>> what I see, writing the checkpoint is relatively fast, but the time from
>>>>> the checkpoint trigger to execution seems very long. This is typically
>>>>> the case if your job has a lot of backpressure and therefore the
>>>>> checkpoint barriers take a long time to travel to the operators, because
>>>>> a lot of events are piling up in the buffers. Do you also experience
>>>>> large alignments for your checkpoints?
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> Am 27.09.2017 um 10:43 schrieb Tony Wei <tony19920...@gmail.com>:
>>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> It seems that I found something strange in the JM's log.
>>>>>
>>>>> It had happened more than once before, but all subtasks would finish
>>>>> their checkpoint attempts in the end.
>>>>>
>>>>> 2017-09-26 01:23:28,690 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1140 @ 1506389008690
>>>>> 2017-09-26 01:28:28,690 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1141 @ 1506389308690
>>>>> 2017-09-26 01:33:28,690 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1142 @ 1506389608690
>>>>> 2017-09-26 01:33:28,691 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140 expired before completing.
>>>>> 2017-09-26 01:38:28,691 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141 expired before completing.
>>>>> 2017-09-26 01:40:38,044 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1140 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>> 2017-09-26 01:40:53,743 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1141 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>> 2017-09-26 01:41:19,332 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>>
>>>>> For chk #1245 and #1246, there was no late message from the TM. You
>>>>> can refer to the TM log. A fully completed checkpoint attempt generally
>>>>> has 12 (... asynchronous part) logs, but #1245 and #1246 only got 10.
>>>>>
>>>>> 2017-09-26 10:08:28,690 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1245 @ 1506420508690
>>>>> 2017-09-26 10:13:28,690 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1246 @ 1506420808690
>>>>> 2017-09-26 10:18:28,691 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245 expired before completing.
>>>>> 2017-09-26 10:23:28,691 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246 expired before completing.
>>>>>
>>>>> Moreover, I listed the checkpoint directory on S3 and saw that there
>>>>> were two states that were not discarded successfully. In general, there
>>>>> are 16 parts for a completed checkpoint state.
>>>>>
>>>>> 2017-09-26 18:08:33  36919  tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>>>>> 2017-09-26 18:13:34  37419  tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>>>>>
>>>>> Hope this information is helpful. Thank you.
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> thanks for the information. Unfortunately, I have no immediate idea
>>>>>> what the reason is from the given information. I think the most helpful
>>>>>> thing would be a thread dump, but also metrics on the operator level to
>>>>>> figure out which part of the pipeline is the culprit.
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <tony19920...@gmail.com>:
>>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> There is no unknown exception in my full log. The Flink version is
>>>>>> 1.3.2.
>>>>>> My job is roughly like this:
>>>>>>
>>>>>> env.addSource(Kafka)
>>>>>>    .map(ParseKeyFromRecord)
>>>>>>    .keyBy()
>>>>>>    .process(CountAndTimeoutWindow)
>>>>>>    .asyncIO(UploadToS3)
>>>>>>    .addSink(UpdateDatabase)
>>>>>>
>>>>>> It seemed all tasks stopped, like the picture I sent in the last
>>>>>> email.
>>>>>>
>>>>>> I will keep an eye on taking a thread dump from that JVM if this
>>>>>> happens again.
>>>>>>
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> that is very strange indeed. I had a look at the logs and there is
>>>>>>> no error or exception reported. I assume there is also no exception in
>>>>>>> your full logs? Which version of Flink are you using and what operators
>>>>>>> were running in the task that stopped? If this happens again, would it
>>>>>>> be possible to take a thread dump from that JVM?
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>>
>>>>>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <tony19920...@gmail.com>:
>>>>>>> >
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > Something weird happened on my streaming job.
>>>>>>> >
>>>>>>> > I found that my streaming job seemed to be blocked for a long
>>>>>>> > time, and I saw the situation shown in the picture below. (chk #1245
>>>>>>> > and #1246 each finished 7/8 tasks and were then marked as timed out
>>>>>>> > by the JM. Other checkpoints, like #1247, failed in the same state
>>>>>>> > until I restarted the TM.)
>>>>>>> >
>>>>>>> > <snapshot.png>
>>>>>>> >
>>>>>>> > I'm not sure what happened, but the consumer stopped fetching
>>>>>>> > records, buffer usage was 100%, and the following task did not seem
>>>>>>> > to fetch data anymore. It was as if the whole TM had stopped.
>>>>>>> >
>>>>>>> > However, after I restarted the TM and forced the job to restart
>>>>>>> > from the latest completed checkpoint, everything worked again. And I
>>>>>>> > don't know how to reproduce it.
>>>>>>> >
>>>>>>> > The attachment is my TM log. Because there are many user logs and
>>>>>>> > sensitive information, I only kept the logs from `org.apache.flink...`.
>>>>>>> >
>>>>>>> > My cluster setup is one JM and one TM with 4 available slots.
>>>>>>> >
>>>>>>> > The streaming job uses all slots, the checkpoint interval is 5
>>>>>>> > mins, and the max concurrent checkpoint number is 3.
>>>>>>> >
>>>>>>> > Please let me know if you need more information to find out what
>>>>>>> > happened to my streaming job. Thanks for your help.
>>>>>>> >
>>>>>>> > Best Regards,
>>>>>>> > Tony Wei
>>>>>>> > <flink-root-taskmanager-0-partial.log>
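For readers following along, the checkpoint settings discussed in the thread (5 minute interval, up to 3 concurrent checkpoints, and the timeout after which the coordinator logs "expired before completing") are configured on the StreamExecutionEnvironment. Below is a minimal, self-contained sketch of such a configuration, assuming the Flink 1.3 DataStream API; the tiny pipeline and all class/job names here are placeholders for illustration, not the actual job from this thread.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 5 minute checkpoint interval, exactly-once mode, as described in the thread.
        env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Allow up to 3 checkpoints to be in flight at the same time.
        checkpointConfig.setMaxConcurrentCheckpoints(3);
        // Checkpoints running longer than this are the ones the CheckpointCoordinator
        // reports as "expired before completing" (10 minutes is the default, which
        // matches the ~10 minute trigger-to-expiry gap visible in the JM log above).
        checkpointConfig.setCheckpointTimeout(10 * 60 * 1000L);

        // Placeholder pipeline; the real job in this thread is roughly
        // Kafka source -> ParseKeyFromRecord -> keyBy -> CountAndTimeoutWindow
        // -> async upload to S3 -> database sink.
        env.fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .print();

        env.execute("checkpoint-config-sketch");
    }
}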
chk_1577.log
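Regarding Stefan's suggestion to take a thread dump or attach a sampler to the TaskManager JVM: the usual route is jstack against the TM process id, or a profiler with a sampler such as VisualVM, which requires no code changes. Purely as a rough illustration (this helper is not part of Flink and the class name is made up), the same stack information can also be captured in-process through the standard ThreadMXBean:

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper for printing a thread dump of the current JVM, e.g. to check
// what a snapshot thread such as "pool-7-thread-322" (from the TM log above) is doing.
public class ThreadDumpSketch {

    public static String dumpAllThreads() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        // Include locked monitors and ownable synchronizers in the dump.
        ThreadInfo[] infos = threads.dumpAllThreads(true, true);
        StringBuilder out = new StringBuilder();
        for (ThreadInfo info : infos) {
            // ThreadInfo#toString prints the thread name, state and a (truncated) stack trace.
            out.append(info);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(dumpAllThreads());
    }
}

Taking a few dumps several seconds apart is usually more telling than a single one, since it shows whether the async snapshot thread is actually making progress on the S3 upload or is stuck waiting.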