Hi Jeremy,

If my understanding is correct, whenever you add a new rule, you want to
apply that rule to the historical data as well. Right?

If you do not care about duplication, you can create a new task that
contains both the existing rules and the new rules, and configure it to
bootstrap. This applies all the rules from the beginning of the input
stream. The shortcoming is that you will get duplicated results for the
old rules.

If you cannot tolerate that shortcoming:

1) Get the offset of the latest-processed message for the old rules.
2) In your new task, ignore messages up to and including that offset for
   the old rules.
3) Bootstrap.

Hope this helps. Maybe your use case is more complicated?

Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <athomewithagroove...@gmail.com> wrote:

> So, I'm wanting to use Samza for a project I'm working on, but I keep
> running into a problem with bootstrapping.
>
> Let's say there's a Kafka topic called Numbers that I want to consume with
> Samza. Let's say each message has a single integer in it, and I want to
> classify it as even or odd. So I have two topics that I'm using for
> output, one called Even and one called Odd. I write a simple stream task
> called Classifier that consumes the Numbers topic, examines each incoming
> integer and writes it back out to Even or Odd.
>
> Now, let's say I want to be able to add classifications dynamically, like:
> "divisible by three", "divisible by four", or "numbers that appear in my
> date of birth". And let's say I have an API I can query that gives me all
> the assignment rules, such as "when a number is divisible by 3, write it
> out to a topic called 'divisible_by_three'", or "when a number appears in
> the string 12/12/1981, write it to the 'my_birthday' topic". So now I
> rewrite my stream task to query this API for assignment rules. It reads
> integers from the Numbers topic and writes them back out to one or more
> output topics, according to the assignment rules.
>
> Now, let's make this even more complicated.
> When I add a new classification, I want to go back to the very beginning
> of the Numbers topic and classify them accordingly. Once we've consumed
> all the old "historical" integers, I want to apply this classification to
> new integers as they come in.
>
> And this is where I get stuck.
>
> One thing I can do: when I want to add a new classification, I can create
> a bootstrap job by setting the
> "systems.kafka.streams.numbers.samza.offset.default" property to "oldest".
> And that's great, but the problem is, once I've "caught up", I'd like to
> kill the bootstrap job and just let the Classifier handle this new
> assignment. So, I'd want to do some kind of handover from the bootstrap
> job to the Classifier job. But how to do this?
>
> So, the question I must ask is this: Is Samza even an appropriate way to
> solve this problem? Has this problem ever come up for anybody else? How
> have they solved it? I would really like to use Samza because it seems
> like an appropriate technology, and I'd really really really really like
> to avoid re-inventing the wheel.
>
> A couple solutions I came up with:
>
> 1) The simple solution. Have a separate Samza job for each
> classification. If I want to add a new classification, I create a new job
> and set it up as a bootstrap job. This would solve the problem. However,
> we may want to have many, many classifications. It could be as many as
> 1,000,000, which would mean up to 1,000,000 simultaneously running jobs.
> This could create a lot of overhead for YARN and Kafka.
>
> 2) My overly-complicated workaround solution. Each assignment rule has an
> "isnew" flag. If it's a new classification that hasn't fully bootstrapped
> yet, the "isnew" flag is set to TRUE. When my classifier queries the API
> for assignment rules, it ignores any rule whose "isnew" flag is TRUE.
> When I want to add a new classification, I create a new bootstrap job for
> that classification.
> Every so often, maybe every few days or so, if all of my
> bootstrap jobs have "caught up", I kill all of the bootstrap jobs and
> classifier jobs. I set all the "isnew" flags to FALSE. Then I restart the
> classifier job. This is kind of an ugly solution, and I'm not even sure it
> would work. For one thing, I'd need some way of knowing if a bootstrap job
> has "caught up". Secondly, I'd essentially be restarting the classifier
> job periodically, which just seems like an ugly solution. I don't like it.
>
> 3) Some other kind of really complicated solution I haven't thought of
> yet, probably involving locks, transactions, concurrency, and interprocess
> communication.
>
> Thanks for reading this whole thing. Please let me know if you have any
> suggestions.
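
For what it's worth, the offset-gating idea from the three steps at the top
of this reply could be sketched roughly as below. This is only a minimal
illustration in plain Java, not Samza code: the class OffsetGatedClassifier,
the Rule type, and the lastProcessedOffset field are all hypothetical names
invented for the example. It also assumes a single partition with numeric
offsets; since Kafka offsets are per partition, a real job would presumably
need to track one such offset per partition for each old rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntPredicate;

/**
 * Sketch of per-rule offset gating during a bootstrap pass.
 * Every name in this file is hypothetical; none of it is Samza API.
 */
class OffsetGatedClassifier {

    /** One assignment rule plus the last offset the old job already applied it to. */
    static final class Rule {
        final String outputTopic;
        final IntPredicate matches;
        // Offset of the latest message this rule has already processed.
        // -1 marks a brand-new rule that must see the whole stream.
        final long lastProcessedOffset;

        Rule(String outputTopic, IntPredicate matches, long lastProcessedOffset) {
            this.outputTopic = outputTopic;
            this.matches = matches;
            this.lastProcessedOffset = lastProcessedOffset;
        }
    }

    /** Returns the topics one message should be written to while replaying from the start. */
    static List<String> route(List<Rule> rules, long offset, int value) {
        List<String> topics = new ArrayList<>();
        for (Rule rule : rules) {
            // Step 2: an old rule already emitted output for this offset, so skip it.
            if (offset <= rule.lastProcessedOffset) {
                continue;
            }
            if (rule.matches.test(value)) {
                topics.add(rule.outputTopic);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        // Step 1 (assumed): the old job had processed Even/Odd up to offset 99.
        List<Rule> rules = Arrays.asList(
            new Rule("Even", n -> n % 2 == 0, 99L),
            new Rule("Odd", n -> n % 2 != 0, 99L),
            new Rule("divisible_by_three", n -> n % 3 == 0, -1L));

        // Step 3: replay from the beginning; only the new rule fires on old offsets.
        System.out.println(route(rules, 5L, 6));   // [divisible_by_three]
        System.out.println(route(rules, 100L, 6)); // [Even, divisible_by_three]
    }
}
```

With this shape, a single job can replay the Numbers topic from the oldest
offset without producing duplicate Even/Odd output, and it simply keeps
running as the regular classifier once it has caught up.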