@CVP

Flink stores in checkpoints in your case only the Kafka offsets (few bytes)
and the custom state (e).

Here is an illustration of the checkpoint and what is stored (from the
Flink docs).
https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html


I am quite puzzled why the offset committing problem occurs only for one
input, and not for the other.
I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
Could you try out a snapshot version to see if that fixes your problem?

Greetings,
Stephan



On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
chakravarth...@gmail.com> wrote:

> Hi Stefan,
>
>      Thanks a million for your detailed explanation. I appreciate it.
>
>      -  The *zookeeper bundled with kafka 0.9.0.1* was used to start
> zookeeper. There is only 1 instance (standalone) of zookeeper running on my
> localhost (ubuntu 14.04)
>      -  There is only 1 Kafka broker (*version: 0.9.0.1* )
>
>      With regards to Flink cluster there's only 1 JM & 2 TMs started with
> no HA. I presume this does not use zookeeper anyways as it runs as
> standalone cluster.
>
>
>      BTW., The kafka connector version that I use is as suggested in the
> flink connectors page
>
>
>
>
> *.       <dependency>              <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>         <version>1.1.1</version>        </dependency>*
>
>      Do you see any issues with versions?
>
>      1) Do you have benchmarks wrt., to checkpointing in flink?
>
>      2) There isn't detailed explanation on what states are stored as part
> of the checkpointing process. For ex.,  If I have pipeline like
> *source -> map -> keyBy -> map -> sink, my assumption on what's stored is:*
>
> *         a) The source stream's custom watermarked records*
>
> *         b) Intermediate states of each of the transformations in the
> pipeline*
>
> *         c) Delta of Records stored from the previous sink*
>
> *         d) Custom States (SayValueState as in my case) - Essentially
> this is what I bother about storing.*
> *         e) All of my operators*
>
>       Is my understanding right?
>
>      3) Is there a way in Flink to checkpoint only d) as stated above
>
>      4) Can you apply checkpointing to only streams and certain operators
> (say I wish to store aggregated values part of the transformation)
>
> Best Regards
> CVP
>
>
> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Thanks, the logs were very helpful!
>>
>> TL:DR - The offset committing to ZooKeeper is very slow and prevents
>> proper starting of checkpoints.
>>
>> Here is what is happening in detail:
>>
>>   - Between the point when the TaskManager receives the "trigger
>> checkpoint" message and when the point when the KafkaSource actually starts
>> the checkpoint is a long time (many seconds) - for one of the Kafka Inputs
>> (the other is fine).
>>   - The only way this delayed can be introduced is if another checkpoint
>> related operation (such as trigger() or notifyComplete() ) is still in
>> progress when the checkpoint is started. Flink does not perform concurrent
>> checkpoint operations on a single operator, to ease the concurrency model
>> for users.
>>   - The operation that is still in progress must be the committing of the
>> offsets (to ZooKeeper or Kafka). That also explains why this only happens
>> once one side receives the first record. Before that, there is nothing to
>> commit.
>>
>>
>> What Flink should fix:
>>   - The KafkaConsumer should run the commit operations asynchronously, to
>> not block the "notifyCheckpointComplete()" method.
>>
>> What you can fix:
>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka Input works
>> well, the other does not. Do they go against different sets of brokers, or
>> different ZooKeepers? Is the metadata for one input bad?
>>   - In the next Flink version, you may opt-out of committing offsets to
>> Kafka/ZooKeeper all together. It is not important for Flink's checkpoints
>> anyways.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>> chakravarth...@gmail.com> wrote:
>>
>>> Hi Stefan,
>>>
>>>     Please find my responses below.
>>>
>>>     - What source are you using for the slow input?
>>> *     [CVP] - Both stream as pointed out in my first mail, are Kafka
>>> Streams*
>>>   - How large is the state that you are checkpointing?
>>>
>>> *[CVP] - I have enabled checkpointing on the StreamEnvironment as below.*
>>>
>>>
>>>
>>> *         final StreamExecutionEnvironment streamEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> streamEnv.setStateBackend(new
>>> FsStateBackend("file:///tmp/flink/checkpoints"));
>>> streamEnv.enableCheckpointing(10000);*
>>>
>>>
>>> *      In terms of the state stored, the KS1 stream has payload of 100K
>>> events/second, while KS2 have about 1 event / 10 minutes... basically the
>>> operators perform flatmaps on 8 fields of tuple (all fields are
>>> primitives). If you look at the states' sizes in dashboard they are in
>>> Kb...*
>>>   - Can you try to see in the log if actually the state snapshot takes
>>> that long, or if it simply takes long for the checkpoint barriers to
>>> travel through the stream due to a lot of backpressure?
>>>     [CVP] -There are no back pressure atleast from the sample
>>> computation in the flink dashboard. 100K/second is low load for flink's
>>> benchmarks. I could not quite get the barriers vs snapshot state. I have
>>> attached the Task Manager log (DEBUG) info if that will interest you.
>>>
>>>      I have attached the checkpoints times' as .png from the dashboard.
>>> Basically if you look at checkpoint IDs 28 & 29 &30- you'd see that the
>>> checkpoints take more than a minute in each case. Before these checkpoints,
>>> the KS2 stream did not have any events. As soon as an event(should be in
>>> bytes) was generated, the checkpoints went slow and subsequently a minute
>>> more for every checkpoint thereafter.
>>>
>>>    This log was collected from the standalone flink cluster with 1 job
>>> manager & 2 TMs. 1 TM was running this application with checkpointing
>>> (parallelism=1)
>>>
>>>     Please let me know if you need further info.,
>>>
>>>
>>>
>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> Let's try to figure that one out. Can you give us a bit more
>>>> information?
>>>>
>>>>   - What source are you using for the slow input?
>>>>   - How large is the state that you are checkpointing?
>>>>   - Can you try to see in the log if actually the state snapshot takes
>>>> that long, or if it simply takes long for the checkpoint barriers to travel
>>>> through the stream due to a lot of backpressure?
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>>
>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi CVP,
>>>>>
>>>>> I'm not so much familiar with the internals of the checkpointing
>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>> chakravarth...@gmail.com>:
>>>>>
>>>>>> Hi Aljoscha & Fabian,
>>>>>>
>>>>>>     I have a stream application that has 2 stream source as below.
>>>>>>
>>>>>>      KeyedStream<String, String> *ks1* = ds1.keyBy("*") ;
>>>>>>      KeyedStream<Tuple2<String, V>, String> *ks2* =
>>>>>> ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>
>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>      //X is a CoFlatMapFunction that inserts and removes elements
>>>>>> from ks2 into a key-value state member. Elements from ks1 are matched
>>>>>> against that state. the CoFlatMapFunction operator maintains
>>>>>> ValueState<Tuple2<Long, Long>>;
>>>>>>
>>>>>>      //ks1 is streaming about 100K events/sec from kafka topic
>>>>>>      //ks2 is streaming about 1 event every 10 minutes... Precisely
>>>>>> when the 1st event is consumed from this stream, checkpoint takes 2 
>>>>>> minutes
>>>>>> straight away.
>>>>>>
>>>>>>     The version of flink is 1.1.2.
>>>>>>
>>>>>> I tried to use checkpoint every 10 Secs using a FsStateBackend...
>>>>>> What I notice is that the checkpoint duration is almost 2 minutes for 
>>>>>> many
>>>>>> cases, while for the other cases it varies from 100 ms to 1.5 minutes
>>>>>> frequently. I'm attaching the snapshot of the dashboard for your 
>>>>>> reference.
>>>>>>
>>>>>>      Is this an issue with flink checkpointing?
>>>>>>
>>>>>>  Best Regards
>>>>>> CVP
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to