Ah, right.  To save historical snapshots, one could periodically read the whole 
compacted topic and save it somewhere.

Sent from my iPhone

> On Feb 23, 2015, at 11:11 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> 
> Basically log compaction == snapshot in a logical format. You can optimize
> a tiny bit more, of course, if you store the data files themselves for
> whatever store but that is going to be very storage engine specific.
> 
> -Jay
> 
> On Mon, Feb 23, 2015 at 10:33 AM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
> 
>> Thanks, Julian.
>> 
>> I didn't see any mention of checkpoints in Kappa or Liquid information I've
>> read but it does seem like a very useful optimization to make re-processing
>> and failure recovery much faster.  Databus supports snapshots, I believe,
>> so that DB replicates can be initialized in a practical amount of time.
>> 
>> Interested to know of Jay, Chris, or others have thought about how
>> snapshots might fit with Kafka +/or Samza.  If it something Kafka should
>> provide at some point or would it be layered on top?
>> 
>> Cheers,
>> 
>> Roger
>> 
>> On Sun, Feb 22, 2015 at 1:13 AM, Julian Hyde <jul...@hydromatic.net>
>> wrote:
>> 
>>> Can I quibble with semantics?
>>> 
>>> This problem seems to be more naturally a stream-to-stream join, not a
>>> stream-to-table join. It seems unreasonable to expect the system to be
>> able
>>> to give you the state of a table at a given moment in the past, but it is
>>> reasonable ask for the stream up to that point.
>>> 
>>> A stream and the archive of a table (its contents at every moment in the
>>> past) are equivalent in theory (they have exactly the same information
>>> content) but different in practice: (1) there are different costs to
>> access
>>> them (it is costly to re-create a table by re-playing a stream of its
>>> inserts), and (2) streams are managed internal to the system whereas
>> tables
>>> are external. For Roger's problem, (2) is a crucial difference.
>>> 
>>> Then the question is how to throw information away but make it possible,
>>> and efficient, to answer the queries we will need to ask in future.
>>> 
>>> A good way to do this is with checkpoints, replay, and retention. You
>>> periodically checkpoint the state of a table (or indeed any stateful
>> stream
>>> operator). To re-create the state of a operator at a particular time T
>> you
>>> start with the previous checkpoint and replay until T. How often to
>>> checkpoint depends on the size of the operator's state relative to the
>>> stream (tables have a lot of state, aggregate has less, and filter and
>>> project have no state) and the length of its memory (there is little
>> point
>>> making a daily checkpoint for a 1 hour windowed aggregate because you can
>>> restore state by starting with *any* checkpoint and replaying an hour of
>>> data).
>>> 
>>> Retention is a contract between the consumer and the up-stream operators.
>>> If the consumer says to its source operator "I need you to be able to
>>> replay any time-range from Feb 12th onwards", that operator either needs
>> to
>>> store its output back to Feb 12th, or it needs to retain the ability to
>>> re-create that output. If the latter, then it tells *its* input(s) what
>>> time-range they need to be able to re-play, say from Feb 11th. For rapid
>>> play-back, it may choose to keep periodic checkpoints.
>>> 
>>> If the final consumer loosens its retention requirements, to say 19th Feb
>>> onwards, then each operator propagates the looser requirements to its
>> input
>>> operator(s), and this allows garbage to be collected.
>>> 
>>> I don't know whether checkpoints and retention are spelled out in
>>> Kappa/Liquid, but if not, they seem a natural and useful extension to the
>>> theory.
>>> 
>>> Julian
>>> 
>>> 
>>>>> On Feb 21, 2015, at 4:51 PM, Roger Hoover <roger.hoo...@gmail.com>
>>>> wrote:
>>>> 
>>>> Thanks, Jay.  This is one of the really nice advantages of local state
>>> in my mind.  Full retention would work but eventually run out of space,
>>> right?  Ideally, Kafka would guarantee to keep dirty keys for a
>>> configurable amount of time as Chris suggested.
>>>> 
>>>> Sent from my iPhone
>>>> 
>>>>> On Feb 21, 2015, at 10:10 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>> 
>>>>> Gotcha. Yes if you want to be able to join to past versions you
>>> definitely
>>>>> can't turn on compaction as the whole goal of that feature is to
>> delete
>>>>> past versions. But wouldn't it work to use full retention if you want
>>> that
>>>>> (and use the MessageChooser interface during reprocessing if you want
>>> tight
>>>>> control over the state recreation). I mean you have the same dilemma
>> if
>>> you
>>>>> don't use local state but instead use a remote store--the remote store
>>>>> likely only keeps the last version of each value so you can't join to
>>> the
>>>>> past.
>>>>> 
>>>>> -Jay
>>>>> 
>>>>> On Fri, Feb 20, 2015 at 9:04 PM, Roger Hoover <roger.hoo...@gmail.com
>>> 
>>>>> wrote:
>>>>> 
>>>>>> Jay,
>>>>>> 
>>>>>> Sorry, I didn't explain it very well.  I'm talking about a
>> stream-table
>>>>>> join where the table comes from a compacted topic that is used to
>>> populate
>>>>>> a local data store.  As the stream events are processed, they are
>>> joined
>>>>>> with dimension data from the local store.
>>>>>> 
>>>>>> If you want to kick off another version of this job that starts back
>> in
>>>>>> time, the new job cannot reliably recreate the same state of the
>> local
>>>>>> store that the original had because old values may have been
>> compacted
>>>>>> away.
>>>>>> 
>>>>>> Does that make sense?
>>>>>> 
>>>>>> Roger
>>>>>> 
>>>>>>> On Fri, Feb 20, 2015 at 2:52 PM, Jay Kreps <jay.kr...@gmail.com>
>>> wrote:
>>>>>>> 
>>>>>>> Hey Roger,
>>>>>>> 
>>>>>>> I'm not sure if I understand the case you are describing.
>>>>>>> 
>>>>>>> As Chris says we don't yet give you fined grained control over when
>>>>>> history
>>>>>>> starts to disappear (though we designed with the intention of making
>>> that
>>>>>>> configurable later). However I'm not sure if you need that for the
>>> case
>>>>>> you
>>>>>>> describe.
>>>>>>> 
>>>>>>> Say you have a job J that takes inputs I1...IN and produces output
>>>>>> O1...ON
>>>>>>> and in the process accumulates state in a topic S. I think the
>>> approach
>>>>>> is
>>>>>>> to launch a J' (changed or improved in some way) that reprocesses
>>> I1...IN
>>>>>>> from the beginning of time (or some past point) into O1'...ON' and
>>>>>>> accumulates state in S'. So the state for J and the state for J' are
>>>>>>> totally independent. J' can't reuse J's state in general because the
>>> code
>>>>>>> that generates that state may have changed.
>>>>>>> 
>>>>>>> -Jay
>>>>>>> 
>>>>>>> On Thu, Feb 19, 2015 at 9:30 AM, Roger Hoover <
>> roger.hoo...@gmail.com
>>>> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Chris + Samza Devs,
>>>>>>>> 
>>>>>>>> I was wondering whether Samza could support re-processing as
>>> described
>>>>>> by
>>>>>>>> the Kappa architecture or Liquid (
>>>>>>>> http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf).
>>>>>>>> 
>>>>>>>> It seems that a changelog is not sufficient to be able to restore
>>> state
>>>>>>>> backward in time.  Kafka compaction will guarantee that local state
>>> can
>>>>>>> be
>>>>>>>> restored from where it left off but I don't see how it can restore
>>> past
>>>>>>>> state.
>>>>>>>> 
>>>>>>>> Imagine the case where a stream job has a lot of state in it's
>> local
>>>>>>> store
>>>>>>>> but it has not updated any keys in a long time.
>>>>>>>> 
>>>>>>>> Time t1: All of the data would be in the tail of the Kafka log
>> (past
>>>>>> the
>>>>>>>> cleaner point).
>>>>>>>> Time t2:  The job updates some keys.   Now we're in a state where
>> the
>>>>>>> next
>>>>>>>> compaction will blow away the old values for those keys.
>>>>>>>> Time t3:  Compaction occurs and old values are discarded.
>>>>>>>> 
>>>>>>>> Say we want to launch a re-processing job that would begin from t1.
>>> If
>>>>>>> we
>>>>>>>> launch that job before t3, it will correctly restore it's state.
>>>>>>> However,
>>>>>>>> if we launch the job after t3, it will be missing old values,
>> right?
>>>>>>>> 
>>>>>>>> Unless I'm misunderstanding something, the only way around this is
>> to
>>>>>>> keep
>>>>>>>> snapshots in addition to the changelog.  Has there been any
>>> discussion
>>>>>> of
>>>>>>>> providing an option in Samza of taking RocksDB snapshots and
>>> persisting
>>>>>>>> them to an object store or HDFS?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> 
>>>>>>>> Roger
>> 

Reply via email to