Hey Roger,

I believe your description is correct. Kafka has a "dirty ratio" concept
for its log-compacted topics. Once the dirty (unclean) portion of the log
passes the "dirty ratio" (e.g. 50% of the log hasn't been cleaned, based on
bytes on disk), the log cleaner kicks in. Once this happens, you can't
restore to prior intermediate states.
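
To make the trigger concrete, here is a rough sketch of the ratio check in
Python. The function names and byte counts are made up for illustration. In
Kafka itself the threshold is the topic-level min.cleanable.dirty.ratio
setting (0.5 by default), and the real cleaner looks at more than this one
condition.

```python
def dirty_ratio(clean_bytes: int, dirty_bytes: int) -> float:
    """Fraction of the log, by bytes on disk, that has not been cleaned yet."""
    total = clean_bytes + dirty_bytes
    return dirty_bytes / total if total else 0.0


def should_clean(clean_bytes: int, dirty_bytes: int,
                 min_dirty_ratio: float = 0.5) -> bool:
    """True once the dirty portion of the log passes the threshold."""
    return dirty_ratio(clean_bytes, dirty_bytes) > min_dirty_ratio
```

With 400 clean bytes and 600 dirty bytes the ratio is 0.6, so the log would
be eligible for cleaning. With the numbers swapped it would not.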

There has been some verbal discussion about changing the concept of a
"dirty ratio" to be time-based. Something like, "always keep the last N
hours dirty". This would allow you to restore to any state within the last
N hours (but not prior, as you point out). The nice thing about this
approach is that it gives a very clear guarantee about how far back you can
restore.
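
A sketch of what such a time-based policy could look like, assuming every
record carries a timestamp. The Record type, its fields, and the function
name are hypothetical; this illustrates the idea, not how Kafka's cleaner
is implemented.

```python
from typing import Dict, List, NamedTuple


class Record(NamedTuple):
    offset: int
    timestamp: float  # seconds; hypothetical per-record timestamp
    key: str
    value: str


def compact_keeping_recent(log: List[Record], now: float,
                           keep_dirty_seconds: float) -> List[Record]:
    """Compact everything older than the window; keep the window untouched.

    Assumes `log` is ordered by offset.
    """
    cutoff = now - keep_dirty_seconds
    # For each key, remember the last record written before the cutoff.
    # Those records together form the state as of the start of the window.
    latest_old: Dict[str, int] = {}
    for rec in log:
        if rec.timestamp < cutoff:
            latest_old[rec.key] = rec.offset
    return [
        rec for rec in log
        if rec.timestamp >= cutoff or latest_old[rec.key] == rec.offset
    ]
```

The output keeps one record per key from before the window, plus every
record inside it, so replaying up to any offset in the window reproduces
the state at that offset.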

If you wanted to restore to a point in time prior to the last cleaning,
then you'd have to keep some sort of snapshot, as you suggest. There might
be other tricks you could play, but nothing is coming to mind.

Where this gets a little funky for Samza is in a case where a state store's
changelog is ahead of the last checkpointed input offsets. In an ideal
world, you'd like to restore up to the offset in the changelog that matched
the point where the input offsets were committed. For example, if you have
a message order like so:

  cl1, cl2, cl3, cp1, cl4, cl5, <fail>

When you start up, you'd like to read the changelog up to cl3, and start
the consumers at offset cp1. If your processing is idempotent and
deterministic, then cl4 and cl5 will get over-written, and you'll have
consistent state moving forward. In practice, we don't do this. If a log
cleaner were to kick in, though, and delete cl3 (say it has the same key as
cl4) before you restart, then you *can't* restore to this point.
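
Here is a toy version of that example in Python, as a sketch only. The
keys and values are invented, and compact() is a simplified stand-in for
the log cleaner that assumes the whole log gets cleaned.

```python
from typing import Dict, List, Optional, Tuple

# (offset, key, value) entries standing in for cl1..cl5.
Changelog = List[Tuple[int, str, str]]


def restore(changelog: Changelog,
            up_to_offset: Optional[int] = None) -> Dict[str, str]:
    """Rebuild a store by replaying the changelog, optionally stopping early."""
    store: Dict[str, str] = {}
    for offset, key, value in changelog:
        if up_to_offset is not None and offset > up_to_offset:
            break
        store[key] = value
    return store


def compact(changelog: Changelog) -> Changelog:
    """Keep only the latest entry for each key."""
    last_offset = {key: offset for offset, key, _ in changelog}
    return [entry for entry in changelog if last_offset[entry[1]] == entry[0]]


# cl3 and cl4 share the key "c", so cleaning removes cl3.
changelog = [(1, "a", "a1"), (2, "b", "b1"), (3, "c", "c1"),
             (4, "c", "c2"), (5, "d", "d1")]

state_at_cp1 = restore(changelog, up_to_offset=3)
state_after_cleaning = restore(compact(changelog), up_to_offset=3)
```

Before cleaning, replaying up to cl3 yields the value written by cl3 for
key "c". After cleaning, the same replay has no entry for "c" at all, so
the state as of cp1 is no longer recoverable.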

In practice we haven't really worried about this because, as it is, we
simply restore up to cl5 on restart. There is a lot of complexity with not
restoring up to cl5:

1. What if someone is consuming from the changelog topic? You can't simply
truncate the changelog on restart.
2. What if the log cleaner has deleted cl3, or prior CL messages (as you
point out)?
3. What if your processing is non-deterministic or non-idempotent?

The ideal scenario is simply to have Kafka support transactions. This
would allow us to atomically commit cp1 and all output (both changelog and
regular). This would at least guarantee state-consistent restore for the
*current* checkpoint. Going backwards in time would have to be addressed by
the time-based dirty policy that I described above.

In the absence of this, we *could* make the checkpoints contain changelog
offsets, and only restore up to that point on restart (cl3 in the example).
We haven't done this, but I think it's do-able. This will at least make
things restore properly for the *current* checkpoint, when the processing
is *deterministic* and *idempotent*. You can open a JIRA for this if you
need it.
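
Roughly, the idea would look like the sketch below. Checkpoint, process(),
and recover() are hypothetical names, not Samza APIs, and the task is a
deliberately trivial one that is both deterministic and idempotent.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Checkpoint:
    input_offset: int      # next input offset to consume after restart
    changelog_offset: int  # last changelog offset written before the checkpoint


def process(store: Dict[str, str], message: Tuple[str, str]) -> None:
    """Toy task: remember the latest value seen for each key."""
    key, value = message
    store[key] = value


def recover(changelog: List[Tuple[int, str, str]],
            inputs: List[Tuple[str, str]],
            checkpoint: Checkpoint) -> Dict[str, str]:
    """Restore state as of the checkpoint, then reprocess the input after it."""
    store: Dict[str, str] = {}
    for offset, key, value in changelog:
        if offset > checkpoint.changelog_offset:
            break  # skip cl4 and cl5; reprocessing rewrites them
        store[key] = value
    for message in inputs[checkpoint.input_offset:]:
        process(store, message)
    return store


inputs = [("a", "1"), ("b", "1"), ("c", "1"), ("c", "2"), ("d", "1")]
# Each processed input produced one changelog entry, cl1..cl5.
changelog = [(i + 1, k, v) for i, (k, v) in enumerate(inputs)]
cp1 = Checkpoint(input_offset=3, changelog_offset=3)

recovered = recover(changelog, inputs, cp1)
```

Because the task is deterministic and idempotent, the recovered store ends
up identical to the one built by processing every input once. This still
depends on the log cleaner not having removed the entries before the
checkpointed changelog offset.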


On Thu, Feb 19, 2015 at 6:55 PM, Roger Hoover <roger.hoo...@gmail.com>

> Chris + Samza Devs,
> I was wondering whether Samza could support re-processing as described by
> the Kappa architecture or Liquid (
> http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf).
> It seems that a changelog is not sufficient to be able to restore state
> backward in time.  Kafka compaction will guarantee that local state can be
> restored from where it left off but I don't see how it can restore past
> state.
> Imagine the case where a stream job has a lot of state in its local store
> but it has not updated any keys in a long time.
> Time t1: All of the data would be in the tail of the Kafka log (past the
> cleaner point).
> Time t2:  The job updates some keys.   Now we're in a state where the next
> compaction will blow away the old values for those keys.
> Time t3:  Compaction occurs and old values are discarded.
> Say we want to launch a re-processing job that would begin from t1.  If we
> launch that job before t3, it will correctly restore its state.  However,
> if we launch the job after t3, it will be missing old values, right?
> Unless I'm misunderstanding something, the only way around this is to keep
> snapshots in addition to the changelog.  Has there been any discussion of
> providing an option in Samza of taking RocksDB snapshots and persisting
> them to an object store or HDFS?
> Thanks,
> Roger
