So, I want to use Samza for a project I'm working on, but I keep
running into a problem with bootstrapping.

Let's say there's a Kafka topic called Numbers that I want to consume with
Samza.  Let's say each message has a single integer in it, and I want to
classify it as even or odd.  So I have two topics that I'm using for
output, one called Even and one called Odd.  I write a simple stream task
called Classifier that consumes the Numbers topic, examines each incoming
integer and writes it back out to Even or Odd.
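For concreteness, here is a minimal plain-Java sketch of the Classifier's routing decision. It assumes each message value decodes to an `int`. The class name `EvenOddClassifier` and the method `classify` are hypothetical. In an actual Samza job this check would run inside the task's `process` method, which would then send the integer to the returned topic. The Samza wiring is left out so the sketch runs on its own.

```java
import java.util.List;

// Plain-Java sketch of the Classifier's routing decision (hypothetical names).
// In a real Samza job this logic would live inside the stream task's process method.
public class EvenOddClassifier {
    // Returns the output topic ("Even" or "Odd") for one integer.
    static String classify(int n) {
        // n % 2 is -1 for negative odd numbers in Java, so test against 0, not 1.
        return n % 2 == 0 ? "Even" : "Odd";
    }

    public static void main(String[] args) {
        for (int n : List.of(1, 2, 3, 10, -7)) {
            System.out.println(n + " -> " + classify(n));
        }
    }
}
```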

Now, let's say I want to be able to add classifications dynamically, like:
"divisible by three", "divisible by four", or "numbers that appear in my
date of birth".  And let's say I have an API I can query that gives me all
the assignment rules, such as "when a number is divisible by 3, write it out
to a topic called 'divisible_by_three'", or "when a number appears in the
string 12/12/1981, write it to the 'my_birthday' topic".  So now I rewrite
my stream task to query this API for assignment rules.  It reads integers
from the Numbers topic and writes them back out to one or more output
topics, according to the assignment rules.
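The rule-driven version can be sketched the same way. None of the following comes from a real API, so treat these as assumptions. The rules API is taken to return something shaped like the hypothetical `Rule` record below, an output topic paired with a predicate. "Appears in the string 12/12/1981" is read as "the integer's decimal digits occur as a substring". Note that one integer can match several rules, or none.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Sketch of rule-driven routing: one integer in, zero or more output topics out.
public class RuleRouter {
    // Hypothetical shape of one assignment rule returned by the rules API.
    record Rule(String outputTopic, IntPredicate matches) {}

    // Returns every output topic whose rule matches n, in rule order.
    static List<String> route(int n, List<Rule> rules) {
        List<String> topics = new ArrayList<>();
        for (Rule rule : rules) {
            if (rule.matches().test(n)) {
                topics.add(rule.outputTopic());
            }
        }
        return topics;
    }

    // The two example rules from the text.
    static List<Rule> exampleRules() {
        return List.of(
                new Rule("divisible_by_three", n -> n % 3 == 0),
                // Assumed reading: n's decimal digits appear in "12/12/1981".
                new Rule("my_birthday", n -> "12/12/1981".contains(Integer.toString(n))));
    }

    public static void main(String[] args) {
        List<Rule> rules = exampleRules();
        for (int n : List.of(3, 12, 19, 7)) {
            System.out.println(n + " -> " + route(n, rules));
        }
    }
}
```

Here 12 goes to both topics, 19 only to `my_birthday`, and 7 to neither.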

Now, let's make this even more complicated.  When I add a new
classification, I want to go back to the very beginning of the Numbers
topic and classify every integer in it under the new rule.  Once we've
consumed all the old "historical" integers, I want to apply this
classification to new integers as they come in.

And this is where I get stuck.

One thing I can do: when I want to add a new classification, I can create
a bootstrap job by setting the
"systems.kafka.streams.numbers.samza.offset.default" property to "oldest".
And that's great, but the problem is, once I've "caught up", I'd like to
kill the bootstrap job and just let the Classifier handle this new
assignment.  So, I'd want to do some kind of handover from the bootstrap
job to the Classifier job.  But how do I do that?
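One way to make "caught up" precise can be sketched under a few assumptions. The Numbers topic is modeled as a single in-memory partition where the list index is the offset. The bootstrap job records the topic's end offset at the moment it starts. It counts as caught up once it has consumed every offset below that snapshot, and the snapshot is then the natural handover point to the Classifier. `BootstrapModel`, `handoverOffset`, and the other names are hypothetical. This is a toy model, not Samza or Kafka code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Toy model of a single-partition bootstrap job (hypothetical, not Samza code).
// The topic is a List whose index is the offset.
public class BootstrapModel {
    final List<Integer> log;        // stand-in for the Numbers partition
    final IntPredicate rule;        // the new classification being bootstrapped
    final long handoverOffset;      // end offset captured when the job started
    long nextOffset = 0;            // "oldest": start from the beginning
    final List<Integer> emitted = new ArrayList<>();

    BootstrapModel(List<Integer> log, IntPredicate rule) {
        this.log = log;
        this.rule = rule;
        this.handoverOffset = log.size(); // first offset the bootstrap does NOT own
    }

    // True once every offset below the start-time snapshot has been consumed.
    boolean caughtUp() {
        return nextOffset >= handoverOffset;
    }

    // Processes one message; returns false once the job has caught up.
    boolean step() {
        if (caughtUp()) {
            return false;
        }
        int n = log.get((int) nextOffset++);
        if (rule.test(n)) {
            emitted.add(n);
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> log = new ArrayList<>(List.of(3, 4, 9, 10, 12));
        BootstrapModel job = new BootstrapModel(log, n -> n % 3 == 0);
        log.add(15); // arrives after the bootstrap started, so it belongs to the live job
        while (job.step()) { }
        System.out.println("emitted " + job.emitted + ", hand over at offset " + job.handoverOffset);
    }
}
```

Under this model, the Classifier would apply the new rule only to offsets at or above `handoverOffset`, so no integer is classified twice or skipped under that rule. Doing this for real assumes the live job can see each message's offset and the handover point for every partition.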

So, the question I must ask is this: Is Samza even an appropriate way to
solve this problem?  Has this problem ever come up for anybody else?  How
have they solved it?  I would really like to use Samza because it seems
like an appropriate technology, and I'd really really really really like to
avoid re-inventing the wheel.

A couple of solutions I came up with:

1) The simple solution.  Have a separate Samza job for each
classification.  If I want to add a new classification, I create a new job
and set it up as a bootstrap job.  This would solve the problem.  However,
we may want to have many, many classifications.  It could be as many as
1,000,000, which would mean up to 1,000,000 simultaneously running jobs.
This could create a lot of overhead for YARN and Kafka.

2) My overly-complicated workaround solution.  Each assignment rule has an
"isnew" flag.  If it's a new classification that hasn't fully bootstrapped
yet, the "isnew" flag is set to TRUE.  When my classifier queries the API
for assignment rules, it ignores any rule whose "isnew" flag is TRUE.  When I
want to add a new classification, I create a new bootstrap job for that
classification.  Every so often, maybe every few days or so, if all of my
bootstrap jobs have "caught up", I kill all of the bootstrap jobs and
classifier jobs.  I set all the "isnew" flags to FALSE.  Then I restart the
classifier job.  This is kind of an ugly solution, and I'm not even sure it
would work.  For one thing, I'd need some way of knowing whether a bootstrap
job has "caught up".  For another, I'd essentially be restarting the
classifier job periodically, which I don't like at all.

3) Some other kind of really complicated solution I haven't thought of yet,
probably involving locks, transactions, concurrency, and interprocess
communication.
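The filtering half of workaround 2 is small enough to sketch. The assumptions are these. The hypothetical `Rule` record carries the "isnew" flag as `isNew`. `promoteAll` stands in for the periodic step of setting every flag to FALSE once all bootstrap jobs have caught up. The sketch does not model killing and restarting the jobs, so any overlap or gap between where a bootstrap job stopped and where the restarted classifier resumes falls outside it.

```java
import java.util.List;
import java.util.function.IntPredicate;

// Sketch of workaround 2: the live classifier ignores rules still flagged "isnew".
public class IsNewFilter {
    // Hypothetical rule shape; isNew mirrors the "isnew" flag from the rules API.
    record Rule(String outputTopic, IntPredicate matches, boolean isNew) {
        // Same rule with its "isnew" flag cleared.
        Rule promoted() {
            return new Rule(outputTopic, matches, false);
        }
    }

    // Topics the live classifier writes n to: fully bootstrapped rules only.
    static List<String> liveTopics(int n, List<Rule> rules) {
        return rules.stream()
                .filter(r -> !r.isNew())
                .filter(r -> r.matches().test(n))
                .map(Rule::outputTopic)
                .toList();
    }

    // Periodic step: once every bootstrap job has caught up, clear all flags.
    // Otherwise the rule set is returned unchanged.
    static List<Rule> promoteAll(List<Rule> rules, boolean allBootstrapsCaughtUp) {
        if (!allBootstrapsCaughtUp) {
            return rules;
        }
        return rules.stream().map(Rule::promoted).toList();
    }

    static List<Rule> exampleRules() {
        return List.of(
                new Rule("Even", n -> n % 2 == 0, false),
                new Rule("divisible_by_three", n -> n % 3 == 0, true)); // still bootstrapping
    }

    public static void main(String[] args) {
        List<Rule> rules = exampleRules();
        System.out.println(liveTopics(6, rules));                   // new rule ignored
        System.out.println(liveTopics(6, promoteAll(rules, true))); // new rule now live
    }
}
```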

Thanks for reading this whole thing.  Please let me know if you have any
suggestions.
