The plan is to release 1.1.3 ASAP ;-)

Waiting for the last backported patches to get in, then release testing and

If you want to test it today, you would need to manually build the
release-1.1 branch.


On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <> wrote:

> Hi Gordon,
>      Do I need to clone and build the release-1.1 branch to test this?
>      I currently use the Flink 1.1.2 runtime. When do you plan to release
> this in 1.1.3?
> Best Regards
> Varaga
> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <>
> wrote:
>> Hi,
>> Helping out here: this is the PR for async Kafka offset committing -
>> It has already been merged into the master and release-1.1 branches, so
>> you can try out the changes now if you’d like.
>> The change should also be included in the 1.1.3 release, which the Flink
>> community is discussing to release soon.
>> Will definitely be helpful if you can provide feedback afterwards!
>> Best Regards,
>> Gordon
>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>> wrote:
>> Hi Stephan,
>>     Is the async Kafka offset commit released in 1.1.3?
>> Varaga
>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>>> wrote:
>>> Hi Stephan,
>>>      That should be great. Let me know once the fix is done and the
>>> snapshot version to use, I'll check and revert then.
>>>      Can you also share the JIRA that tracks the issue?
>>>      With regards to offset commit issue, I'm not sure as to how to
>>> proceed here. Probably I'll use your fix first and see if the problem
>>> reoccurs.
>>> Thanks much
>>> Varaga
>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <> wrote:
>>>> @CVP
>>>> In your case, Flink stores only the Kafka offsets (a few bytes) and the
>>>> custom state (e) in its checkpoints.
>>>> Here is an illustration of the checkpoint and what is stored (from the
>>>> Flink docs).
>>>> nals/stream_checkpointing.html
>>>> I am quite puzzled why the offset committing problem occurs only for
>>>> one input, and not for the other.
>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>>> Could you try out a snapshot version to see if that fixes your problem?
>>>> Greetings,
>>>> Stephan
>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>>>> wrote:
>>>>> Hi Stefan,
>>>>>      Thanks a million for your detailed explanation. I appreciate it.
>>>>>      -  The *zookeeper bundled with kafka* was used to start
>>>>> zookeeper. There is only 1 instance (standalone) of zookeeper running
>>>>> on my localhost (ubuntu 14.04)
>>>>>      -  There is only 1 Kafka broker (*version:* )
>>>>>      With regards to Flink cluster there's only 1 JM & 2 TMs started
>>>>> with no HA. I presume this does not use zookeeper anyways as it runs as
>>>>> standalone cluster.
>>>>>      BTW., The kafka connector version that I use is as suggested in
>>>>> the flink connectors page
>>>>>         <dependency>
>>>>>           <groupId>org.apache.flink</groupId>
>>>>>           <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>>           <version>1.1.1</version>
>>>>>         </dependency>
>>>>>      Do you see any issues with versions?
>>>>>      1) Do you have benchmarks with respect to checkpointing in Flink?
>>>>>      2) There isn't a detailed explanation of which states are stored as
>>>>> part of the checkpointing process. For example, if I have a pipeline like
>>>>> *source -> map -> keyBy -> map -> sink, my assumption on what's stored
>>>>> is:*
>>>>> *         a) The source stream's custom watermarked records*
>>>>> *         b) Intermediate states of each of the transformations in the
>>>>> pipeline*
>>>>> *         c) Delta of records stored from the previous sink*
>>>>> *         d) Custom states (say, ValueState as in my case) - essentially
>>>>> this is what I care about storing.*
>>>>> *         e) All of my operators*
>>>>>       Is my understanding right?
>>>>>      3) Is there a way in Flink to checkpoint only d) as stated above?
>>>>>      4) Can you apply checkpointing to only streams and certain
>>>>> operators (say I wish to store aggregated values as part of the
>>>>> transformation)?
>>>>> Best Regards
>>>>> CVP
>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <>
>>>>> wrote:
>>>>>> Thanks, the logs were very helpful!
>>>>>> TL;DR - The offset committing to ZooKeeper is very slow and prevents
>>>>>> proper starting of checkpoints.
>>>>>> Here is what is happening in detail:
>>>>>>   - Between the point when the TaskManager receives the "trigger
>>>>>> checkpoint" message and the point when the KafkaSource actually
>>>>>> starts the checkpoint, a long time passes (many seconds) - for one of
>>>>>> the Kafka inputs (the other is fine).
>>>>>>   - The only way this delay can be introduced is if another
>>>>>> checkpoint-related operation (such as trigger() or notifyComplete()) is
>>>>>> still in progress when the checkpoint is started. Flink does not perform
>>>>>> concurrent checkpoint operations on a single operator, to ease the
>>>>>> concurrency model for users.
>>>>>>   - The operation that is still in progress must be the committing of
>>>>>> the offsets (to ZooKeeper or Kafka). That also explains why this only
>>>>>> happens once one side receives the first record. Before that, there is
>>>>>> nothing to commit.
>>>>>> What Flink should fix:
>>>>>>   - The KafkaConsumer should run the commit operations
>>>>>> asynchronously, to not block the "notifyCheckpointComplete()" method.
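[Inline note] The idea of committing asynchronously so that the checkpoint notification is not blocked can be sketched in plain Java, without any Flink or Kafka dependency. All names below (AsyncCommitSketch, OffsetStore, GatedStore, AsyncCommitter) are hypothetical and exist only for illustration; this is a model of the idea under those assumptions, not the actual change made to the Flink Kafka consumer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Dependency-free model; none of these names are Flink or Kafka APIs. */
public class AsyncCommitSketch {

    /** Hypothetical stand-in for an external offset store such as ZooKeeper. */
    interface OffsetStore {
        void commit(long offset) throws InterruptedException;
    }

    /** A store whose commit stays stuck until the gate is opened. */
    static final class GatedStore implements OffsetStore {
        final CountDownLatch gate = new CountDownLatch(1);
        final AtomicLong committed = new AtomicLong(-1L);

        @Override
        public void commit(long offset) throws InterruptedException {
            gate.await();          // simulates a very slow commit
            committed.set(offset);
        }
    }

    /** Hands each commit to a background thread and returns immediately. */
    static final class AsyncCommitter {
        private final OffsetStore store;
        private final ExecutorService committerThread =
                Executors.newSingleThreadExecutor();

        AsyncCommitter(OffsetStore store) {
            this.store = store;
        }

        /** Returns right away, even if the store is currently stuck. */
        void notifyCheckpointComplete(long offset) {
            committerThread.execute(() -> {
                try {
                    store.commit(offset);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        /** Stops accepting commits and waits for pending ones to finish. */
        boolean shutdownAndWait(long timeout, TimeUnit unit)
                throws InterruptedException {
            committerThread.shutdown();
            return committerThread.awaitTermination(timeout, unit);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GatedStore store = new GatedStore();
        AsyncCommitter committer = new AsyncCommitter(store);

        // The caller is not blocked although the store cannot commit yet.
        committer.notifyCheckpointComplete(42L);
        System.out.println("while store is stuck: " + store.committed.get());

        store.gate.countDown();    // the slow store finally responds
        committer.shutdownAndWait(5, TimeUnit.SECONDS);
        System.out.println("after store recovers: " + store.committed.get());
    }
}
```

With a synchronous commit, the first call would not return until the gate opens, which is the blocking behaviour described above. The single background thread keeps commits in submission order.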
>>>>>> What you can fix:
>>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka input works
>>>>>> well, the other does not. Do they go against different sets of brokers,
>>>>>> or different ZooKeepers? Is the metadata for one input bad?
>>>>>>   - In the next Flink version, you may opt out of committing offsets
>>>>>> to Kafka/ZooKeeper altogether. It is not important for Flink's
>>>>>> checkpoints anyway.
>>>>>> Greetings,
>>>>>> Stephan
>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>>>> wrote:
>>>>>>> Hi Stefan,
>>>>>>>     Please find my responses below.
>>>>>>>     - What source are you using for the slow input?
>>>>>>> *     [CVP] - Both streams, as pointed out in my first mail, are Kafka
>>>>>>> streams.*
>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>> *[CVP] - I have enabled checkpointing on the StreamEnvironment as
>>>>>>> below.*
>>>>>>>     final StreamExecutionEnvironment streamEnv =
>>>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>     streamEnv.setStateBackend(
>>>>>>>         new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>>     streamEnv.enableCheckpointing(10000);
>>>>>>> *      In terms of the state stored, the KS1 stream has a payload of
>>>>>>> 100K events/second, while KS2 has about 1 event / 10 minutes...
>>>>>>> Basically, the operators perform flatMaps on 8 fields of a tuple (all
>>>>>>> fields are primitives). If you look at the states' sizes in the
>>>>>>> dashboard, they are in KB... *
>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>     [CVP] - There is no backpressure, at least according to the sample
>>>>>>> computation in the Flink dashboard. 100K/second is a low load by Flink's
>>>>>>> benchmarks. I could not quite follow the distinction between the
>>>>>>> barriers and the state snapshot. I have attached the TaskManager log
>>>>>>> (DEBUG) in case it is of interest.
>>>>>>>      I have attached the checkpoint times as a .png from the
>>>>>>> dashboard. Basically, if you look at checkpoint IDs 28, 29 & 30, you'd
>>>>>>> see that the checkpoints take more than a minute in each case. Before
>>>>>>> these checkpoints, the KS2 stream did not have any events. As soon as an
>>>>>>> event (a few bytes in size) was generated, the checkpoints slowed down,
>>>>>>> and every checkpoint thereafter took a minute more.
>>>>>>>    This log was collected from the standalone Flink cluster with 1
>>>>>>> JobManager & 2 TMs. 1 TM was running this application with
>>>>>>> checkpointing (parallelism=1).
>>>>>>>     Please let me know if you need further info.
>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <>
>>>>>>> wrote:
>>>>>>>> Hi!
>>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>>> information?
>>>>>>>>   - What source are you using for the slow input?
>>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>>> Greetings,
>>>>>>>> Stephan
>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <>
>>>>>>>> wrote:
>>>>>>>>> Hi CVP,
>>>>>>>>> I'm not very familiar with the internals of the checkpointing
>>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>>> Best, Fabian
>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>>     I have a stream application that has 2 stream sources as below.
>>>>>>>>>>      KeyedStream<String, String> *ks1* = ds1.keyBy("*") ;
>>>>>>>>>>      KeyedStream<Tuple2<String, V>, String> *ks2* =
>>>>>>>>>> ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>>      //X is a CoFlatMapFunction that inserts and removes elements
>>>>>>>>>> from ks2 into a key-value state member. Elements from ks1 are matched
>>>>>>>>>> against that state. the CoFlatMapFunction operator maintains
>>>>>>>>>> ValueState<Tuple2<Long, Long>>;
>>>>>>>>>>      //ks1 is streaming about 100K events/sec from kafka topic
>>>>>>>>>>      //ks2 is streaming about 1 event every 10 minutes...
>>>>>>>>>> Precisely when the 1st event is consumed from this stream, the
>>>>>>>>>> checkpoint takes 2 minutes straight away.
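[Inline note] The two-input operator described above can be modelled in plain Java to show how little state it holds. This is only a sketch: KeyedJoinSketch, onControl and onEvent are hypothetical names, a HashMap stands in for Flink's keyed ValueState<Tuple2<Long, Long>>, and the matching rule (emit an event only when its key has state) is an assumption, since the message does not spell it out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a keyed two-input operator; not Flink's API. */
public class KeyedJoinSketch {

    /** Per-key state; stands in for ValueState<Tuple2<Long, Long>>. */
    private final Map<String, long[]> stateByKey = new HashMap<>();

    /** Slow input (ks2): a non-null pair upserts the state, null removes it. */
    public void onControl(String key, long[] pair) {
        if (pair == null) {
            stateByKey.remove(key);
        } else {
            stateByKey.put(key, pair.clone());
        }
    }

    /** Fast input (ks1): emit the event only if its key currently has state. */
    public List<String> onEvent(String key) {
        List<String> out = new ArrayList<>();
        long[] pair = stateByKey.get(key);
        if (pair != null) {
            out.add(key + ":" + pair[0] + "," + pair[1]);
        }
        return out;
    }

    /** What a checkpoint of this model has to capture: a copy of the keyed state. */
    public Map<String, long[]> snapshot() {
        Map<String, long[]> copy = new HashMap<>();
        for (Map.Entry<String, long[]> e : stateByKey.entrySet()) {
            copy.put(e.getKey(), e.getValue().clone());
        }
        return copy;
    }

    public static void main(String[] args) {
        KeyedJoinSketch op = new KeyedJoinSketch();
        System.out.println(op.onEvent("a"));          // no state yet for key "a"
        op.onControl("a", new long[] {1L, 2L});
        System.out.println(op.onEvent("a"));          // key "a" now matches
        System.out.println(op.snapshot().size());     // entries a checkpoint would hold
    }
}
```

In this model the snapshot grows with the number of keys seen on the slow input, not with the event rate of the fast input, which is consistent with the small state sizes reported in the dashboard.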
>>>>>>>>>>     The version of flink is 1.1.2.
>>>>>>>>>> I tried to checkpoint every 10 seconds using an FsStateBackend...
>>>>>>>>>> What I notice is that the checkpoint duration is almost 2 minutes
>>>>>>>>>> in many cases, while in the other cases it frequently varies from
>>>>>>>>>> 100 ms to 1.5 minutes. I'm attaching the snapshot of the dashboard
>>>>>>>>>> for your reference.
>>>>>>>>>>      Is this an issue with flink checkpointing?
>>>>>>>>>>  Best Regards
>>>>>>>>>> CVP
