Hi,

After the modification, my program ran for 3 days without problems. The maximum checkpoint duration was 6 seconds (before the modification, it took several minutes). I think the modification fixed the issue.

Regards,
Hironori

2016-06-15 22:51 GMT+09:00 Robert Metzger <rmetz...@apache.org>:
> Hi,
>
> I've already looked at this issue on the Flink list and recommended that
> Hironori post here. It seems that the consumer is not returning from the
> poll() call, which is why the commitOffsets() method cannot enter the
> synchronized block.
> The KafkaConsumer is logging the following statements:
>
> 2016-06-10 20:29:53,677 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147482645 dead.
> 2016-06-10 20:29:53,678 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147482645 dead.
> 2016-06-10 20:29:53,679 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147482645 dead.
> ....
> 2016-06-10 20:56:53,982 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator 2147482645 dead.
>
> I guess that the poll() call is not returning within the given timeout
> while trying to reconnect to the brokers?
>
> On Wed, Jun 15, 2016 at 2:41 PM, Hironori Ogibayashi <ogibaya...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I am running a stream processing job with Kafka and Flink.
>> Flink reads records from Kafka.
>>
>> My software versions are:
>> - Kafka broker: 0.9.0.2.4 (HDP 2.4.0.0 version)
>> - Kafka client library: 0.9.0.1
>> - Flink: 1.0.3
>>
>> My problem is that the Flink job is sometimes blocked and consumer lag
>> keeps increasing.
>> I took a thread dump while this was happening.
>>
>> This is the blocked thread. It looks like it is blocked in
>> KafkaConsumer.commitOffsets.
>>
>> ----
>> "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry [0x00007f2b3ddfc000]
>>    java.lang.Thread.State: BLOCKED (on object monitor)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
>>     - waiting to lock <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
>>     - locked <0x0000000659111cc8> (a java.lang.Object)
>>     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>> ---
>>
>> And lock 0x0000000659111b58 is held by the following thread.
>>
>> ---
>> "Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable [0x00007f2b3dbfa000]
>>    java.lang.Thread.State: RUNNABLE
>>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>>     - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2)
>>     - locked <0x0000000659457db8> (a java.util.Collections$UnmodifiableSet)
>>     - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl)
>>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>>     at org.apache.kafka.common.network.Selector.select(Selector.java:425)
>>     at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
>>     - locked <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
>> ---
>>
>> I am wondering why Flink's Kafka consumer is blocked; any advice
>> would be appreciated.
>>
>> Thanks,
>> Hironori Ogibayashi
>>
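
For anyone reading this thread later: the blocking pattern in the two thread dumps (one thread holds the KafkaConsumer monitor inside poll(), the other waits for the same monitor in commitOffsets()) can be reproduced without Kafka or Flink. Below is a minimal sketch in plain Java. It is only an illustration, not the Flink connector code and not the modification mentioned at the top of this mail. The names PollLockDemo, consumerLock and commitWaitMillis are hypothetical, and Thread.sleep stands in for a poll() call that is slow to return.

```java
import java.util.concurrent.CountDownLatch;

public class PollLockDemo {
    // Hypothetical stand-in for the KafkaConsumer instance both threads synchronize on.
    private static final Object consumerLock = new Object();

    /**
     * Starts a "consumer" thread that holds the lock for pollMillis (simulating a
     * slow poll()), then measures how long a "commit" on the calling thread has to
     * wait before it can enter the synchronized block.
     */
    static long commitWaitMillis(long pollMillis) {
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread consumerThread = new Thread(() -> {
            synchronized (consumerLock) {
                lockHeld.countDown(); // signal that the lock is now owned
                try {
                    Thread.sleep(pollMillis); // simulated poll() that does not return promptly
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumerThread.setDaemon(true);
        consumerThread.start();
        try {
            lockHeld.await(); // make sure the consumer thread owns the lock first
            long start = System.nanoTime();
            synchronized (consumerLock) {
                // An offset commit would happen here; it cannot start until
                // the simulated poll() has released the lock.
            }
            long waitedMillis = (System.nanoTime() - start) / 1_000_000;
            consumerThread.join();
            return waitedMillis;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("commit waited ~" + commitWaitMillis(500) + " ms");
    }
}
```

The commit's wait time tracks the time spent inside the simulated poll(), which matches Robert's reading: as long as poll() does not return, anything synchronizing on the same object stays BLOCKED.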