Hi Gordon,

     Do I need to clone and build the release-1.1 branch to test this?
     I currently use the Flink 1.1.2 runtime. When is it planned to be
released in 1.1.3?

Best Regards
Varaga

On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi,
>
> Helping out here: this is the PR for async Kafka offset committing -
> https://github.com/apache/flink/pull/2574.
> It has already been merged into the master and release-1.1 branches, so
> you can try out the changes now if you’d like.
> The change should also be included in the 1.1.3 release, which the Flink
> community is discussing to release soon.
>
> Will definitely be helpful if you can provide feedback afterwards!
>
> Best Regards,
> Gordon
>
>
> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
> chakravarth...@gmail.com) wrote:
>
> Hi Stephan,
>
>     Is the async Kafka offset commit released in 1.1.3?
>
> Varaga
>
> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
> chakravarth...@gmail.com> wrote:
>
>> Hi Stephan,
>>
>>      That would be great. Let me know once the fix is done and which
>> snapshot version to use; I'll test it and report back then.
>>      Can you also share the JIRA that tracks the issue?
>>
>>      With regard to the offset commit issue, I'm not sure how to
>> proceed here. I'll probably use your fix first and see if the problem
>> reoccurs.
>>
>> Thanks much
>> Varaga
>>
>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> @CVP
>>>
>>> In your case, Flink stores in its checkpoints only the Kafka offsets (a
>>> few bytes) and the custom state (d).
>>>
>>> Here is an illustration of the checkpoint and what is stored (from the
>>> Flink docs).
>>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>>
>>>
>>> I am quite puzzled why the offset committing problem occurs only for one
>>> input, and not for the other.
>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>>> Could you try out a snapshot version to see if that fixes your problem?
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>>
>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>>> chakravarth...@gmail.com> wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>>      Thanks a million for your detailed explanation. I appreciate it.
>>>>
>>>>      -  The *zookeeper bundled with kafka 0.9.0.1* was used to start
>>>> ZooKeeper. There is only 1 instance (standalone) of ZooKeeper running
>>>> on my localhost (Ubuntu 14.04).
>>>>      -  There is only 1 Kafka broker (*version: 0.9.0.1*).
>>>>
>>>>      With regard to the Flink cluster, there is only 1 JM & 2 TMs
>>>> started with no HA. I presume this does not use ZooKeeper anyway, as it
>>>> runs as a standalone cluster.
>>>>
>>>>
>>>>      BTW, the Kafka connector version that I use is the one suggested on
>>>> the Flink connectors page:
>>>>
>>>>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>>         <version>1.1.1</version>
>>>>     </dependency>
>>>>
>>>>      Do you see any issues with versions?
>>>>
>>>>      1) Do you have benchmarks with respect to checkpointing in Flink?
>>>>
>>>>      2) There isn't a detailed explanation of what state is stored as
>>>> part of the checkpointing process. For example, if I have a pipeline like
>>>> source -> map -> keyBy -> map -> sink, my assumption on what's stored
>>>> is:
>>>>
>>>>          a) The source stream's custom watermarked records
>>>>
>>>>          b) Intermediate states of each of the transformations in the
>>>> pipeline
>>>>
>>>>          c) Delta of records stored from the previous sink
>>>>
>>>>          d) Custom states (say, ValueState as in my case). Essentially
>>>> this is what I care about storing.
>>>>          e) All of my operators
>>>>
>>>>       Is my understanding right?
>>>>
>>>>      3) Is there a way in Flink to checkpoint only d) as stated above?
>>>>
>>>>      4) Can you apply checkpointing to only certain streams and
>>>> operators (say I wish to store only the aggregated values that are part
>>>> of the transformation)?
>>>>
>>>> Best Regards
>>>> CVP
>>>>
>>>>
>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Thanks, the logs were very helpful!
>>>>>
>>>>> TL;DR - The offset committing to ZooKeeper is very slow and prevents
>>>>> proper starting of checkpoints.
>>>>>
>>>>> Here is what is happening in detail:
>>>>>
>>>>>   - A long time (many seconds) passes between the point when the
>>>>> TaskManager receives the "trigger checkpoint" message and the point when
>>>>> the KafkaSource actually starts the checkpoint. This happens for one of
>>>>> the Kafka inputs only (the other is fine).
>>>>>   - The only way this delay can be introduced is if another
>>>>> checkpoint-related operation (such as trigger() or notifyComplete()) is
>>>>> still in progress when the checkpoint is started. Flink does not perform
>>>>> concurrent checkpoint operations on a single operator, to ease the
>>>>> concurrency model for users.
>>>>>   - The operation that is still in progress must be the committing of
>>>>> the offsets (to ZooKeeper or Kafka). That also explains why this only
>>>>> happens once one side receives the first record. Before that, there is
>>>>> nothing to commit.
>>>>>
>>>>>
>>>>> What Flink should fix:
>>>>>   - The KafkaConsumer should run the commit operations asynchronously,
>>>>> to not block the "notifyCheckpointComplete()" method.
>>>>>
>>>>> What you can fix:
>>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka Input works
>>>>> well, the other does not. Do they go against different sets of brokers, or
>>>>> different ZooKeepers? Is the metadata for one input bad?
>>>>>   - In the next Flink version, you may opt out of committing offsets
>>>>> to Kafka/ZooKeeper altogether. It is not important for Flink's
>>>>> checkpoints anyway.
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>>> chakravarth...@gmail.com> wrote:
>>>>>
>>>>>> Hi Stephan,
>>>>>>
>>>>>>     Please find my responses below.
>>>>>>
>>>>>>   - What source are you using for the slow input?
>>>>>>     [CVP] - Both streams, as pointed out in my first mail, are Kafka
>>>>>> streams.
>>>>>>   - How large is the state that you are checkpointing?
>>>>>>     [CVP] - I have enabled checkpointing on the
>>>>>> StreamExecutionEnvironment as below:
>>>>>>
>>>>>>         final StreamExecutionEnvironment streamEnv =
>>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>         streamEnv.setStateBackend(
>>>>>>             new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>>         // checkpoint interval in milliseconds (every 10 seconds)
>>>>>>         streamEnv.enableCheckpointing(10000);
>>>>>>
>>>>>>     In terms of the state stored, the KS1 stream has a payload of
>>>>>> 100K events/second, while KS2 has about 1 event / 10 minutes. Basically,
>>>>>> the operators perform flatMaps on 8 fields of a tuple (all fields are
>>>>>> primitives). If you look at the states' sizes in the dashboard, they are
>>>>>> in KB.
>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>> takes that long, or if it simply takes long for the checkpoint
>>>>>> barriers to travel through the stream due to a lot of backpressure?
>>>>>>     [CVP] - There is no backpressure, at least from the sample
>>>>>> computation in the Flink dashboard. 100K/second is a low load by Flink's
>>>>>> benchmarks. I could not quite follow the distinction between barriers
>>>>>> and the state snapshot. I have attached the TaskManager log (DEBUG) in
>>>>>> case it is of interest.
>>>>>>
>>>>>>      I have attached the checkpoint times as a .png from the
>>>>>> dashboard. If you look at checkpoint IDs 28, 29 & 30, you'll see that
>>>>>> the checkpoints take more than a minute in each case. Before these
>>>>>> checkpoints, the KS2 stream did not have any events. As soon as an
>>>>>> event (only a few bytes in size) was generated, the checkpoints went
>>>>>> slow, and subsequently took a minute more for every checkpoint
>>>>>> thereafter.
>>>>>>
>>>>>>    This log was collected from the standalone flink cluster with 1
>>>>>> job manager & 2 TMs. 1 TM was running this application with checkpointing
>>>>>> (parallelism=1)
>>>>>>
>>>>>>     Please let me know if you need further info.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>>> information?
>>>>>>>
>>>>>>>   - What source are you using for the slow input?
>>>>>>>   - How large is the state that you are checkpointing?
>>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>>> takes that long, or if it simply takes long for the checkpoint barriers
>>>>>>> to travel through the stream due to a lot of backpressure?
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi CVP,
>>>>>>>>
>>>>>>>> I'm not very familiar with the internals of the checkpointing
>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>>> chakravarth...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>>
>>>>>>>>>     I have a stream application that has 2 stream sources as below.
>>>>>>>>>
>>>>>>>>>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>>      KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>>          ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>>
>>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>>      // X is a CoFlatMapFunction that inserts elements from ks2 into,
>>>>>>>>>      // and removes them from, a key-value state member. Elements from
>>>>>>>>>      // ks1 are matched against that state. The CoFlatMapFunction
>>>>>>>>>      // operator maintains a ValueState<Tuple2<Long, Long>>.
>>>>>>>>>
>>>>>>>>>      // ks1 is streaming about 100K events/sec from a Kafka topic
>>>>>>>>>      // ks2 is streaming about 1 event every 10 minutes...
>>>>>>>>> Precisely when the 1st event is consumed from this stream, the
>>>>>>>>> checkpoint takes 2 minutes straight away.
>>>>>>>>>
>>>>>>>>>     The version of flink is 1.1.2.
>>>>>>>>>
>>>>>>>>> I tried to checkpoint every 10 secs using a FsStateBackend...
>>>>>>>>> What I notice is that the checkpoint duration is almost 2 minutes in
>>>>>>>>> many cases, while in the other cases it frequently varies from 100 ms
>>>>>>>>> to 1.5 minutes. I'm attaching a snapshot of the dashboard for your
>>>>>>>>> reference.
>>>>>>>>>
>>>>>>>>>      Is this an issue with flink checkpointing?
>>>>>>>>>
>>>>>>>>>  Best Regards
>>>>>>>>> CVP
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
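
For reference, the blocking behaviour Stephan describes in his Sep 26
message (a slow offset commit inside the "checkpoint complete" notification
delaying the start of the next checkpoint) can be illustrated with a small,
self-contained Java sketch. This is not Flink code and not the change in the
PR linked above: the class CommitSketch, its methods and the single
checkpointLock are hypothetical stand-ins, and the slow commit to
ZooKeeper/Kafka is simulated with Thread.sleep. It only assumes what the
thread states, namely that checkpoint operations on one operator never run
concurrently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a source operator; none of these names are Flink APIs.
class CommitSketch {
    // One lock for all checkpoint-related calls, mirroring the rule that
    // checkpoint operations on a single operator are never concurrent.
    private final Object checkpointLock = new Object();
    private final ExecutorService committer = Executors.newSingleThreadExecutor();
    private final boolean asyncCommit;
    private final long commitMillis;

    CommitSketch(boolean asyncCommit, long commitMillis) {
        this.asyncCommit = asyncCommit;
        this.commitMillis = commitMillis;
    }

    // Simulated slow offset commit (assumption: stands in for ZooKeeper/Kafka).
    private void commitOffsets() {
        try {
            Thread.sleep(commitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void notifyCheckpointComplete() {
        synchronized (checkpointLock) {
            if (asyncCommit) {
                committer.execute(this::commitOffsets); // hand off, release the lock quickly
            } else {
                commitOffsets();                        // holds the lock for the whole commit
            }
        }
    }

    void triggerCheckpoint() {
        synchronized (checkpointLock) {
            // Snapshotting a few bytes of offsets and state would happen here; it is cheap.
        }
    }

    // Milliseconds triggerCheckpoint() had to wait when it was requested
    // shortly after notifyCheckpointComplete() started on another thread.
    static long triggerDelayMillis(boolean asyncCommit, long commitMillis) {
        CommitSketch op = new CommitSketch(asyncCommit, commitMillis);
        Thread notifier = new Thread(op::notifyCheckpointComplete);
        try {
            notifier.start();
            Thread.sleep(50); // give the notifier time to take the lock
            long start = System.nanoTime();
            op.triggerCheckpoint();
            long waitedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            notifier.join();
            op.committer.shutdown();
            op.committer.awaitTermination(10, TimeUnit.SECONDS);
            return waitedMillis;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            op.committer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync commit:  trigger waited ms = " + triggerDelayMillis(false, 500));
        System.out.println("async commit: trigger waited ms = " + triggerDelayMillis(true, 500));
    }
}
```

With the synchronous variant the trigger call waits for most of the
simulated commit time; with the asynchronous variant it returns almost
immediately, which is the effect the fix discussed in this thread is meant
to have on checkpoint start times.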
