Ah, right, thanks.

Sent from my iPhone

> On Feb 23, 2015, at 11:15 AM, Felix GV <fville...@linkedin.com.INVALID> wrote:
> 
> A recently-compacted topic is pretty similar to a snapshot in terms of 
> semantics, no? If you read the topic up until the point where the compaction 
> ended, you effectively read every key just once, same as a snapshot.
> 
> I agree that the guaranteed uncompacted/dirty retention period would be 
> useful.
> 
> --
> 
> Felix GV
> Data Infrastructure Engineer
> Distributed Data Systems
> LinkedIn
> 
> f...@linkedin.com
> linkedin.com/in/felixgv
> 
> ________________________________________
> From: Roger Hoover [roger.hoo...@gmail.com]
> Sent: Monday, February 23, 2015 10:33 AM
> To: dev@samza.apache.org
> Subject: Re: Re-processing a la Kappa/Liquid
> 
> Thanks, Julian.
> 
> I didn't see any mention of checkpoints in Kappa or Liquid information I've
> read but it does seem like a very useful optimization to make re-processing
> and failure recovery much faster.  Databus supports snapshots, I believe,
> so that DB replicates can be initialized in a practical amount of time.
> 
> Interested to know of Jay, Chris, or others have thought about how
> snapshots might fit with Kafka +/or Samza.  If it something Kafka should
> provide at some point or would it be layered on top?
> 
> Cheers,
> 
> Roger
> 
>> On Sun, Feb 22, 2015 at 1:13 AM, Julian Hyde <jul...@hydromatic.net> wrote:
>> 
>> Can I quibble with semantics?
>> 
>> This problem seems to be more naturally a stream-to-stream join, not a
>> stream-to-table join. It seems unreasonable to expect the system to be able
>> to give you the state of a table at a given moment in the past, but it is
>> reasonable ask for the stream up to that point.
>> 
>> A stream and the archive of a table (its contents at every moment in the
>> past) are equivalent in theory (they have exactly the same information
>> content) but different in practice: (1) there are different costs to access
>> them (it is costly to re-create a table by re-playing a stream of its
>> inserts), and (2) streams are managed internal to the system whereas tables
>> are external. For Roger's problem, (2) is a crucial difference.
>> 
>> Then the question is how to throw information away but make it possible,
>> and efficient, to answer the queries we will need to ask in future.
>> 
>> A good way to do this is with checkpoints, replay, and retention. You
>> periodically checkpoint the state of a table (or indeed any stateful stream
>> operator). To re-create the state of a operator at a particular time T you
>> start with the previous checkpoint and replay until T. How often to
>> checkpoint depends on the size of the operator's state relative to the
>> stream (tables have a lot of state, aggregate has less, and filter and
>> project have no state) and the length of its memory (there is little point
>> making a daily checkpoint for a 1 hour windowed aggregate because you can
>> restore state by starting with *any* checkpoint and replaying an hour of
>> data).
>> 
>> Retention is a contract between the consumer and the up-stream operators.
>> If the consumer says to its source operator "I need you to be able to
>> replay any time-range from Feb 12th onwards", that operator either needs to
>> store its output back to Feb 12th, or it needs to retain the ability to
>> re-create that output. If the latter, then it tells *its* input(s) what
>> time-range they need to be able to re-play, say from Feb 11th. For rapid
>> play-back, it may choose to keep periodic checkpoints.
>> 
>> If the final consumer loosens its retention requirements, to say 19th Feb
>> onwards, then each operator propagates the looser requirements to its input
>> operator(s), and this allows garbage to be collected.
>> 
>> I don't know whether checkpoints and retention are spelled out in
>> Kappa/Liquid, but if not, they seem a natural and useful extension to the
>> theory.
>> 
>> Julian
>> 
>> 
>>>> On Feb 21, 2015, at 4:51 PM, Roger Hoover <roger.hoo...@gmail.com>
>>> wrote:
>>> 
>>> Thanks, Jay.  This is one of the really nice advantages of local state
>> in my mind.  Full retention would work but eventually run out of space,
>> right?  Ideally, Kafka would guarantee to keep dirty keys for a
>> configurable amount of time as Chris suggested.
>>> 
>>> Sent from my iPhone
>>> 
>>>> On Feb 21, 2015, at 10:10 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>> 
>>>> Gotcha. Yes if you want to be able to join to past versions you
>> definitely
>>>> can't turn on compaction as the whole goal of that feature is to delete
>>>> past versions. But wouldn't it work to use full retention if you want
>> that
>>>> (and use the MessageChooser interface during reprocessing if you want
>> tight
>>>> control over the state recreation). I mean you have the same dilemma if
>> you
>>>> don't use local state but instead use a remote store--the remote store
>>>> likely only keeps the last version of each value so you can't join to
>> the
>>>> past.
>>>> 
>>>> -Jay
>>>> 
>>>> On Fri, Feb 20, 2015 at 9:04 PM, Roger Hoover <roger.hoo...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Jay,
>>>>> 
>>>>> Sorry, I didn't explain it very well.  I'm talking about a stream-table
>>>>> join where the table comes from a compacted topic that is used to
>> populate
>>>>> a local data store.  As the stream events are processed, they are
>> joined
>>>>> with dimension data from the local store.
>>>>> 
>>>>> If you want to kick off another version of this job that starts back in
>>>>> time, the new job cannot reliably recreate the same state of the local
>>>>> store that the original had because old values may have been compacted
>>>>> away.
>>>>> 
>>>>> Does that make sense?
>>>>> 
>>>>> Roger
>>>>> 
>>>>>> On Fri, Feb 20, 2015 at 2:52 PM, Jay Kreps <jay.kr...@gmail.com>
>> wrote:
>>>>>> 
>>>>>> Hey Roger,
>>>>>> 
>>>>>> I'm not sure if I understand the case you are describing.
>>>>>> 
>>>>>> As Chris says we don't yet give you fined grained control over when
>>>>> history
>>>>>> starts to disappear (though we designed with the intention of making
>> that
>>>>>> configurable later). However I'm not sure if you need that for the
>> case
>>>>> you
>>>>>> describe.
>>>>>> 
>>>>>> Say you have a job J that takes inputs I1...IN and produces output
>>>>> O1...ON
>>>>>> and in the process accumulates state in a topic S. I think the
>> approach
>>>>> is
>>>>>> to launch a J' (changed or improved in some way) that reprocesses
>> I1...IN
>>>>>> from the beginning of time (or some past point) into O1'...ON' and
>>>>>> accumulates state in S'. So the state for J and the state for J' are
>>>>>> totally independent. J' can't reuse J's state in general because the
>> code
>>>>>> that generates that state may have changed.
>>>>>> 
>>>>>> -Jay
>>>>>> 
>>>>>> On Thu, Feb 19, 2015 at 9:30 AM, Roger Hoover <roger.hoo...@gmail.com
>>> 
>>>>>> wrote:
>>>>>> 
>>>>>>> Chris + Samza Devs,
>>>>>>> 
>>>>>>> I was wondering whether Samza could support re-processing as
>> described
>>>>> by
>>>>>>> the Kappa architecture or Liquid (
>>>>>>> http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf).
>>>>>>> 
>>>>>>> It seems that a changelog is not sufficient to be able to restore
>> state
>>>>>>> backward in time.  Kafka compaction will guarantee that local state
>> can
>>>>>> be
>>>>>>> restored from where it left off but I don't see how it can restore
>> past
>>>>>>> state.
>>>>>>> 
>>>>>>> Imagine the case where a stream job has a lot of state in it's local
>>>>>> store
>>>>>>> but it has not updated any keys in a long time.
>>>>>>> 
>>>>>>> Time t1: All of the data would be in the tail of the Kafka log (past
>>>>> the
>>>>>>> cleaner point).
>>>>>>> Time t2:  The job updates some keys.   Now we're in a state where the
>>>>>> next
>>>>>>> compaction will blow away the old values for those keys.
>>>>>>> Time t3:  Compaction occurs and old values are discarded.
>>>>>>> 
>>>>>>> Say we want to launch a re-processing job that would begin from t1.
>> If
>>>>>> we
>>>>>>> launch that job before t3, it will correctly restore it's state.
>>>>>> However,
>>>>>>> if we launch the job after t3, it will be missing old values, right?
>>>>>>> 
>>>>>>> Unless I'm misunderstanding something, the only way around this is to
>>>>>> keep
>>>>>>> snapshots in addition to the changelog.  Has there been any
>> discussion
>>>>> of
>>>>>>> providing an option in Samza of taking RocksDB snapshots and
>> persisting
>>>>>>> them to an object store or HDFS?
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Roger
>> 
>> 

Reply via email to