Hi Stephan,

    Has the asynchronous Kafka offset commit fix been released in 1.1.3?

Varaga

On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
chakravarth...@gmail.com> wrote:

> Hi Stephan,
>
>      That would be great. Let me know once the fix is done and which
> snapshot version to use; I'll check and get back to you then.
>      Can you also share the JIRA that tracks the issue?
>
>      With regards to offset commit issue, I'm not sure as to how to
> proceed here. Probably I'll use your fix first and see if the problem
> reoccurs.
>
> Thanks much
> Varaga
>
> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> @CVP
>>
>> In your case, Flink stores only the Kafka offsets (a few bytes) and the
>> custom state (d) in its checkpoints.
>>
>> Here is an illustration of the checkpoint and what is stored (from the
>> Flink docs).
>> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
>>
>>
>> I am quite puzzled why the offset committing problem occurs only for one
>> input, and not for the other.
>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
>> Could you try out a snapshot version to see if that fixes your problem?
>>
>> Greetings,
>> Stephan
>>
>>
>>
>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <
>> chakravarth...@gmail.com> wrote:
>>
>>> Hi Stephan,
>>>
>>>      Thanks a million for your detailed explanation. I appreciate it.
>>>
>>>      -  The *zookeeper bundled with kafka 0.9.0.1* was used to start
>>> zookeeper. There is only 1 instance (standalone) of zookeeper running on my
>>> localhost (ubuntu 14.04)
>>>      -  There is only 1 Kafka broker (*version: 0.9.0.1* )
>>>
>>>      With regards to Flink cluster there's only 1 JM & 2 TMs started
>>> with no HA. I presume this does not use zookeeper anyways as it runs as
>>> standalone cluster.
>>>
>>>
>>>      BTW, the Kafka connector version that I use is the one suggested on
>>> the Flink connectors page:
>>>
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>             <version>1.1.1</version>
>>>         </dependency>
>>>
>>>      Do you see any issues with versions?
>>>
>>>      1) Do you have benchmarks with respect to checkpointing in Flink?
>>>
>>>      2) There isn't a detailed explanation of which states are stored as
>>> part of the checkpointing process. For example, if I have a pipeline like
>>> source -> map -> keyBy -> map -> sink, my assumption on what's stored is:
>>>
>>>          a) The source stream's custom watermarked records
>>>
>>>          b) Intermediate states of each of the transformations in the
>>> pipeline
>>>
>>>          c) Delta of records stored from the previous sink
>>>
>>>          d) Custom states (say, ValueState as in my case). Essentially
>>> this is what I care about storing.
>>>
>>>          e) All of my operators
>>>
>>>       Is my understanding right?
>>>
>>>      3) Is there a way in Flink to checkpoint only d) as stated above?
>>>
>>>      4) Can you apply checkpointing to only streams and certain
>>> operators (say, I wish to store aggregated values that are part of the
>>> transformation)?
>>>
>>> Best Regards
>>> CVP
>>>
>>>
>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Thanks, the logs were very helpful!
>>>>
>>>> TL;DR - The offset committing to ZooKeeper is very slow and prevents
>>>> checkpoints from starting properly.
>>>>
>>>> Here is what is happening in detail:
>>>>
>>>>   - A long time (many seconds) passes between the point when the
>>>> TaskManager receives the "trigger checkpoint" message and the point when
>>>> the KafkaSource actually starts the checkpoint. This happens for only one
>>>> of the Kafka inputs (the other is fine).
>>>>   - The only way this delay can be introduced is if another
>>>> checkpoint-related operation (such as trigger() or notifyComplete()) is
>>>> still in progress when the checkpoint is started. Flink does not perform
>>>> concurrent checkpoint operations on a single operator, to ease the
>>>> concurrency model for users.
>>>>   - The operation that is still in progress must be the committing of
>>>> the offsets (to ZooKeeper or Kafka). That also explains why this only
>>>> happens once one side receives the first record. Before that, there is
>>>> nothing to commit.
>>>>
>>>>
>>>> What Flink should fix:
>>>>   - The KafkaConsumer should run the commit operations asynchronously,
>>>> to not block the "notifyCheckpointComplete()" method.
>>>>
>>>> What you can fix:
>>>>   - Have a look at your Kafka/ZooKeeper setup. One Kafka Input works
>>>> well, the other does not. Do they go against different sets of brokers, or
>>>> different ZooKeepers? Is the metadata for one input bad?
>>>>   - In the next Flink version, you may opt out of committing offsets to
>>>> Kafka/ZooKeeper altogether. It is not important for Flink's checkpoints
>>>> anyway.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <
>>>> chakravarth...@gmail.com> wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>>     Please find my responses below.
>>>>>
>>>>>     - What source are you using for the slow input?
>>>>>     [CVP] - Both streams, as pointed out in my first mail, are Kafka
>>>>> streams.
>>>>>   - How large is the state that you are checkpointing?
>>>>>
>>>>>     [CVP] - I have enabled checkpointing on the
>>>>> StreamExecutionEnvironment as below.
>>>>>
>>>>>         final StreamExecutionEnvironment streamEnv =
>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         streamEnv.setStateBackend(
>>>>>             new FsStateBackend("file:///tmp/flink/checkpoints"));
>>>>>         // Checkpoint interval in milliseconds (10 seconds).
>>>>>         streamEnv.enableCheckpointing(10000);
>>>>>
>>>>>
>>>>>       In terms of the state stored, the KS1 stream has a load of
>>>>> 100K events/second, while KS2 has about 1 event every 10 minutes.
>>>>> Basically, the operators perform flatmaps on tuples of 8 fields (all
>>>>> fields are primitives). If you look at the state sizes in the dashboard,
>>>>> they are in KB.
>>>>>   - Can you try to see in the log if actually the state snapshot takes
>>>>> that long, or if it simply takes long for the checkpoint barriers to
>>>>> travel through the stream due to a lot of backpressure?
>>>>>     [CVP] - There is no backpressure, at least according to the sampled
>>>>> computation in the Flink dashboard. 100K/second is a low load by Flink's
>>>>> benchmarks. I could not quite follow the distinction between barriers and
>>>>> state snapshots. I have attached the TaskManager log (DEBUG) in case it
>>>>> is of interest.
>>>>>
>>>>>      I have attached the checkpoint times as a .png from the
>>>>> dashboard. If you look at checkpoint IDs 28, 29 and 30, you'll see that
>>>>> the checkpoints take more than a minute in each case. Before these
>>>>> checkpoints, the KS2 stream did not have any events. As soon as an
>>>>> event (a few bytes in size) was generated, the checkpoints slowed down,
>>>>> and each subsequent checkpoint took over a minute.
>>>>>
>>>>>    This log was collected from the standalone flink cluster with 1 job
>>>>> manager & 2 TMs. 1 TM was running this application with checkpointing
>>>>> (parallelism=1)
>>>>>
>>>>>     Please let me know if you need further info.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> Let's try to figure that one out. Can you give us a bit more
>>>>>> information?
>>>>>>
>>>>>>   - What source are you using for the slow input?
>>>>>>   - How large is the state that you are checkpointing?
>>>>>>   - Can you try to see in the log if actually the state snapshot
>>>>>> takes that long, or if it simply takes long for the checkpoint barriers
>>>>>> to travel through the stream due to a lot of backpressure?
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi CVP,
>>>>>>>
>>>>>>> I'm not so much familiar with the internals of the checkpointing
>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on here.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <
>>>>>>> chakravarth...@gmail.com>:
>>>>>>>
>>>>>>>> Hi Aljoscha & Fabian,
>>>>>>>>
>>>>>>>>     I have a stream application that has 2 stream sources as below.
>>>>>>>>
>>>>>>>>      KeyedStream<String, String> ks1 = ds1.keyBy("*");
>>>>>>>>      KeyedStream<Tuple2<String, V>, String> ks2 =
>>>>>>>>          ds2.flatMap(split T into k-v pairs).keyBy(0);
>>>>>>>>
>>>>>>>>      ks1.connect(ks2).flatMap(X);
>>>>>>>>      // X is a CoFlatMapFunction that inserts elements from ks2 into,
>>>>>>>>      // and removes them from, a key-value state member. Elements from
>>>>>>>>      // ks1 are matched against that state. The CoFlatMapFunction
>>>>>>>>      // operator maintains a ValueState<Tuple2<Long, Long>>.
>>>>>>>>
>>>>>>>>      //ks1 is streaming about 100K events/sec from kafka topic
>>>>>>>>      //ks2 is streaming about 1 event every 10 minutes... Precisely
>>>>>>>> when the 1st event is consumed from this stream, checkpoint takes 2
>>>>>>>> minutes straight away.
>>>>>>>>
>>>>>>>>     The version of flink is 1.1.2.
>>>>>>>>
>>>>>>>> I tried checkpointing every 10 seconds using an FsStateBackend. What
>>>>>>>> I notice is that the checkpoint duration is almost 2 minutes in many
>>>>>>>> cases, while in the other cases it frequently varies from 100 ms to
>>>>>>>> 1.5 minutes. I'm attaching a snapshot of the dashboard for your
>>>>>>>> reference.
>>>>>>>>
>>>>>>>>      Is this an issue with flink checkpointing?
>>>>>>>>
>>>>>>>>  Best Regards
>>>>>>>> CVP
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
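
The blocking that Stephan describes in his Sep 26 message (a slow offset
commit inside notifyCheckpointComplete() delaying the next checkpoint
trigger) and the asynchronous fix he proposes can be illustrated with a
small, self-contained Java sketch. This is not Flink's actual KafkaConsumer
code: the class AsyncCommitSketch, the OffsetCommitter interface, and the
single checkpointLock are hypothetical names chosen for illustration, and a
plain ExecutorService stands in for whatever mechanism the real fix uses.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a source operator. None of these names are Flink APIs.
class AsyncCommitSketch {

    // Stand-in for the (possibly slow) offset commit to ZooKeeper or Kafka.
    interface OffsetCommitter {
        void commit(long offset) throws InterruptedException;
    }

    // One lock for all checkpoint-related calls, mimicking "no concurrent
    // checkpoint operations on a single operator".
    private final Object checkpointLock = new Object();
    private final ExecutorService commitExecutor = Executors.newSingleThreadExecutor();
    private final OffsetCommitter committer;
    private long currentOffset;

    AsyncCommitSketch(OffsetCommitter committer) {
        this.committer = committer;
    }

    // Simulates the source having read up to the given offset.
    void advanceTo(long offset) {
        synchronized (checkpointLock) {
            currentOffset = offset;
        }
    }

    // Snapshots the offset; waits if another checkpoint-related call holds the lock.
    long triggerCheckpoint() {
        synchronized (checkpointLock) {
            return currentOffset;
        }
    }

    // Hands the commit to a background thread, so the lock is released right away.
    // Calling committer.commit(offset) directly here would hold the lock for the
    // whole commit and stall the next triggerCheckpoint().
    void notifyCheckpointComplete(long offset) {
        synchronized (checkpointLock) {
            commitExecutor.submit(() -> {
                try {
                    committer.commit(offset);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    // Lets pending commits finish, waiting at most 10 seconds.
    void shutdownAndAwait() {
        commitExecutor.shutdown();
        try {
            commitExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // The commit stays blocked until released, yet the next trigger goes through.
    // Returns {next snapshot, committed offset before release, committed offset at end}.
    static long[] demo() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicLong committed = new AtomicLong(-1L);
        AsyncCommitSketch source = new AsyncCommitSketch(offset -> {
            release.await();          // simulate a commit that hangs
            committed.set(offset);
        });
        source.advanceTo(42L);
        source.notifyCheckpointComplete(source.triggerCheckpoint());
        source.advanceTo(43L);
        long nextSnapshot = source.triggerCheckpoint(); // not blocked by the commit
        long committedBeforeRelease = committed.get();
        release.countDown();
        source.shutdownAndAwait();
        return new long[] {nextSnapshot, committedBeforeRelease, committed.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

In demo(), the second triggerCheckpoint() returns while the simulated commit
is still hanging, which is the behaviour the asynchronous commit is meant to
provide. With a synchronous commit inside the lock, that call would instead
wait for the commit to finish, matching the delay seen in the logs.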
