Basically, log compaction == a snapshot in a logical format. You can optimize a tiny bit more, of course, if you store the data files themselves for whatever store you use, but that is going to be very storage-engine specific.
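To make that concrete: "restoring the snapshot" just means replaying the compacted topic from the beginning and keeping the last value seen for each key. A rough sketch using the Java consumer (class, topic, and type choices here are illustrative, not prescriptive):

    import java.time.Duration;
    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class ChangelogRestore {
        // Rebuild the "table" by replaying a compacted changelog from offset 0;
        // the latest value per key is exactly the logical snapshot.
        public static Map<String, String> restore(KafkaConsumer<String, String> consumer,
                                                  String topic) {
            List<TopicPartition> parts = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor(topic)) {
                parts.add(new TopicPartition(topic, p.partition()));
            }
            consumer.assign(parts);
            consumer.seekToBeginning(parts);
            Map<TopicPartition, Long> end = consumer.endOffsets(parts);

            Map<String, String> table = new HashMap<>();
            while (!caughtUp(consumer, parts, end)) {
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(200))) {
                    if (rec.value() == null) table.remove(rec.key()); // tombstone = delete
                    else table.put(rec.key(), rec.value());
                }
            }
            return table;
        }

        private static boolean caughtUp(KafkaConsumer<?, ?> consumer,
                                        List<TopicPartition> parts,
                                        Map<TopicPartition, Long> end) {
            for (TopicPartition tp : parts) {
                if (consumer.position(tp) < end.get(tp)) return false;
            }
            return true;
        }
    }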
-Jay

On Mon, Feb 23, 2015 at 10:33 AM, Roger Hoover <roger.hoo...@gmail.com> wrote:

Thanks, Julian.

I didn't see any mention of checkpoints in the Kappa or Liquid material I've read, but it does seem like a very useful optimization to make re-processing and failure recovery much faster. Databus supports snapshots, I believe, so that DB replicas can be initialized in a practical amount of time.

Interested to know if Jay, Chris, or others have thought about how snapshots might fit with Kafka and/or Samza. Is it something Kafka should provide at some point, or would it be layered on top?

Cheers,

Roger

On Sun, Feb 22, 2015 at 1:13 AM, Julian Hyde <jul...@hydromatic.net> wrote:

Can I quibble with semantics?

This problem seems to be more naturally a stream-to-stream join, not a stream-to-table join. It seems unreasonable to expect the system to be able to give you the state of a table at a given moment in the past, but it is reasonable to ask for the stream up to that point.

A stream and the archive of a table (its contents at every moment in the past) are equivalent in theory (they have exactly the same information content) but different in practice: (1) there are different costs to access them (it is costly to re-create a table by replaying the stream of its inserts), and (2) streams are managed internal to the system whereas tables are external. For Roger's problem, (2) is the crucial difference.

Then the question is how to throw information away but make it possible, and efficient, to answer the queries we will need to ask in the future.

A good way to do this is with checkpoints, replay, and retention. You periodically checkpoint the state of a table (or indeed of any stateful stream operator). To re-create the state of an operator at a particular time T, you start with the previous checkpoint and replay until T. How often to checkpoint depends on the size of the operator's state relative to the stream (tables have a lot of state, aggregates have less, and filter and project have none) and the length of its memory (there is little point making a daily checkpoint for a 1-hour windowed aggregate, because you can restore its state by starting with *any* checkpoint and replaying an hour of data).

Retention is a contract between the consumer and the upstream operators. If the consumer says to its source operator "I need you to be able to replay any time-range from Feb 12th onwards", that operator either needs to store its output back to Feb 12th, or it needs to retain the ability to re-create that output. If the latter, then it tells *its* input(s) what time-range they need to be able to replay, say from Feb 11th. For rapid play-back, it may choose to keep periodic checkpoints.

If the final consumer loosens its retention requirements, to, say, Feb 19th onwards, then each operator propagates the looser requirement to its input operator(s), and this allows garbage to be collected.

I don't know whether checkpoints and retention are spelled out in Kappa/Liquid, but if not, they seem a natural and useful extension to the theory.
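To make the checkpoint-and-replay idea concrete, here is a minimal sketch. Checkpoint, CheckpointStore, and EventLog are hypothetical types for illustration, not existing Kafka or Samza APIs:

    import java.util.HashMap;
    import java.util.Map;

    // Minimal sketch of checkpoint-and-replay state restoration.
    class Checkpoint {
        final long timestamp;            // when this checkpoint was taken
        final Map<String, String> state; // the operator's state at that time
        Checkpoint(long timestamp, Map<String, String> state) {
            this.timestamp = timestamp;
            this.state = state;
        }
    }

    interface CheckpointStore {
        Checkpoint latestAtOrBefore(long t); // most recent checkpoint taken at or before t
    }

    interface EventLog {
        // Apply every event with timestamp in (from, to] to the given state.
        void replayInto(long from, long to, Map<String, String> state);
    }

    class OperatorRestore {
        // Re-create an operator's state as of time T: start from the previous
        // checkpoint and replay the stream forward until T.
        static Map<String, String> stateAt(long t, CheckpointStore checkpoints, EventLog log) {
            Checkpoint cp = checkpoints.latestAtOrBefore(t);
            Map<String, String> state = new HashMap<>(cp.state);
            log.replayInto(cp.timestamp, t, state);
            return state;
        }
    }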
Julian

On Feb 21, 2015, at 4:51 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:

Thanks, Jay. This is one of the really nice advantages of local state in my mind. Full retention would work but would eventually run out of space, right? Ideally, Kafka would guarantee to keep dirty keys for a configurable amount of time, as Chris suggested.

Sent from my iPhone

On Feb 21, 2015, at 10:10 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

Gotcha. Yes, if you want to be able to join to past versions you definitely can't turn on compaction, as the whole goal of that feature is to delete past versions. But wouldn't it work to use full retention if you want that (and use the MessageChooser interface during reprocessing if you want tight control over the state recreation)? I mean, you have the same dilemma if you don't use local state but instead use a remote store--the remote store likely only keeps the last version of each value, so you can't join to the past.

-Jay

On Fri, Feb 20, 2015 at 9:04 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:

Jay,

Sorry, I didn't explain it very well. I'm talking about a stream-table join where the table comes from a compacted topic that is used to populate a local data store. As the stream events are processed, they are joined with dimension data from the local store.

If you want to kick off another version of this job that starts back in time, the new job cannot reliably recreate the same state of the local store that the original had, because old values may have been compacted away.

Does that make sense?

Roger

On Fri, Feb 20, 2015 at 2:52 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Hey Roger,

I'm not sure I understand the case you are describing.

As Chris says, we don't yet give you fine-grained control over when history starts to disappear (though we designed with the intention of making that configurable later). However, I'm not sure you need that for the case you describe.

Say you have a job J that takes inputs I1...IN and produces outputs O1...ON, and in the process accumulates state in a topic S. I think the approach is to launch a J' (changed or improved in some way) that reprocesses I1...IN from the beginning of time (or some past point) into O1'...ON' and accumulates state in S'. So the state for J and the state for J' are totally independent. J' can't reuse J's state in general, because the code that generates that state may have changed.
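Concretely, J' is just the same task code deployed under a new job name with its own changelog topic. A sketch of what the Samza config might look like (all job, topic, and store names are made up):

    # J': same task and inputs as J, but its own job name and its own
    # state changelog (S'), so the two jobs' states stay independent.
    job.name=my-job-v2
    job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
    task.class=com.example.MyStreamTask
    task.inputs=kafka.I1,kafka.I2

    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
    # Reprocess from the start of the retained log rather than from "now".
    systems.kafka.samza.offset.default=oldest

    serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
    stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
    stores.my-store.key.serde=string
    stores.my-store.msg.serde=string
    # S': a changelog topic distinct from the original job's S.
    stores.my-store.changelog=kafka.my-job-v2-state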
-Jay

On Thu, Feb 19, 2015 at 9:30 AM, Roger Hoover <roger.hoo...@gmail.com> wrote:

Chris + Samza Devs,

I was wondering whether Samza could support re-processing as described by the Kappa architecture or Liquid (http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf).

It seems that a changelog is not sufficient to be able to restore state backward in time. Kafka compaction will guarantee that local state can be restored from where it left off, but I don't see how it can restore past state.

Imagine the case where a stream job has a lot of state in its local store but has not updated any keys in a long time.

Time t1: All of the data would be in the tail of the Kafka log (past the cleaner point).
Time t2: The job updates some keys. Now we're in a state where the next compaction will blow away the old values for those keys.
Time t3: Compaction occurs and the old values are discarded.

Say we want to launch a re-processing job that would begin from t1. If we launch that job before t3, it will correctly restore its state. However, if we launch the job after t3, it will be missing old values, right?

Unless I'm misunderstanding something, the only way around this is to keep snapshots in addition to the changelog. Has there been any discussion of providing an option in Samza of taking RocksDB snapshots and persisting them to an object store or HDFS?

Thanks,

Roger
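P.S. For concreteness, here is roughly what I imagine the snapshot step could look like: take a RocksDB checkpoint locally, then copy it to HDFS. The RocksDB and Hadoop FileSystem calls below are real APIs, but the paths and wiring are made up:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.rocksdb.Checkpoint;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;

    public class SnapshotToHdfs {
        public static void main(String[] args) throws Exception {
            RocksDB.loadLibrary();
            try (Options opts = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(opts, "/tmp/my-store")) {

                // 1. Take a consistent local snapshot (cheap: mostly hard links).
                String local = "/tmp/my-store-snapshot-" + System.currentTimeMillis();
                try (Checkpoint cp = Checkpoint.create(db)) {
                    cp.createCheckpoint(local);
                }

                // 2. Ship the snapshot directory to HDFS for safekeeping.
                FileSystem hdfs = FileSystem.get(new Configuration());
                hdfs.copyFromLocalFile(new Path(local), new Path("/snapshots/my-store/"));
            }
        }
    }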