Basically log compaction == snapshot in a logical format. You can optimize
a tiny bit more, of course, if you store the data files themselves for
whatever store you use, but that is going to be very storage-engine specific.
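
To make the "compaction == logical snapshot" point concrete, here is a
minimal sketch. It is not Kafka's actual log cleaner; the record shape
(key, value) with None as a delete marker and the compact() helper are
assumptions made up for illustration.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical changelog record: (key, value); a value of None marks a delete.
Record = Tuple[str, Optional[str]]


def compact(log: Iterable[Record]) -> Dict[str, str]:
    """Keep only the latest value per key; drop keys whose last record is a delete."""
    state: Dict[str, str] = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state


changelog = [("a", "1"), ("b", "2"), ("a", "3"), ("b", None), ("c", "4")]
snapshot = compact(changelog)
print(snapshot)
```

The result holds exactly the current value of every live key, which is
what a snapshot of the store would contain; the older value of "a" is
gone, which is why past states cannot be rebuilt from it.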

-Jay

On Mon, Feb 23, 2015 at 10:33 AM, Roger Hoover <roger.hoo...@gmail.com>
wrote:

> Thanks, Julian.
>
> I didn't see any mention of checkpoints in the Kappa or Liquid material
> I've read, but it does seem like a very useful optimization to make
> re-processing and failure recovery much faster.  Databus supports
> snapshots, I believe, so that DB replicas can be initialized in a
> practical amount of time.
>
> Interested to know if Jay, Chris, or others have thought about how
> snapshots might fit with Kafka and/or Samza.  Is it something Kafka should
> provide at some point, or would it be layered on top?
>
> Cheers,
>
> Roger
>
> On Sun, Feb 22, 2015 at 1:13 AM, Julian Hyde <jul...@hydromatic.net>
> wrote:
>
> > Can I quibble with semantics?
> >
> > This problem seems to be more naturally a stream-to-stream join, not a
> > stream-to-table join. It seems unreasonable to expect the system to be
> > able to give you the state of a table at a given moment in the past, but
> > it is reasonable to ask for the stream up to that point.
> >
> > A stream and the archive of a table (its contents at every moment in the
> > past) are equivalent in theory (they have exactly the same information
> > content) but different in practice: (1) there are different costs to
> > access them (it is costly to re-create a table by re-playing a stream of
> > its inserts), and (2) streams are managed internal to the system whereas
> > tables are external. For Roger's problem, (2) is a crucial difference.
> >
> > Then the question is how to throw information away but make it possible,
> > and efficient, to answer the queries we will need to ask in future.
> >
> > A good way to do this is with checkpoints, replay, and retention. You
> > periodically checkpoint the state of a table (or indeed any stateful
> > stream operator). To re-create the state of an operator at a particular
> > time T, you start with the previous checkpoint and replay until T. How
> > often to checkpoint depends on the size of the operator's state relative
> > to the stream (tables have a lot of state, aggregates have less, and
> > filter and project have no state) and the length of its memory (there is
> > little point making a daily checkpoint for a 1 hour windowed aggregate
> > because you can restore state by starting with *any* checkpoint and
> > replaying an hour of data).
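
A minimal sketch of the checkpoint-and-replay idea quoted above, under
stated assumptions: events arrive in timestamp order, the "table" is a
plain key/value map, and the CheckpointedTable class and its method names
are hypothetical, not part of Kafka or Samza.

```python
import bisect
from typing import Dict, List, Tuple

Event = Tuple[int, str, str]  # (timestamp, key, value), timestamps non-decreasing


class CheckpointedTable:
    def __init__(self, checkpoint_every: int) -> None:
        self.checkpoint_every = checkpoint_every
        self.log: List[Event] = []          # full event log, kept for replay
        self.state: Dict[str, str] = {}     # current table contents
        # Each checkpoint is (log offset, copy of the state after log[:offset]).
        self.checkpoints: List[Tuple[int, Dict[str, str]]] = [(0, {})]

    def apply(self, ts: int, key: str, value: str) -> None:
        self.log.append((ts, key, value))
        self.state[key] = value
        if len(self.log) % self.checkpoint_every == 0:
            self.checkpoints.append((len(self.log), dict(self.state)))

    def state_at(self, t: int) -> Dict[str, str]:
        """Rebuild the table as of time t: previous checkpoint plus replay."""
        # Offset just past the last event with timestamp <= t.
        end = bisect.bisect_right([e[0] for e in self.log], t)
        # Latest checkpoint taken at or before that offset.
        offsets = [off for off, _ in self.checkpoints]
        offset, snap = self.checkpoints[bisect.bisect_right(offsets, end) - 1]
        state = dict(snap)
        for _, key, value in self.log[offset:end]:
            state[key] = value
        return state


table = CheckpointedTable(checkpoint_every=2)
for event in [(1, "a", "1"), (2, "b", "1"), (3, "a", "2"), (4, "c", "1"), (5, "b", "2")]:
    table.apply(*event)
print(table.state_at(3))
```

Here state_at(3) starts from the checkpoint taken after two events and
replays only one more, rather than replaying the log from the beginning.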
> >
> > Retention is a contract between the consumer and the up-stream operators.
> > If the consumer says to its source operator "I need you to be able to
> > replay any time-range from Feb 12th onwards", that operator either needs
> > to store its output back to Feb 12th, or it needs to retain the ability
> > to re-create that output. If the latter, then it tells *its* input(s)
> > what time-range they need to be able to re-play, say from Feb 11th. For
> > rapid play-back, it may choose to keep periodic checkpoints.
> >
> > If the final consumer loosens its retention requirements, to say 19th Feb
> > onwards, then each operator propagates the looser requirements to its
> > input operator(s), and this allows garbage to be collected.
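
The retention contract quoted above can be sketched roughly as follows.
This assumes a single consumer, that every operator re-creates its output
from its inputs rather than storing it, and that each operator needs a
fixed amount of extra input history (the "lookback" field). The Operator
class and its fields are hypothetical.

```python
from dataclasses import dataclass, field
from datetime import date, timedelta
from typing import List, Optional


@dataclass
class Operator:
    name: str
    lookback: timedelta  # extra input history needed to re-create output
    inputs: List["Operator"] = field(default_factory=list)
    retain_from: Optional[date] = None  # earliest point this operator must replay

    def require(self, since: date) -> None:
        """Record the consumer's requirement and pass it up-stream."""
        self.retain_from = since
        for upstream in self.inputs:
            upstream.require(since - self.lookback)


source = Operator("source", lookback=timedelta(0))
aggregate = Operator("aggregate", lookback=timedelta(days=1), inputs=[source])

aggregate.require(date(2015, 2, 12))
first = source.retain_from       # the source must replay from Feb 11th

aggregate.require(date(2015, 2, 19))
loosened = source.retain_from    # the looser requirement reaches the source
print(first, loosened)
```

Anything an operator holds from before its retain_from date is then
eligible for garbage collection.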
> >
> > I don't know whether checkpoints and retention are spelled out in
> > Kappa/Liquid, but if not, they seem a natural and useful extension to the
> > theory.
> >
> > Julian
> >
> >
> > > On Feb 21, 2015, at 4:51 PM, Roger Hoover <roger.hoo...@gmail.com>
> > > wrote:
> > >
> > > Thanks, Jay.  This is one of the really nice advantages of local state
> > > in my mind.  Full retention would work but eventually run out of space,
> > > right?  Ideally, Kafka would guarantee to keep dirty keys for a
> > > configurable amount of time as Chris suggested.
> > >
> > > Sent from my iPhone
> > >
> > >> On Feb 21, 2015, at 10:10 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >>
> > >> Gotcha. Yes if you want to be able to join to past versions you
> > >> definitely can't turn on compaction as the whole goal of that feature
> > >> is to delete past versions. But wouldn't it work to use full retention
> > >> if you want that (and use the MessageChooser interface during
> > >> reprocessing if you want tight control over the state recreation)? I
> > >> mean you have the same dilemma if you don't use local state but instead
> > >> use a remote store--the remote store likely only keeps the last version
> > >> of each value so you can't join to the past.
> > >>
> > >> -Jay
> > >>
> > >> On Fri, Feb 20, 2015 at 9:04 PM, Roger Hoover <roger.hoo...@gmail.com>
> > >> wrote:
> > >>
> > >>> Jay,
> > >>>
> > >>> Sorry, I didn't explain it very well.  I'm talking about a
> > >>> stream-table join where the table comes from a compacted topic that is
> > >>> used to populate a local data store.  As the stream events are
> > >>> processed, they are joined with dimension data from the local store.
> > >>>
> > >>> If you want to kick off another version of this job that starts back
> > >>> in time, the new job cannot reliably recreate the same state of the
> > >>> local store that the original had because old values may have been
> > >>> compacted away.
> > >>>
> > >>> Does that make sense?
> > >>>
> > >>> Roger
> > >>>
> > >>>> On Fri, Feb 20, 2015 at 2:52 PM, Jay Kreps <jay.kr...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>> Hey Roger,
> > >>>>
> > >>>> I'm not sure if I understand the case you are describing.
> > >>>>
> > >>>> As Chris says we don't yet give you fine-grained control over when
> > >>>> history starts to disappear (though we designed with the intention of
> > >>>> making that configurable later). However I'm not sure if you need
> > >>>> that for the case you describe.
> > >>>>
> > >>>> Say you have a job J that takes inputs I1...IN and produces output
> > >>>> O1...ON and in the process accumulates state in a topic S. I think
> > >>>> the approach is to launch a J' (changed or improved in some way) that
> > >>>> reprocesses I1...IN from the beginning of time (or some past point)
> > >>>> into O1'...ON' and accumulates state in S'. So the state for J and
> > >>>> the state for J' are totally independent. J' can't reuse J's state in
> > >>>> general because the code that generates that state may have changed.
> > >>>>
> > >>>> -Jay
> > >>>>
> > >>>> On Thu, Feb 19, 2015 at 9:30 AM, Roger Hoover <roger.hoo...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Chris + Samza Devs,
> > >>>>>
> > >>>>> I was wondering whether Samza could support re-processing as
> > >>>>> described by the Kappa architecture or Liquid (
> > >>>>> http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf).
> > >>>>>
> > >>>>> It seems that a changelog is not sufficient to be able to restore
> > >>>>> state backward in time.  Kafka compaction will guarantee that local
> > >>>>> state can be restored from where it left off but I don't see how it
> > >>>>> can restore past state.
> > >>>>>
> > >>>>> Imagine the case where a stream job has a lot of state in its local
> > >>>>> store but it has not updated any keys in a long time.
> > >>>>>
> > >>>>> Time t1:  All of the data would be in the tail of the Kafka log
> > >>>>> (past the cleaner point).
> > >>>>> Time t2:  The job updates some keys.  Now we're in a state where the
> > >>>>> next compaction will blow away the old values for those keys.
> > >>>>> Time t3:  Compaction occurs and old values are discarded.
> > >>>>>
> > >>>>> Say we want to launch a re-processing job that would begin from t1.
> > >>>>> If we launch that job before t3, it will correctly restore its
> > >>>>> state.  However, if we launch the job after t3, it will be missing
> > >>>>> old values, right?
> > >>>>>
> > >>>>> Unless I'm misunderstanding something, the only way around this is
> > >>>>> to keep snapshots in addition to the changelog.  Has there been any
> > >>>>> discussion of providing an option in Samza of taking RocksDB
> > >>>>> snapshots and persisting them to an object store or HDFS?
> > >>>>>
> > >>>>> Thanks,
> > >>>>>
> > >>>>> Roger
> > >>>
> >
> >
>
