Kostas,

Thank you for your response.
Yes, I am using the latest Flink release, which is 1.0.3.

Thanks,
Hironori

2016-06-14 19:02 GMT+09:00 Kostas Kloudas <k.klou...@data-artisans.com>:
> Hello Hironori,
>
> Are you using the latest Flink version?
> There were some changes in the FlinkConsumer in the latest releases.
>
> Thanks,
> Kostas
>
>> On Jun 14, 2016, at 11:52 AM, Hironori Ogibayashi <ogibaya...@gmail.com> 
>> wrote:
>>
>> Hello,
>>
>> I am running a Flink job that reads topics from Kafka and writes results
>> to Redis. I use FsStateBackend with HDFS.
>>
>> I noticed that taking a checkpoint takes several minutes and sometimes
>> expires.
>> ---
>> 2016-06-14 17:25:40,734 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1456 (in 257956 ms)
>> 2016-06-14 17:25:40,735 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1457 @ 1465892740734
>> 2016-06-14 17:35:40,735 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1457 expired before completing.
>> 2016-06-14 17:35:40,736 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1458 @ 1465893340735
>> 2016-06-14 17:45:40,736 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1458 expired before completing.
>> 2016-06-14 17:45:40,737 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1459 @ 1465893940736
>> 2016-06-14 17:55:40,738 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1459 expired before completing.
>> 2016-06-14 17:55:40,739 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1460 @ 1465894540738
>> ---
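
The spacing of the log entries above can be checked with a little arithmetic. The sketch below only subtracts the trigger timestamps quoted in the log; the class and method names are made up for illustration. Each gap comes out at roughly 600000 ms, which is consistent with a checkpoint timeout of about ten minutes, while the one completed checkpoint (257956 ms) took a bit over four minutes.

```java
import java.time.Duration;

public class CheckpointGaps {
    // Trigger timestamps (epoch milliseconds) copied from the
    // "Triggering checkpoint N @ ..." log lines for checkpoints 1457-1460.
    static final long[] TRIGGERS = {
        1465892740734L, 1465893340735L, 1465893940736L, 1465894540738L
    };

    // Duration of the one checkpoint that completed (1456), as logged.
    static final long COMPLETED_MS = 257956L;

    // Returns the time between consecutive triggers, in milliseconds.
    static long[] gapsMillis(long[] triggers) {
        long[] gaps = new long[triggers.length - 1];
        for (int i = 1; i < triggers.length; i++) {
            gaps[i - 1] = triggers[i] - triggers[i - 1];
        }
        return gaps;
    }

    public static void main(String[] args) {
        for (long gap : gapsMillis(TRIGGERS)) {
            System.out.println("gap between triggers: " + gap + " ms ("
                    + Duration.ofMillis(gap).toMinutes() + " min)");
        }
        System.out.println("checkpoint 1456 took: "
                + Duration.ofMillis(COMPLETED_MS).getSeconds() + " s");
    }
}
```

Because each new checkpoint is triggered about one millisecond after the previous one expires, the gaps reflect the expiry interval itself and not a separate trigger schedule.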
>>
>> According to the web UI, the checkpoint size is just 1 MB. Why does
>> checkpointing take so long?
>>
>> I took a jstack dump during checkpointing. It looks like the checkpointing
>> thread is blocked in commitOffsets.
>>
>> ---
>> "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry [0x00007f2b3ddfc000]
>>   java.lang.Thread.State: BLOCKED (on object monitor)
>>        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
>>        - waiting to lock <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
>>        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
>>        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
>>        at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
>>        - locked <0x0000000659111cc8> (a java.lang.Object)
>>        at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
>>        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>        at java.lang.Thread.run(Thread.java:745)
>> ---
>>
>> The thread holding that lock is this one:
>>
>> ---
>> "Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable [0x00007f2b3dbfa000]
>>   java.lang.Thread.State: RUNNABLE
>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>>        - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2)
>>        - locked <0x0000000659457db8> (a java.util.Collections$UnmodifiableSet)
>>        - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl)
>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>>        at org.apache.kafka.common.network.Selector.select(Selector.java:425)
>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
>>        - locked <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
>> ---
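
The two stack traces show one thread waiting for monitor <0x0000000659111b58> while another holds it inside a blocking poll. The sketch below reproduces that pattern in isolation, using plain Java threads. It is not Flink or Kafka code: the lock object, the thread names, and the sleep standing in for poll() are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class MonitorContentionSketch {

    // Returns how long (in ms) the "committer" thread waited for the monitor
    // while the "poller" thread held it for pollMillis.
    static long measureCommitWaitMillis(long pollMillis) {
        final Object consumerLock = new Object(); // stand-in for the KafkaConsumer monitor
        final CountDownLatch pollStarted = new CountDownLatch(1);
        final long[] waitedMillis = new long[1];

        Thread poller = new Thread(() -> {
            synchronized (consumerLock) {
                pollStarted.countDown();
                try {
                    Thread.sleep(pollMillis); // stand-in for a long blocking poll()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "poller");

        Thread committer = new Thread(() -> {
            long start = System.nanoTime();
            synchronized (consumerLock) { // BLOCKED here until the poller leaves
                waitedMillis[0] =
                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            }
        }, "committer");

        try {
            poller.start();
            pollStarted.await(); // make sure the poller owns the monitor first
            committer.start();
            poller.join();
            committer.join();    // join() makes waitedMillis[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while measuring", e);
        }
        return waitedMillis[0];
    }

    public static void main(String[] args) {
        System.out.println("committer waited about "
                + measureCommitWaitMillis(500) + " ms for the monitor");
    }
}
```

In this sketch the committer cannot proceed until the poller's blocking call returns, so the wait grows with the time spent inside the synchronized block. A jstack dump taken while it runs would show the committer in state BLOCKED (on object monitor), as in the first trace.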
>>
>> If someone could advise me on the cause or on how to investigate
>> further, that would be appreciated.
>>
>> Thanks,
>> Hironori
>
