I can't see why users would care to turn it off. 

Eno
> On 10 Feb 2017, at 10:29, Damian Guy <damian....@gmail.com> wrote:
> 
> Hi Eno,
> 
> Sounds good to me. The only reason i can think of is if we want to be able
> to turn it off.
> Gouzhang - thoughts?
> 
> On Fri, 10 Feb 2017 at 10:28 Eno Thereska <eno.there...@gmail.com> wrote:
> 
>> Question: if checkpointing is so cheap why not do it every commit
>> interval? That way we can get rid of this extra config variable and just
>> use the existing commit interval.
>> 
>> Less tuning knobs.
>> 
>> Eno
>> 
>>> On 10 Feb 2017, at 09:27, Damian Guy <damian....@gmail.com> wrote:
>>> 
>>> Gouzhang,
>>> 
>>> You've confused me. The failure scenarios you have described are the same
>>> as they are today. With the checkpoint files in place less data will be
>>> replayed, so there will be fewer duplicates.
>>> 
>>> Are you saying you'd like the option to turn checkpointing off?
>>> 
>>> Thanks,
>>> Damian
>>> 
>>> On Thu, 9 Feb 2017 at 21:55 Guozhang Wang <wangg...@gmail.com> wrote:
>>> 
>>>> Eno,
>>>> 
>>>> You are right, it is not a new scenario.
>>>> 
>>>> Thinking a bit more on how we could incorporate KIP-98 in Streams, I
>> feel
>>>> that if EOS is turned on inside Streams, then we probably cannot always
>>>> resume from the checkpointed offsets as it is not guaranteed to be
>>>> "consistent"; but since EOS may not be turned on by default this is
>> still
>>>> worthwhile to add this feature I guess.
>>>> 
>>>> About the default config values: I think the default value of 5 min is
>> OK
>>>> to me, since restoration is usually faster than normal processing
>> (unless
>>>> your traffic was really high), about allowing it to be "turned off"
>> with a
>>>> non-positive value: I feel there are still values to keep this door
>> open as
>>>> in the future if EOS is turned on, people may just want to turn off
>>>> checkpointing anyways, or there maybe other scenarios that we have not
>>>> realized yet. On the other hand, I would argue that it is less likely
>> users
>>>> mistakenly set it to a non-positive value.
>>>> 
>>>> Guozhang
>>>> 
>>>> On Thu, Feb 9, 2017 at 1:03 PM, Eno Thereska <eno.there...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi Guozhang,
>>>>> 
>>>>> It seems to me we have the same semantics today. Are you saying there
>> is
>>>> a
>>>>> new failure scenario?
>>>>> 
>>>>> Thanks,
>>>>> Eno
>>>>> 
>>>>>> On 9 Feb 2017, at 19:42, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>> 
>>>>>> More specifically, here is my reasoning of failure cases, and would
>>>> like
>>>>> to
>>>>>> get your feedbacks:
>>>>>> 
>>>>>> *StreamTask*
>>>>>> 
>>>>>> For stream-task, the committing order is 1) flush state (may send more
>>>>>> records to changelog in producer), 2) flush producer, 3) commit
>>>> upstream
>>>>>> offsets. My understanding is that the writing of the checkpoint file
>>>> will
>>>>>> between 2) and 3). So thatt he new order will be 1) flush state, 2)
>>>> flush
>>>>>> producer, 3) write checkpoint file (when necessary), 4) commit
>> upstream
>>>>>> offsets.
>>>>>> 
>>>>>> And we have a bunch of "changelog offsets" regarding the state: a)
>>>> offset
>>>>>> corresponding to the image of the persistent file, name it point A, b)
>>>>> log
>>>>>> end offset, name it offset B, c) checkpoint file recorded offset, name
>>>> it
>>>>>> offset C, d) offset corresponding to the current committed upstream
>>>>> offset,
>>>>>> name it offset D.
>>>>>> 
>>>>>> Now let's talk about the failure cases:
>>>>>> 
>>>>>> If there is a crash between 1) and 2), then A > B = C = D. In this
>>>> case,
>>>>> if
>>>>>> we restore, we will replay no logs at all since B = C while the
>>>>> persistent
>>>>>> state file is actually "ahead of time", and we will start reprocessing
>>>>>> since from the input offset corresponding to D = B < A and hence have
>>>>> some
>>>>>> duplicated, *which will be incorrect* if the update logic involve
>>>> reading
>>>>>> the state store values as well (i.e. not a blind write), e.g.
>>>>> aggregations.
>>>>>> 
>>>>>> If there is a crash between 2) and 3), then A = B > C = D. When we
>>>>> restore,
>>>>>> we will replay from C -> B = A, and then start reprocessing from input
>>>>>> offset corresponding to D < A, and same issue applies as above.
>>>>>> 
>>>>>> If there is a crash between 3) and 4), then A = B = C > D. When we
>>>>> restore,
>>>>>> we will not replay, and then start reprocessing from input offset
>>>>>> corresponding to D < A, and same issue applies as above.
>>>>>> 
>>>>>> 
>>>>>> *StandbyTask*
>>>>>> 
>>>>>> We only do one operation today, which is 1) flush state, I think we
>>>> will
>>>>>> add the writing of the checkpoint file after it as step 2).
>>>>>> 
>>>>>> Failure cases again: offset A -> correspond to the image of the file,
>>>>>> offset B -> changelog end offset, offset C -> written as in the
>>>>> checkpoint
>>>>>> file.
>>>>>> 
>>>>>> If there is a crash between 1) and 2), then B >= A > C (B >= A because
>>>> we
>>>>>> are reading from changelog topic so A will never be greater than B),
>>>>>> 
>>>>>> 1) and if this task resumes as a standby task, we will resume
>>>> restoration
>>>>>> from offset C, and a few duplicates from C -> A will be applied again
>>>> to
>>>>>> local state files, then continue from A -> B, *this is OK* since they
>>>> do
>>>>>> not incur any computations hence no side effects and are all
>>>> idempotent.
>>>>>> 
>>>>>> 2) and if this task resumes as a stream task, we will replay
>> changelogs
>>>>>> from C -> A, with duplicated updates, and then from A -> B. This is
>>>> also
>>>>> OK
>>>>>> for the same reason as above.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> So it seems to me that this is not safe for a StreamTask, or maybe the
>>>>>> writing of the checkpoint file in your mind is different?
>>>>>> 
>>>>>> 
>>>>>> Guozhang
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Feb 9, 2017 at 11:02 AM, Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>> 
>>>>>>> A quick question re: `We will add the above config parameter to
>>>>>>> *StreamsConfig*. During *StreamTask#commit()*,
>> *StandbyTask#commit()*,
>>>>>>> and *GlobalUpdateStateTask#flushState()* we will check if the
>>>>> checkpoint
>>>>>>> interval has elapsed and write the checkpoint file.`
>>>>>>> 
>>>>>>> Will the writing of the checkpoint file happen before the flushing of
>>>>> the
>>>>>>> state manager?
>>>>>>> 
>>>>>>> Guozhang
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Feb 9, 2017 at 10:48 AM, Matthias J. Sax <
>>>> matth...@confluent.io
>>>>>> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> But 5 min means, that we (in the worst case) need to reply data from
>>>>> the
>>>>>>>> last 5 minutes to get the store ready.
>>>>>>>> 
>>>>>>>> So why not go with the min possible value of 30 seconds to speed up
>>>>> this
>>>>>>>> process if the impact is negligible anyway?
>>>>>>>> 
>>>>>>>> What do you gain by being conservative?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> -Matthias
>>>>>>>> 
>>>>>>>> On 2/9/17 2:54 AM, Damian Guy wrote:
>>>>>>>>> Why shouldn't it be 5 minutes? ;-)
>>>>>>>>> It is a finger in the air number. Based on the testing i did it
>>>> shows
>>>>>>>> that
>>>>>>>>> there isn't much, if any, overhead when checkpointing a single
>> store
>>>>> on
>>>>>>>> the
>>>>>>>>> commit interval. The default commit interval is 30 seconds, so it
>>>>> could
>>>>>>>>> possibly be set to that. However, i'd prefer to be a little
>>>>>>>> conservative so
>>>>>>>>> 5 minutes seemed reasonable.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Thu, 9 Feb 2017 at 10:25 Michael Noll <mich...@confluent.io>
>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Damian,
>>>>>>>>>> 
>>>>>>>>>> could you elaborate briefly why the default value should be 5
>>>>> minutes?
>>>>>>>>>> What are the considerations, assumptions, etc. that go into
>> picking
>>>>>>>> this
>>>>>>>>>> value?
>>>>>>>>>> 
>>>>>>>>>> Right now, in the KIP and in this discussion, "5 mins" looks like
>> a
>>>>>>>> magic
>>>>>>>>>> number to me. :-)
>>>>>>>>>> 
>>>>>>>>>> -Michael
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Thu, Feb 9, 2017 at 11:03 AM, Damian Guy <damian....@gmail.com
>>> 
>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> I've ran the SimpleBenchmark with checkpoint on and off to see
>>>> what
>>>>>>>> the
>>>>>>>>>>> impact is. It appears that there is very little impact, if any.
>>>> The
>>>>>>>>>> numbers
>>>>>>>>>>> with checkpointing on actually look better, but that is likely
>>>>> largely
>>>>>>>>>> due
>>>>>>>>>>> to external influences.
>>>>>>>>>>> 
>>>>>>>>>>> In any case, i'm going to suggest we go with a default checkpoint
>>>>>>>>>> interval
>>>>>>>>>>> of 5 minutes. I've update the KIP with this.
>>>>>>>>>>> 
>>>>>>>>>>> commit every 10 seconds (no checkpoint)
>>>>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
>> source+store]:
>>>>>>>>>>> 10000000/34798/287372.83751939767/29.570664980746017
>>>>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
>> source+store]:
>>>>>>>>>>> 10000000/35942/278226.0308274442/28.62945857214401
>>>>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
>> source+store]:
>>>>>>>>>>> 10000000/34677/288375.58035585546/29.673847218617528
>>>>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
>> source+store]:
>>>>>>>>>>> 10000000/34677/288375.58035585546/29.673847218617528
>>>>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
>> source+store]:
>>>>>>>>>>> 10000000/31192/320595.02436522185/32.98922800718133
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> checkpoint every 10 seconds (same as commit interval)
>>>>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
>> source+store]:
>>>>>>>>>>> 10000000/36997/270292.185852907/27.81306592426413
>>>>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
>> source+store]:
>>>>>>>>>>> 10000000/32087/311652.69423754164/32.069062237043035
>>>>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
>> source+store]:
>>>>>>>>>>> 10000000/32895/303997.5680194558/31.281349749202004
>>>>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
>> source+store]:
>>>>>>>>>>> 10000000/33476/298721.4720994145/30.738439479029754
>>>>>>>>>>> Streams Performance [records/latency/rec-sec/MB-sec
>> source+store]:
>>>>>>>>>>> 10000000/33196/301241.1133871551/30.99771056753826
>>>>>>>>>>> 
>>>>>>>>>>> On Wed, 8 Feb 2017 at 09:02 Damian Guy <damian....@gmail.com>
>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Matthias,
>>>>>>>>>>>> 
>>>>>>>>>>>> Fair point. I'll update it the KIP.
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> 
>>>>>>>>>>>> On Wed, 8 Feb 2017 at 05:49 Matthias J. Sax <
>>>> matth...@confluent.io
>>>>>> 
>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Damian,
>>>>>>>>>>>> 
>>>>>>>>>>>> I am not strict about it either. However, if there is no
>>>> advantage
>>>>> in
>>>>>>>>>>>> disabling it, we might not want to allow it. This would have the
>>>>>>>>>>>> advantage to guard users to accidentally switch it off.
>>>>>>>>>>>> 
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On 2/3/17 2:03 AM, Damian Guy wrote:
>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> It possibly doesn't make sense to disable it, but then i'm sure
>>>>>>>>>> someone
>>>>>>>>>>>>> will come up with a reason they don't want it!
>>>>>>>>>>>>> I'm happy to change it such that the checkpoint interval must
>>>> be >
>>>>>>>> 0.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Damian
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Fri, 3 Feb 2017 at 01:29 Matthias J. Sax <
>>>>> matth...@confluent.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks Damian.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> One more question: "Checkpointing is disabled if the
>> checkpoint
>>>>>>>>>>> interval
>>>>>>>>>>>>>> is set to a value <=0."
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Does it make sense to disable check pointing? What's the
>>>> tradeoff
>>>>>>>>>>> here?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 2/2/17 1:51 AM, Damian Guy wrote:
>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks for the comments.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 1. TBD - i need to do some performance tests and try and work
>>>>> out
>>>>>>>> a
>>>>>>>>>>>>>>> sensible default.
>>>>>>>>>>>>>>> 2. Yes, you are correct. It could be a multiple of the
>>>>>>>>>>>>>> commit.interval.ms.
>>>>>>>>>>>>>>> But, that would also mean if you change the commit interval -
>>>>> say
>>>>>>>>>> you
>>>>>>>>>>>>>> lower
>>>>>>>>>>>>>>> it, then you might also need to change the checkpoint setting
>>>>>>>> (i.e,
>>>>>>>>>>> you
>>>>>>>>>>>>>>> still only want to checkpoint every n minutes).
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, 1 Feb 2017 at 23:46 Matthias J. Sax <
>>>>>>>> matth...@confluent.io
>>>>>>>>>>> 
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks for the KIP Damian.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I am wondering about two things:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 1. what should be the default value for the new parameter?
>>>>>>>>>>>>>>>> 2. why is the new parameter provided in ms?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> About (2): because
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> "the minimum checkpoint interval will be the value of
>>>>>>>>>>>>>>>> commit.interval.ms. In effect the actual checkpoint
>> interval
>>>>>>>> will
>>>>>>>>>>> be
>>>>>>>>>>>> a
>>>>>>>>>>>>>>>> multiple of the commit interval"
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> it might be easier to just use an parameter that is
>>>>>>>>>>> "number-or-commit
>>>>>>>>>>>>>>>> intervals".
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On 2/1/17 7:29 AM, Damian Guy wrote:
>>>>>>>>>>>>>>>>> Thanks for the comments Eno.
>>>>>>>>>>>>>>>>> As for exactly once, i don't believe this matters as we are
>>>>> just
>>>>>>>>>>>>>>>> restoring
>>>>>>>>>>>>>>>>> the change-log, i.e, the result of the aggregations that
>>>>>>>>>> previously
>>>>>>>>>>>> ran
>>>>>>>>>>>>>>>>> etc. So once initialized the state store will be in the
>> same
>>>>>>>>>> state
>>>>>>>>>>> as
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>> was before.
>>>>>>>>>>>>>>>>> Having the checkpoint in a kafka topic is not ideal as the
>>>>> state
>>>>>>>>>> is
>>>>>>>>>>>> per
>>>>>>>>>>>>>>>>> kafka streams instance. So each instance would need to
>> start
>>>>>>>>>> with a
>>>>>>>>>>>>>>>> unique
>>>>>>>>>>>>>>>>> id that is persistent.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Wed, 1 Feb 2017 at 13:20 Eno Thereska <
>>>>>>>> eno.there...@gmail.com
>>>>>>>>>>> 
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> As a follow up to my previous comment, have you thought
>>>> about
>>>>>>>>>>>> writing
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> checkpoint to a topic instead of a local file? That would
>>>>> have
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> advantage that all metadata continues to be managed by
>>>> Kafka,
>>>>>>>> as
>>>>>>>>>>>> well
>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>> fit with EoS. The potential disadvantage would be a slower
>>>>>>>>>>> latency,
>>>>>>>>>>>>>>>> however
>>>>>>>>>>>>>>>>>> if it is periodic as you mention, I'm not sure that would
>>>> be
>>>>> a
>>>>>>>>>>> show
>>>>>>>>>>>>>>>> stopper.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>>>> On 1 Feb 2017, at 12:58, Eno Thereska <
>>>>> eno.there...@gmail.com
>>>>>>>>> 
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks Damian, this is a good idea and will reduce the
>>>>> restore
>>>>>>>>>>>> time.
>>>>>>>>>>>>>>>>>> Looking forward, with exactly once and support for
>>>>> transactions
>>>>>>>>>> in
>>>>>>>>>>>>>>>> Kafka, I
>>>>>>>>>>>>>>>>>> believe we'll have to add some support for rolling back
>>>>>>>>>>> checkpoints,
>>>>>>>>>>>>>>>> e.g.,
>>>>>>>>>>>>>>>>>> when a transaction is aborted. We need to be aware of that
>>>>> and
>>>>>>>>>>>> ideally
>>>>>>>>>>>>>>>>>> anticipate a bit those needs in the KIP.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On 1 Feb 2017, at 10:18, Damian Guy <
>>>> damian....@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I would like to start the discussion on KIP-116:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>> 116+-+Add+State+Store+Checkpoint+Interval+Configuration
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> -- Guozhang
>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> --
>>>> -- Guozhang
>>>> 
>> 
>> 

Reply via email to