Re: Blocked in KafkaConsumer.commitOffsets

2016-07-11 Thread Hironori Ogibayashi
Hi

After the modification, my program ran for 3 days without problems. The maximum
checkpoint duration was 6 seconds (before the modification, it took several
minutes), so I think the modification fixed the issue.

Regards,
Hironori


2016-06-15 22:51 GMT+09:00 Robert Metzger:

Re: Blocked in KafkaConsumer.commitOffsets

2016-06-15 Thread Robert Metzger
Hi,

I already looked at this issue on the Flink mailing list and recommended that
Hironori post here. It seems that the consumer is not returning from the poll()
call; that's why the commitOffsets() method cannot enter the synchronized
block.
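To make the mechanism concrete, here is a minimal, self-contained Java reproduction. It is a sketch, not Flink's or Kafka's code: one thread holds an object's monitor for a long time (standing in for the consumer thread sitting in poll() while holding the KafkaConsumer lock), and a second thread that needs the same monitor (standing in for commitOffsets()) ends up in `Thread.State.BLOCKED`, as the "Async calls" thread does in the dump. `FakeConsumer`, the thread names, and the sleep that simulates a stuck poll() are all hypothetical.

```java
import java.util.concurrent.CountDownLatch;

/** Sketch of the contention seen in the dump; not Flink's or Kafka's code. */
public class MonitorContentionDemo {

    /** Hypothetical stand-in for the KafkaConsumer instance both threads lock. */
    static final class FakeConsumer {}

    /**
     * Lets a "poll" thread hold the monitor for holdMillis, starts a "commit"
     * thread that needs the same monitor, and returns the commit thread's state
     * once it has settled (BLOCKED while the poll thread still owns the lock).
     */
    static Thread.State observeCommitThreadState(long holdMillis) throws InterruptedException {
        FakeConsumer consumer = new FakeConsumer();
        CountDownLatch lockHeld = new CountDownLatch(1);

        Thread pollThread = new Thread(() -> {
            synchronized (consumer) {          // like the consumer thread holding the lock around poll()
                lockHeld.countDown();
                try {
                    Thread.sleep(holdMillis);  // simulates a poll() that does not return in time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "poll-thread");

        Thread commitThread = new Thread(() -> {
            synchronized (consumer) {          // like commitOffsets() entering its synchronized block
                // offsets would be committed here
            }
        }, "commit-thread");

        pollThread.start();
        lockHeld.await();                      // the poll thread now owns the monitor
        commitThread.start();

        Thread.State state = commitThread.getState();
        while (state != Thread.State.BLOCKED && state != Thread.State.TERMINATED) {
            Thread.sleep(5);
            state = commitThread.getState();
        }

        pollThread.join();
        commitThread.join();
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("commit-thread state: " + observeCommitThreadState(2_000));
    }
}
```

As long as the "poll" never gives up the monitor, the commit thread cannot make progress, which matches checkpoint confirmations piling up behind a poll() that does not return.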
The KafkaConsumer is logging the following statements:

2016-06-10 20:29:53,677 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147482645 dead.
2016-06-10 20:29:53,678 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147482645 dead.
2016-06-10 20:29:53,679 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147482645 dead.

2016-06-10 20:56:53,982 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator 2147482645 dead.


I guess that the poll() call is not returning within the given timeout
while trying to reconnect to the brokers?
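One way to avoid this kind of contention is sketched below, under stated assumptions and not necessarily what the eventual fix was. The Kafka consumer is documented as not safe for multi-threaded access, so instead of having the checkpoint thread lock the consumer, it can hand the offsets to the polling thread, which commits them between polls. `PollingClient`, `requestCommit` and `runOnce` are hypothetical names, not the Kafka or Flink API. In the real client, `KafkaConsumer.wakeup()` is the method documented as safe to call from another thread to abort a long-running poll().

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch: only the polling thread ever touches the client.
 * Other threads publish the offsets they want committed through an
 * AtomicReference instead of synchronizing on the client object.
 */
public class CommitHandoverSketch {

    /** Hypothetical minimal client; NOT the Kafka or Flink API. */
    interface PollingClient {
        void poll(long timeoutMillis);            // may block for a long time
        void commit(Map<Integer, Long> offsets);  // partition -> offset
    }

    private final AtomicReference<Map<Integer, Long>> pendingCommit = new AtomicReference<>();
    private final PollingClient client;

    CommitHandoverSketch(PollingClient client) {
        this.client = client;
    }

    /** Called from the checkpoint thread; returns immediately, never waits on the client. */
    void requestCommit(Map<Integer, Long> offsets) {
        // A newer request replaces an older one that has not been committed yet.
        pendingCommit.set(new HashMap<>(offsets));
    }

    /** One iteration of the polling loop; must only run on the polling thread. */
    void runOnce(long pollTimeoutMillis) {
        Map<Integer, Long> toCommit = pendingCommit.getAndSet(null);
        if (toCommit != null) {
            client.commit(toCommit);
        }
        client.poll(pollTimeoutMillis);
    }

    public static void main(String[] args) {
        // Usage with a fake client that only prints what it is asked to do.
        CommitHandoverSketch loop = new CommitHandoverSketch(new PollingClient() {
            @Override public void poll(long timeoutMillis) { System.out.println("poll(" + timeoutMillis + ")"); }
            @Override public void commit(Map<Integer, Long> offsets) { System.out.println("commit " + offsets); }
        });
        loop.requestCommit(Collections.singletonMap(0, 10L));
        loop.requestCommit(Collections.singletonMap(0, 20L)); // supersedes the first request
        loop.runOnce(100);
        loop.runOnce(100);
    }
}
```

Because the checkpoint thread only performs an `AtomicReference.set`, it can no longer be parked behind a poll() that fails to return; the trade-off is that the commit is delayed until the polling loop comes around again.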


On Wed, Jun 15, 2016 at 2:41 PM, Hironori Ogibayashi wrote:



Blocked in KafkaConsumer.commitOffsets

2016-06-15 Thread Hironori Ogibayashi
Hello,

I am running a stream processing job with Kafka and Flink.
Flink reads records from Kafka.

My software versions are:
- Kafka broker: 0.9.0.2.4 (HDP 2.4.0.0 version)
- Kafka client library: 0.9.0.1
- Flink: 1.0.3

Now I have a problem: the Flink job is sometimes blocked and the consumer lag
keeps increasing. I took a thread dump while this was happening.

This is the blocked thread. It looks like it is blocked in KafkaConsumer.commitOffsets.

---
"Async calls on Source: Custom Source -> Flat Map (2/3)" daemon prio=10 tid=0x7f2b14010800 nid=0x1b89a waiting for monitor entry [0x7f2b3ddfc000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
	- waiting to lock <0x000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
	- locked <0x000659111cc8> (a java.lang.Object)
	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
---

And lock 0x000659111b58 is held by the following thread.

---
"Thread-9" daemon prio=10 tid=0x7f2b2440d000 nid=0x1b838 runnable [0x7f2b3dbfa000]
   java.lang.Thread.State: RUNNABLE
	at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
	at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
	at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
	- locked <0x000659457dc8> (a sun.nio.ch.Util$2)
	- locked <0x000659457db8> (a java.util.Collections$UnmodifiableSet)
	- locked <0x000659457108> (a sun.nio.ch.EPollSelectorImpl)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
	at org.apache.kafka.common.network.Selector.select(Selector.java:425)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
	- locked <0x000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
---
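The two halves of the dump above (the waiting thread, then the owner of the lock it wants) can also be gathered from inside the JVM. This sketch uses the standard `java.lang.management.ThreadMXBean` API to list every BLOCKED thread together with the monitor it waits for and the thread that owns that monitor; `demoReport()` builds a hypothetical holder/waiter pair so the output is not empty. It assumes it runs inside the affected JVM (for example from a debug hook) and is a diagnostic aid only, not a replacement for `jstack`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class BlockedThreadReport {

    /** One line per BLOCKED thread: who waits, for which monitor, and who owns it. */
    static List<String> blockedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> lines = new ArrayList<>();
        // maxDepth 0: no stack frames are needed, only state and lock information.
        for (ThreadInfo info : mx.getThreadInfo(mx.getAllThreadIds(), 0)) {
            // Entries are null for threads that died between the two calls.
            if (info != null && info.getThreadState() == Thread.State.BLOCKED) {
                lines.add("\"" + info.getThreadName() + "\" waiting to lock " + info.getLockName()
                        + " held by \"" + info.getLockOwnerName() + "\"");
            }
        }
        return lines;
    }

    /** Hypothetical holder/waiter pair on one monitor, reported while they contend. */
    static List<String> demoReport() throws InterruptedException {
        Object monitor = new Object();
        CountDownLatch held = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            synchronized (monitor) {
                held.countDown();
                try {
                    Thread.sleep(1_000);   // keeps the monitor, like the stuck poll()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "holder");
        Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                // nothing to do; trying to enter the monitor is the point
            }
        }, "waiter");

        holder.start();
        held.await();                      // holder owns the monitor before waiter starts
        waiter.start();
        while (waiter.getState() != Thread.State.BLOCKED && waiter.isAlive()) {
            Thread.sleep(5);
        }
        List<String> report = blockedThreads();
        holder.join();
        waiter.join();
        return report;
    }

    public static void main(String[] args) throws InterruptedException {
        demoReport().forEach(System.out::println);
    }
}
```

`ThreadInfo.getLockName()` reports the monitor as the class name followed by `@` and the identity hash code in hex, so the line for the demo pair names `java.lang.Object@...` as the lock and `holder` as its owner.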

I am wondering why Flink's Kafka consumer is blocked; any advice
would be appreciated.

Thanks,
Hironori Ogibayashi