Hi,

I already looked at this issue on the Flink list and recommended that Hironori
post it here. It seems that the consumer is not returning from the poll()
call, which is why the commitOffsets() method cannot enter the synchronized
block.
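To make the contention concrete, here is a small stand-alone Java simulation that uses only the JDK, with no Kafka or Flink classes. A plain Object stands in for the KafkaConsumer. A "poller" thread holds its monitor while parked on a latch, which stands in for a poll() that does not return. A "committer" thread that tries to synchronize on the same object then ends up BLOCKED, like the commitOffsets() thread in the dump. The class, method and thread names are made up for illustration, and the poller here is parked rather than sitting in epollWait, but the effect on the monitor is the same.

```java
import java.util.concurrent.CountDownLatch;

public class MonitorContentionDemo {

    // Returns the state observed for the "committer" thread while the
    // "poller" thread holds the shared monitor (hypothetical illustration).
    public static Thread.State observeCommitterState() throws InterruptedException {
        final Object consumer = new Object(); // stands in for the KafkaConsumer instance
        final CountDownLatch lockHeld = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);

        Thread poller = new Thread(() -> {
            synchronized (consumer) {
                lockHeld.countDown();
                try {
                    // Simulates a poll() that does not return; await() does NOT
                    // release the monitor held by this synchronized block.
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "poller");

        Thread committer = new Thread(() -> {
            synchronized (consumer) {
                // A commit would happen here once the monitor is acquired.
            }
        }, "committer");

        poller.start();
        lockHeld.await();   // make sure the poller owns the monitor first
        committer.start();

        // Poll the committer's state for up to ~2 seconds until it is BLOCKED.
        Thread.State state = committer.getState();
        for (int i = 0; i < 200 && state != Thread.State.BLOCKED; i++) {
            Thread.sleep(10);
            state = committer.getState();
        }

        // Let the "poll" finish so both threads can exit cleanly.
        release.countDown();
        poller.join();
        committer.join();
        return state;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("committer state: " + observeCommitterState());
    }
}
```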
The KafkaConsumer is logging the following statements:

2016-06-10 20:29:53,677 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147482645 dead.
2016-06-10 20:29:53,678 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147482645 dead.
2016-06-10 20:29:53,679 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147482645 dead.
....
2016-06-10 20:56:53,982 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147482645 dead.


My guess is that the poll() call does not return within the given timeout
while it keeps trying to reconnect to the brokers?
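If that is what happens, one way to keep the checkpoint thread from waiting on the consumer monitor at all is to hand the offsets over to the polling thread and let it perform the commit between poll() calls. The sketch below is hypothetical: the class and method names are invented, offsets are modeled as a plain partition-to-offset map, and in real code commitPendingIfAny() would call the consumer's commit method instead of updating a map. It is not what FlinkKafkaConsumer09 does in the version from the thread dump, which synchronizes on the consumer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical hand-off: the checkpoint thread never locks the consumer.
public class CommitHandOffSketch {
    // Latest offsets requested for commit; null means "nothing pending".
    private final AtomicReference<Map<Integer, Long>> pendingCommit = new AtomicReference<>();
    // What the polling thread has "committed"; only touched by that thread.
    private final Map<Integer, Long> committed = new HashMap<>();

    // Called from the checkpoint-notification thread; never blocks.
    // A newer request simply replaces an older one that was not yet committed.
    public void requestCommit(Map<Integer, Long> offsets) {
        pendingCommit.set(new HashMap<>(offsets));
    }

    // Called only from the polling thread, e.g. after each poll() returns.
    // Real code would invoke the consumer's commit here (assumption).
    public void commitPendingIfAny() {
        Map<Integer, Long> offsets = pendingCommit.getAndSet(null);
        if (offsets != null) {
            committed.putAll(offsets);
        }
    }

    public Map<Integer, Long> committed() {
        return committed;
    }

    public static void main(String[] args) {
        CommitHandOffSketch sketch = new CommitHandOffSketch();
        sketch.requestCommit(Map.of(0, 10L));
        sketch.requestCommit(Map.of(0, 15L, 1, 7L)); // supersedes the first request
        sketch.commitPendingIfAny();                 // polling thread picks up the latest one
        System.out.println(sketch.committed());
    }
}
```

Note that this only stops the checkpoint thread from blocking; if poll() really never returns, the pending commit is simply deferred until it does.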


On Wed, Jun 15, 2016 at 2:41 PM, Hironori Ogibayashi <ogibaya...@gmail.com> wrote:

> Hello,
>
> I am running a stream processing job with Kafka and Flink.
> Flink reads records from Kafka.
>
> My software versions are:
> - Kafka broker: 0.9.0.2.4 (HDP 2.4.0.0 version)
> - Kafka client library: 0.9.0.1
> - Flink: 1.0.3
>
> Now I have a problem: the Flink job is sometimes blocked and the consumer
> lag keeps increasing.
> I took a thread dump while this was happening.
>
> This is the blocked thread. It looks like it is blocked in
> FlinkKafkaConsumer09.commitOffsets, waiting to lock the KafkaConsumer.
>
> ----
> "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry [0x00007f2b3ddfc000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
>         - waiting to lock <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
>         - locked <0x0000000659111cc8> (a java.lang.Object)
>         at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> ---
>
> And lock 0x0000000659111b58 is held by the following thread.
>
> ---
> "Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable [0x00007f2b3dbfa000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>         - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2)
>         - locked <0x0000000659457db8> (a java.util.Collections$UnmodifiableSet)
>         - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>         at org.apache.kafka.common.network.Selector.select(Selector.java:425)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
>         - locked <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
> ---
>
> I am wondering why Flink's Kafka consumer is blocked; any advice
> would be appreciated.
>
> Thanks,
> Hironori Ogibayashi
>
