Ah, right. To save historical snapshots, one could periodically read the whole compacted topic and save it somewhere.
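
A rough sketch of that approach with the plain Java Kafka consumer (broker address, topic, and output file names below are made up, and error handling is omitted): read the compacted topic end to end, keep only the latest value per key, and write the result out wherever snapshots are kept.

// Sketch only: snapshot a compacted topic by reading it end to end and
// keeping the latest value per key. Topic/file names are illustrative.
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CompactedTopicSnapshot {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      String topic = "dimension-table";                        // hypothetical compacted topic
      List<TopicPartition> parts = new ArrayList<>();
      consumer.partitionsFor(topic).forEach(p -> parts.add(new TopicPartition(topic, p.partition())));
      consumer.assign(parts);
      consumer.seekToBeginning(parts);
      Map<TopicPartition, Long> end = consumer.endOffsets(parts); // snapshot is "as of" these offsets

      Map<String, String> latest = new HashMap<>();
      while (!caughtUp(consumer, end)) {
        for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(500))) {
          if (r.value() == null) latest.remove(r.key());       // tombstone: key was deleted
          else latest.put(r.key(), r.value());
        }
      }
      // "Save it somewhere": here just a local file; could as easily be HDFS or an object store.
      try (PrintWriter out = new PrintWriter("snapshot.tsv", StandardCharsets.UTF_8.name())) {
        latest.forEach((k, v) -> out.println(k + "\t" + v));
      }
    }
  }

  private static boolean caughtUp(KafkaConsumer<?, ?> c, Map<TopicPartition, Long> end) {
    return end.entrySet().stream().allMatch(e -> c.position(e.getKey()) >= e.getValue());
  }
}
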
Sent from my iPhone

> On Feb 23, 2015, at 11:11 AM, Jay Kreps <[email protected]> wrote:
>
> Basically log compaction == snapshot in a logical format. You can optimize a tiny bit more, of course, if you store the data files themselves for whatever store, but that is going to be very storage-engine specific.
>
> -Jay
>
> On Mon, Feb 23, 2015 at 10:33 AM, Roger Hoover <[email protected]> wrote:
>
>> Thanks, Julian.
>>
>> I didn't see any mention of checkpoints in the Kappa or Liquid information I've read, but it does seem like a very useful optimization to make re-processing and failure recovery much faster. Databus supports snapshots, I believe, so that DB replicas can be initialized in a practical amount of time.
>>
>> Interested to know if Jay, Chris, or others have thought about how snapshots might fit with Kafka and/or Samza. Is it something Kafka should provide at some point, or would it be layered on top?
>>
>> Cheers,
>>
>> Roger
>>
>> On Sun, Feb 22, 2015 at 1:13 AM, Julian Hyde <[email protected]> wrote:
>>
>>> Can I quibble with semantics?
>>>
>>> This problem seems to be more naturally a stream-to-stream join, not a stream-to-table join. It seems unreasonable to expect the system to be able to give you the state of a table at a given moment in the past, but it is reasonable to ask for the stream up to that point.
>>>
>>> A stream and the archive of a table (its contents at every moment in the past) are equivalent in theory (they have exactly the same information content) but different in practice: (1) there are different costs to access them (it is costly to re-create a table by re-playing a stream of its inserts), and (2) streams are managed internal to the system whereas tables are external. For Roger's problem, (2) is a crucial difference.
>>>
>>> Then the question is how to throw information away but make it possible, and efficient, to answer the queries we will need to ask in future.
>>>
>>> A good way to do this is with checkpoints, replay, and retention. You periodically checkpoint the state of a table (or indeed any stateful stream operator). To re-create the state of an operator at a particular time T, you start with the previous checkpoint and replay until T. How often to checkpoint depends on the size of the operator's state relative to the stream (tables have a lot of state, aggregates have less, and filter and project have no state) and the length of its memory (there is little point making a daily checkpoint for a 1-hour windowed aggregate because you can restore state by starting with *any* checkpoint and replaying an hour of data).
>>>
>>> Retention is a contract between the consumer and the up-stream operators. If the consumer says to its source operator "I need you to be able to replay any time-range from Feb 12th onwards", that operator either needs to store its output back to Feb 12th, or it needs to retain the ability to re-create that output. If the latter, then it tells *its* input(s) what time-range they need to be able to re-play, say from Feb 11th. For rapid play-back, it may choose to keep periodic checkpoints.
>>>
>>> If the final consumer loosens its retention requirements, to say Feb 19th onwards, then each operator propagates the looser requirements to its input operator(s), and this allows garbage to be collected.
>>>
>>> I don't know whether checkpoints and retention are spelled out in Kappa/Liquid, but if not, they seem a natural and useful extension to the theory.
>>>
>>> Julian
>>>
>>>> On Feb 21, 2015, at 4:51 PM, Roger Hoover <[email protected]> wrote:
>>>>
>>>> Thanks, Jay. This is one of the really nice advantages of local state in my mind. Full retention would work but eventually run out of space, right? Ideally, Kafka would guarantee to keep dirty keys for a configurable amount of time, as Chris suggested.
>>>>
>>>> Sent from my iPhone
>>>>
>>>>> On Feb 21, 2015, at 10:10 AM, Jay Kreps <[email protected]> wrote:
>>>>>
>>>>> Gotcha. Yes, if you want to be able to join to past versions you definitely can't turn on compaction, as the whole goal of that feature is to delete past versions. But wouldn't it work to use full retention if you want that (and use the MessageChooser interface during reprocessing if you want tight control over the state recreation)? I mean, you have the same dilemma if you don't use local state but instead use a remote store--the remote store likely only keeps the last version of each value, so you can't join to the past.
>>>>>
>>>>> -Jay
>>>>>
>>>>> On Fri, Feb 20, 2015 at 9:04 PM, Roger Hoover <[email protected]> wrote:
>>>>>
>>>>>> Jay,
>>>>>>
>>>>>> Sorry, I didn't explain it very well. I'm talking about a stream-table join where the table comes from a compacted topic that is used to populate a local data store. As the stream events are processed, they are joined with dimension data from the local store.
>>>>>>
>>>>>> If you want to kick off another version of this job that starts back in time, the new job cannot reliably recreate the same state of the local store that the original had, because old values may have been compacted away.
>>>>>>
>>>>>> Does that make sense?
>>>>>>
>>>>>> Roger
>>>>>>
>>>>>>> On Fri, Feb 20, 2015 at 2:52 PM, Jay Kreps <[email protected]> wrote:
>>>>>>>
>>>>>>> Hey Roger,
>>>>>>>
>>>>>>> I'm not sure if I understand the case you are describing.
>>>>>>>
>>>>>>> As Chris says, we don't yet give you fine-grained control over when history starts to disappear (though we designed with the intention of making that configurable later). However, I'm not sure if you need that for the case you describe.
>>>>>>>
>>>>>>> Say you have a job J that takes inputs I1...IN and produces output O1...ON, and in the process accumulates state in a topic S. I think the approach is to launch a J' (changed or improved in some way) that reprocesses I1...IN from the beginning of time (or some past point) into O1'...ON' and accumulates state in S'. So the state for J and the state for J' are totally independent. J' can't reuse J's state in general because the code that generates that state may have changed.
>>>>>>>
>>>>>>> -Jay
>>>>>>>
>>>>>>> On Thu, Feb 19, 2015 at 9:30 AM, Roger Hoover <[email protected]> wrote:
>>>>>>>
>>>>>>>> Chris + Samza Devs,
>>>>>>>>
>>>>>>>> I was wondering whether Samza could support re-processing as described by the Kappa architecture or Liquid (http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf).
>>>>>>>>
>>>>>>>> It seems that a changelog is not sufficient to be able to restore state backward in time. Kafka compaction will guarantee that local state can be restored from where it left off, but I don't see how it can restore past state.
>>>>>>>>
>>>>>>>> Imagine the case where a stream job has a lot of state in its local store but it has not updated any keys in a long time.
>>>>>>>>
>>>>>>>> Time t1: All of the data would be in the tail of the Kafka log (past the cleaner point).
>>>>>>>> Time t2: The job updates some keys. Now we're in a state where the next compaction will blow away the old values for those keys.
>>>>>>>> Time t3: Compaction occurs and old values are discarded.
>>>>>>>>
>>>>>>>> Say we want to launch a re-processing job that would begin from t1. If we launch that job before t3, it will correctly restore its state. However, if we launch the job after t3, it will be missing old values, right?
>>>>>>>>
>>>>>>>> Unless I'm misunderstanding something, the only way around this is to keep snapshots in addition to the changelog. Has there been any discussion of providing an option in Samza of taking RocksDB snapshots and persisting them to an object store or HDFS?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Roger
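
To make the setup in the thread concrete, here is a minimal sketch of the stream-table join Roger describes, written against the Samza task API. The topic, store, and class names are made up; the "dimensions" store is assumed to be backed by a compacted changelog topic, which is exactly the topic whose old values compaction eventually removes.

// Sketch of the stream-table join described above: a compacted "dimensions"
// topic feeds a local KV store, and stream events are enriched from that store.
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class StreamTableJoinTask implements StreamTask, InitableTask {
  private static final SystemStream OUTPUT = new SystemStream("kafka", "enriched-events");
  private KeyValueStore<String, String> dimensions;  // backed by a compacted changelog topic

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    // Store configured with a changelog (stores.dimensions.changelog=...).
    dimensions = (KeyValueStore<String, String>) context.getStore("dimensions");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    String stream = envelope.getSystemStreamPartition().getStream();
    String key = (String) envelope.getKey();
    String value = (String) envelope.getMessage();

    if ("dimension-updates".equals(stream)) {
      // Table side: keep the local store up to date (null value = delete).
      if (value == null) dimensions.delete(key);
      else dimensions.put(key, value);
    } else {
      // Stream side: enrich the event with whatever the store currently holds.
      String dim = dimensions.get(key);
      collector.send(new OutgoingMessageEnvelope(OUTPUT, value + "|" + (dim == null ? "" : dim)));
    }
  }
}
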
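
And a sketch of what the snapshot option Roger asks about could look like, using RocksDB's Checkpoint API plus the Hadoop FileSystem client to ship the files off-box. This is not an existing Samza feature; the paths and HDFS URI are invented. In practice you would also record the changelog offset the store had reached when the checkpoint was taken, so the snapshot can later be paired with replay from that offset.

// Sketch: take a consistent RocksDB checkpoint and copy it to durable storage.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.rocksdb.Checkpoint;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class RocksDbSnapshotter {
  public static void main(String[] args) throws Exception {
    RocksDB.loadLibrary();
    try (Options opts = new Options().setCreateIfMissing(true);
         RocksDB db = RocksDB.open(opts, "/tmp/local-store")) {

      // A checkpoint is a consistent, hard-linked copy of the live DB files.
      String checkpointDir = "/tmp/local-store-checkpoint-" + System.currentTimeMillis();
      try (Checkpoint checkpoint = Checkpoint.create(db)) {
        checkpoint.createCheckpoint(checkpointDir);   // directory must not already exist
      }

      // Ship the checkpoint somewhere durable, e.g. HDFS (an object store would work too).
      Configuration conf = new Configuration();
      try (FileSystem hdfs = FileSystem.get(java.net.URI.create("hdfs://namenode:8020"), conf)) {
        hdfs.copyFromLocalFile(new Path(checkpointDir),
                               new Path("/snapshots/local-store/" + System.currentTimeMillis()));
      }
    }
  }
}
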
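
Finally, Julian's checkpoint-and-replay scheme in miniature: keep periodic copies of the table indexed by the offset they cover, and rebuild the state at any point T from the nearest earlier checkpoint plus a replay of the change stream up to T. Everything here is in-memory and illustrative; in a real system the stream would be a Kafka topic and the checkpoints would live in HDFS or an object store.

// Sketch: rebuild state at offset T = nearest checkpoint <= T, then replay (checkpoint, T].
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointAndReplay {
  // A change record: at this offset, this key took this value (null = delete).
  record Change(long offset, String key, String value) {}

  // Checkpoints indexed by the offset up to which they include changes.
  private final TreeMap<Long, Map<String, String>> checkpoints = new TreeMap<>();

  void checkpoint(long offset, Map<String, String> state) {
    checkpoints.put(offset, new HashMap<>(state));   // copy, so later updates don't leak in
  }

  Map<String, String> stateAt(long t, List<Change> stream) {
    Map.Entry<Long, Map<String, String>> base = checkpoints.floorEntry(t);
    long from = (base == null) ? -1L : base.getKey();
    Map<String, String> state = (base == null) ? new HashMap<>() : new HashMap<>(base.getValue());
    for (Change c : stream) {
      if (c.offset() > from && c.offset() <= t) {
        if (c.value() == null) state.remove(c.key());
        else state.put(c.key(), c.value());
      }
    }
    return state;
  }

  public static void main(String[] args) {
    List<Change> stream = List.of(
        new Change(1, "k1", "a"), new Change(2, "k2", "b"),
        new Change(3, "k1", "a2"), new Change(4, "k2", null));
    CheckpointAndReplay cr = new CheckpointAndReplay();
    cr.checkpoint(2, Map.of("k1", "a", "k2", "b"));  // taken after offset 2
    System.out.println(cr.stateAt(3, stream));       // prints {k1=a2, k2=b} (map order may vary)
  }
}
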
