Ah, right, thanks.

Sent from my iPhone
> On Feb 23, 2015, at 11:15 AM, Felix GV <fville...@linkedin.com.INVALID> wrote:
>
> A recently-compacted topic is pretty similar to a snapshot in terms of
> semantics, no? If you read the topic up until the point where the
> compaction ended, you effectively read every key just once, same as a
> snapshot.
>
> I agree that the guaranteed uncompacted/dirty retention period would be
> useful.
>
> --
>
> Felix GV
> Data Infrastructure Engineer
> Distributed Data Systems
> LinkedIn
>
> f...@linkedin.com
> linkedin.com/in/felixgv
>
> ________________________________________
> From: Roger Hoover [roger.hoo...@gmail.com]
> Sent: Monday, February 23, 2015 10:33 AM
> To: dev@samza.apache.org
> Subject: Re: Re-processing a la Kappa/Liquid
>
> Thanks, Julian.
>
> I didn't see any mention of checkpoints in the Kappa or Liquid material
> I've read, but it does seem like a very useful optimization to make
> re-processing and failure recovery much faster. Databus supports
> snapshots, I believe, so that DB replicas can be initialized in a
> practical amount of time.
>
> Interested to know if Jay, Chris, or others have thought about how
> snapshots might fit with Kafka and/or Samza. Is it something Kafka should
> provide at some point, or would it be layered on top?
>
> Cheers,
>
> Roger
>
>> On Sun, Feb 22, 2015 at 1:13 AM, Julian Hyde <jul...@hydromatic.net> wrote:
>>
>> Can I quibble with semantics?
>>
>> This problem seems to be more naturally a stream-to-stream join, not a
>> stream-to-table join. It seems unreasonable to expect the system to be
>> able to give you the state of a table at a given moment in the past, but
>> it is reasonable to ask for the stream up to that point.
>>
>> A stream and the archive of a table (its contents at every moment in the
>> past) are equivalent in theory (they have exactly the same information
>> content) but different in practice: (1) there are different costs to
>> access them (it is costly to re-create a table by re-playing a stream of
>> its inserts), and (2) streams are managed internal to the system whereas
>> tables are external. For Roger's problem, (2) is a crucial difference.
>>
>> Then the question is how to throw information away but make it possible,
>> and efficient, to answer the queries we will need to ask in future.
>>
>> A good way to do this is with checkpoints, replay, and retention. You
>> periodically checkpoint the state of a table (or indeed any stateful
>> stream operator). To re-create the state of an operator at a particular
>> time T, you start with the previous checkpoint and replay until T. How
>> often to checkpoint depends on the size of the operator's state relative
>> to the stream (tables have a lot of state, an aggregate has less, and
>> filter and project have no state) and the length of its memory (there is
>> little point making a daily checkpoint for a 1-hour windowed aggregate
>> because you can restore state by starting with *any* checkpoint and
>> replaying an hour of data).
>>
>> Retention is a contract between the consumer and the up-stream operators.
>> If the consumer says to its source operator "I need you to be able to
>> replay any time-range from Feb 12th onwards", that operator either needs
>> to store its output back to Feb 12th, or it needs to retain the ability
>> to re-create that output. If the latter, then it tells *its* input(s)
>> what time-range they need to be able to re-play, say from Feb 11th. For
>> rapid play-back, it may choose to keep periodic checkpoints.
>>
>> If the final consumer loosens its retention requirements, to, say, Feb
>> 19th onwards, then each operator propagates the looser requirements to
>> its input operator(s), and this allows garbage to be collected.
>>
>> I don't know whether checkpoints and retention are spelled out in
>> Kappa/Liquid, but if not, they seem a natural and useful extension to the
>> theory.
>>
>> Julian
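To make the checkpoint-and-replay idea Julian describes concrete, here is a
minimal sketch of restoring an operator's state at time T: start from the
newest checkpoint taken at or before T, then replay the change stream up to
T. All of the types (Checkpoint, ChangeLog, StateStore) are hypothetical
placeholders, not an existing Samza or Kafka API.

import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

public class CheckpointRestore {

    /** Rebuilds the state an operator had at time t. */
    static StateStore restoreAt(long t,
                                NavigableMap<Long, Checkpoint> checkpoints,
                                ChangeLog log) {
        // Start from the newest checkpoint taken at or before t; how far back
        // it is bounds how much of the stream must be replayed.
        Map.Entry<Long, Checkpoint> base = checkpoints.floorEntry(t);
        if (base == null) {
            throw new IllegalStateException("no checkpoint at or before " + t);
        }
        StateStore store = base.getValue().load();

        // Replay only the changes between the checkpoint time and t.
        for (ChangeEvent e : log.range(base.getKey(), t)) {
            store.apply(e);
        }
        return store;
    }

    // Hypothetical interfaces, only to keep the sketch self-contained.
    interface Checkpoint  { StateStore load(); }
    interface ChangeLog   { List<ChangeEvent> range(long fromExclusive, long toInclusive); }
    interface ChangeEvent {}
    interface StateStore  { void apply(ChangeEvent e); }
}

The trade-off Julian mentions shows up directly in the replay loop: the more
state an operator carries relative to its input rate, the more often it pays
to checkpoint, because replay is what you are buying down.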
>>> On Feb 21, 2015, at 4:51 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>>>
>>> Thanks, Jay. This is one of the really nice advantages of local state in
>>> my mind. Full retention would work but would eventually run out of
>>> space, right? Ideally, Kafka would guarantee to keep dirty keys for a
>>> configurable amount of time, as Chris suggested.
>>>
>>> Sent from my iPhone
>>>
>>>> On Feb 21, 2015, at 10:10 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>
>>>> Gotcha. Yes, if you want to be able to join to past versions you
>>>> definitely can't turn on compaction, as the whole goal of that feature
>>>> is to delete past versions. But wouldn't it work to use full retention
>>>> if you want that (and use the MessageChooser interface during
>>>> reprocessing if you want tight control over the state recreation)? I
>>>> mean, you have the same dilemma if you don't use local state but
>>>> instead use a remote store--the remote store likely only keeps the last
>>>> version of each value, so you can't join to the past.
>>>>
>>>> -Jay
>>>>
>>>> On Fri, Feb 20, 2015 at 9:04 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>>>>
>>>>> Jay,
>>>>>
>>>>> Sorry, I didn't explain it very well. I'm talking about a stream-table
>>>>> join where the table comes from a compacted topic that is used to
>>>>> populate a local data store. As the stream events are processed, they
>>>>> are joined with dimension data from the local store.
>>>>>
>>>>> If you want to kick off another version of this job that starts back
>>>>> in time, the new job cannot reliably recreate the same state of the
>>>>> local store that the original had, because old values may have been
>>>>> compacted away.
>>>>>
>>>>> Does that make sense?
>>>>>
>>>>> Roger
>>>>>
>>>>>> On Fri, Feb 20, 2015 at 2:52 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>
>>>>>> Hey Roger,
>>>>>>
>>>>>> I'm not sure if I understand the case you are describing.
>>>>>>
>>>>>> As Chris says, we don't yet give you fine-grained control over when
>>>>>> history starts to disappear (though we designed with the intention of
>>>>>> making that configurable later). However, I'm not sure if you need
>>>>>> that for the case you describe.
>>>>>>
>>>>>> Say you have a job J that takes inputs I1...IN and produces outputs
>>>>>> O1...ON, and in the process accumulates state in a topic S. I think
>>>>>> the approach is to launch a J' (changed or improved in some way) that
>>>>>> reprocesses I1...IN from the beginning of time (or some past point)
>>>>>> into O1'...ON' and accumulates state in S'. So the state for J and
>>>>>> the state for J' are totally independent. J' can't reuse J's state in
>>>>>> general because the code that generates that state may have changed.
>>>>>>
>>>>>> -Jay
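As a rough illustration of Jay's J versus J' scheme, the two jobs might
differ only in their name, their starting offsets, and where their state
changelogs live, so that S and S' never overlap. The property names below
follow Samza's usual config conventions as I understand them and should be
checked against the version in use; the job, topic, and store names are made
up.

# Original job J
job.name=enrich-events
task.inputs=kafka.page-views,kafka.dimension-table
stores.dimension-store.changelog=kafka.enrich-events-dimension-store-changelog

# Re-processing job J': same task code, but its own identity, its own
# starting offsets, and its own state changelog (S'), so it cannot clobber J.
job.name=enrich-events-reprocess
task.inputs=kafka.page-views,kafka.dimension-table
systems.kafka.streams.page-views.samza.offset.default=oldest
stores.dimension-store.changelog=kafka.enrich-events-reprocess-dimension-store-changelog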
>>>>>>
>>>>>> On Thu, Feb 19, 2015 at 9:30 AM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>>>>>>
>>>>>>> Chris + Samza Devs,
>>>>>>>
>>>>>>> I was wondering whether Samza could support re-processing as
>>>>>>> described by the Kappa architecture or Liquid (
>>>>>>> http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf).
>>>>>>>
>>>>>>> It seems that a changelog is not sufficient to be able to restore
>>>>>>> state backward in time. Kafka compaction will guarantee that local
>>>>>>> state can be restored from where it left off, but I don't see how it
>>>>>>> can restore past state.
>>>>>>>
>>>>>>> Imagine the case where a stream job has a lot of state in its local
>>>>>>> store but has not updated any keys in a long time.
>>>>>>>
>>>>>>> Time t1: All of the data would be in the tail of the Kafka log (past
>>>>>>> the cleaner point).
>>>>>>> Time t2: The job updates some keys. Now we're in a state where the
>>>>>>> next compaction will blow away the old values for those keys.
>>>>>>> Time t3: Compaction occurs and the old values are discarded.
>>>>>>>
>>>>>>> Say we want to launch a re-processing job that would begin from t1.
>>>>>>> If we launch that job before t3, it will correctly restore its
>>>>>>> state. However, if we launch the job after t3, it will be missing
>>>>>>> old values, right?
>>>>>>>
>>>>>>> Unless I'm misunderstanding something, the only way around this is
>>>>>>> to keep snapshots in addition to the changelog. Has there been any
>>>>>>> discussion of providing an option in Samza of taking RocksDB
>>>>>>> snapshots and persisting them to an object store or HDFS?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Roger
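For what the RocksDB-snapshot idea raised at the end might look like, here is
a hand-wavy sketch: use RocksDB's checkpoint facility to produce a consistent
on-disk copy of the local store and ship it to HDFS tagged with the changelog
offset it corresponds to, so a re-processing job can bootstrap from the
snapshot nearest its start point and replay the changelog from there. This is
not an existing Samza feature; the RocksJava and Hadoop calls are used as I
understand them, and the paths and class name are made up.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class StoreSnapshotter {

    /** Snapshots the local store and uploads it, keyed by changelog offset. */
    public static void snapshot(RocksDB store, long changelogOffset)
            throws RocksDBException, IOException {
        // RocksDB writes a consistent copy of the store to a local directory
        // (largely hard links, so this is cheap on the same filesystem).
        String local = "/tmp/store-snapshot-" + changelogOffset;   // made-up path
        Checkpoint.create(store).createCheckpoint(local);

        // Copy the snapshot to durable storage. Naming it by changelog offset
        // lets a re-processing job pick the snapshot at or before the point
        // it wants to start from, then replay the changelog forward.
        FileSystem fs = FileSystem.get(new Configuration());
        fs.copyFromLocalFile(new Path(local),
            new Path("/snapshots/dimension-store/" + changelogOffset));   // made-up path
    }
}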