Hello, I am running Flink job which reads topics from Kafka and write results to Redis. I use FsStatebackend with HDFS.
I noticed that taking checkpoint takes serveral minutes and sometimes expires. --- 2016-06-14 17:25:40,734 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1456 (in 257956 ms) 2016-06-14 17:25:40,735 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1457 @ 1465892740734 2016-06-14 17:35:40,735 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1457 expired before completing. 2016-06-14 17:35:40,736 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1458 @ 1465893340735 2016-06-14 17:45:40,736 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1458 expired before completing. 2016-06-14 17:45:40,737 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1459 @ 1465893940736 2016-06-14 17:55:40,738 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1459 expired before completing. 2016-06-14 17:55:40,739 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1460 @ 1465894540738 --- According to WebUI, checkpoint size is just 1MB. Why checkpointing takes so long? I took jstack during checkpointing. It looks that checkpointing thread is blocked in commitOffsets. ---- "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry [0x00007f2b3ddfc000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392) - waiting to lock <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596) - locked <0x0000000659111cc8> (a java.lang.Object) at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) --- Blocker is this. --- "Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable [0x00007f2b3dbfa000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87) - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2) - locked <0x0000000659457db8> (a java.util.Collections$UnmodifiableSet) - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98) at org.apache.kafka.common.network.Selector.select(Selector.java:425) at org.apache.kafka.common.network.Selector.poll(Selector.java:254) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449) - locked <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer) --- If someone could advise me of the cause or the way to investigate further, that would be appreciated. Thanks, Hironori