Kostas,

I have attached a log file from one of the taskManagers (the same host
on which I executed jstack).
I noticed that there are lots of "Marking the coordinator 2147482645
dead" messages in the log.
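By the way, the jstack output I sent before showed the checkpoint thread blocked on the KafkaConsumer monitor while the consumer thread was polling. That contention reduces to the following self-contained sketch (the KafkaConsumer is replaced by a plain lock object; the names are illustrative, this is not Flink or Kafka code):

```java
import java.util.concurrent.CountDownLatch;

// Minimal model of the contention in the jstack output: the consumer
// thread holds the KafkaConsumer monitor while polling, so the
// checkpoint thread blocks in commitOffsets until the poll returns.
public class MonitorContention {
    static final Object consumerLock = new Object(); // stands in for the KafkaConsumer

    public static boolean tryCommitWhilePolling(long pollMillis) throws InterruptedException {
        CountDownLatch polling = new CountDownLatch(1);
        Thread consumerThread = new Thread(() -> {
            synchronized (consumerLock) {            // like ConsumerThread.run holding the consumer
                polling.countDown();
                try { Thread.sleep(pollMillis); }    // stands in for KafkaConsumer.poll(...)
                catch (InterruptedException ignored) {}
            }
        });
        consumerThread.start();
        polling.await();

        // "Checkpoint thread": commitOffsets needs the same monitor.
        final boolean[] committed = {false};
        Thread checkpointThread = new Thread(() -> {
            synchronized (consumerLock) {            // like commitOffsets(...)
                committed[0] = true;
            }
        });
        checkpointThread.start();
        checkpointThread.join(pollMillis / 2);       // wait half the poll time
        boolean blockedDuringPoll = !committed[0];   // still waiting -> contention reproduced
        consumerThread.join();
        checkpointThread.join();
        return blockedDuringPoll;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("commit blocked while polling: " + tryCommitWhilePolling(500));
    }
}
```

As you said, if poll() returns within its timeout the monitor is released and the commit can proceed; the jstack suggests that in my case the poll was not returning.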
MyContinuousProcessingTimeTriggerGlobal in the log is my custom
trigger, which is based on
ContinuousProcessingTimeTrigger but cleans up windows when it receives
specific log records.
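To clarify how it behaves, here is a simplified model of the decision logic (plain Java, not the actual Flink Trigger API; the `isCleanupRecord` check and the record type are made up for illustration):

```java
// Simplified model of the trigger decision: behave like
// ContinuousProcessingTimeTrigger (periodic FIRE), but PURGE the
// window when a specific "cleanup" record arrives.
public class TriggerDecision {
    public enum Result { CONTINUE, FIRE, PURGE }

    private final long intervalMs;
    private long nextFireTime;

    public TriggerDecision(long intervalMs, long now) {
        this.intervalMs = intervalMs;
        this.nextFireTime = now + intervalMs;
    }

    /** Hypothetical marker check: records starting with "CLEANUP" purge the window. */
    static boolean isCleanupRecord(String record) {
        return record.startsWith("CLEANUP");
    }

    public Result onElement(String record, long now) {
        if (isCleanupRecord(record)) {
            return Result.PURGE;          // drop window state immediately
        }
        if (now >= nextFireTime) {
            nextFireTime = now + intervalMs;
            return Result.FIRE;           // periodic firing, as in ContinuousProcessingTimeTrigger
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        TriggerDecision t = new TriggerDecision(1000, 0);
        System.out.println(t.onElement("data", 10));       // CONTINUE
        System.out.println(t.onElement("CLEANUP-1", 20));  // PURGE
    }
}
```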

Thanks,
Hironori

2016-06-14 21:23 GMT+09:00 Kostas Kloudas <k.klou...@data-artisans.com>:
> Hi Hironori,
>
> Could you also provide the logs of the taskManager?
>
> As you described, it seems that the consumer is stuck in the polling loop, 
> although Flink polls with
> a timeout. This would normally mean that periodically it should release the 
> lock for the checkpoints to go through.
>
> The logs of the task manager can help clarify why this does not happen.
>
> Thanks,
> Kostas
>
>> On Jun 14, 2016, at 12:48 PM, Hironori Ogibayashi <ogibaya...@gmail.com> 
>> wrote:
>>
>> Kostas,
>>
>> Thank you for your response.
>> Yes, I am using latest Flink, which is 1.0.3.
>>
>> Thanks,
>> Hironori
>>
>> 2016-06-14 19:02 GMT+09:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>>> Hello Hironori,
>>>
>>> Are you using the latest Flink version?
>>> There were some changes in the FlinkConsumer in the latest releases.
>>>
>>> Thanks,
>>> Kostas
>>>
>>>> On Jun 14, 2016, at 11:52 AM, Hironori Ogibayashi <ogibaya...@gmail.com> 
>>>> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I am running a Flink job which reads topics from Kafka and writes results
>>>> to Redis. I use FsStateBackend with HDFS.
>>>>
>>>> I noticed that taking a checkpoint takes several minutes and sometimes 
>>>> expires.
>>>> ---
>>>> 2016-06-14 17:25:40,734 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Completed checkpoint 1456 (in 257956 ms)
>>>> 2016-06-14 17:25:40,735 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Triggering checkpoint 1457 @ 1465892740734
>>>> 2016-06-14 17:35:40,735 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Checkpoint 1457 expired before completing.
>>>> 2016-06-14 17:35:40,736 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Triggering checkpoint 1458 @ 1465893340735
>>>> 2016-06-14 17:45:40,736 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Checkpoint 1458 expired before completing.
>>>> 2016-06-14 17:45:40,737 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Triggering checkpoint 1459 @ 1465893940736
>>>> 2016-06-14 17:55:40,738 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Checkpoint 1459 expired before completing.
>>>> 2016-06-14 17:55:40,739 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Triggering checkpoint 1460 @ 1465894540738
>>>> ---
>>>>
>>>> According to the WebUI, the checkpoint size is just 1MB. Why does
>>>> checkpointing take so long?
>>>>
>>>> I took jstack during checkpointing. It looks like the checkpointing
>>>> thread is blocked in commitOffsets.
>>>>
>>>> ----
>>>> "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon
>>>> prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry
>>>> [0x00007f2b3ddfc000]
>>>>  java.lang.Thread.State: BLOCKED (on object monitor)
>>>>       at 
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
>>>>       - waiting to lock <0x0000000659111b58> (a
>>>> org.apache.kafka.clients.consumer.KafkaConsumer)
>>>>       at 
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
>>>>       at 
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
>>>>       at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
>>>>       - locked <0x0000000659111cc8> (a java.lang.Object)
>>>>       at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
>>>>       at 
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>       at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>>       at 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>       at 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>       at java.lang.Thread.run(Thread.java:745)
>>>> ---
>>>>
>>>> The blocker is this:
>>>>
>>>> ---
>>>> "Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable
>>>> [0x00007f2b3dbfa000]
>>>>  java.lang.Thread.State: RUNNABLE
>>>>       at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>       at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>       at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>>>>       at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>>>>       - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2)
>>>>       - locked <0x0000000659457db8> (a 
>>>> java.util.Collections$UnmodifiableSet)
>>>>       - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl)
>>>>       at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>>>>       at org.apache.kafka.common.network.Selector.select(Selector.java:425)
>>>>       at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
>>>>       at 
>>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>>>>       at 
>>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>>>>       at 
>>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>>>>       at 
>>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>>>>       at 
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
>>>>       at 
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>>>       at 
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
>>>>       - locked <0x0000000659111b58> (a
>>>> org.apache.kafka.clients.consumer.KafkaConsumer)
>>>> ---
>>>>
>>>> If someone could advise me of the cause or the way to investigate
>>>> further, that would be appreciated.
>>>>
>>>> Thanks,
>>>> Hironori
>>>
>

Attachment: flink-flink-taskmanager-0-FLINK1503.log.gz
Description: GNU Zip compressed data
