Hello Yan,

Thank you for the suggestion! I think your solution would work; however, I am afraid it would create a performance problem for our users.
Let's say we kill the Classifier task and create a new Classifier task with both the existing rules and the new rules. We get the offset of the latest-processed message for the old rules; let's call this offset Last-Processed-Old-Rules. For the old rules, we ignore messages before Last-Processed-Old-Rules. We configure the new Classifier task as a bootstrap task.

Now suppose we have users who are watching the output topics and expect near-realtime updates. They won't see any updates for the old rules until our task has passed the Last-Processed-Old-Rules offset. If the topic holds a lot of messages, that could take a long time.

This is why I was hoping there would be a way to bootstrap the new rules while we're still processing the old rules. Do you think there is a way to do that?

On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <yanfang...@gmail.com> wrote:

> Hi Jeremy,
>
> If my understanding is correct, whenever you add a new rule, you want to apply this rule to the historical data. Right?
>
> If you do not care about duplication, you can create a new task that contains the existing rules and the new rules, and configure it as a bootstrap. This will apply all the rules from the beginning of the input stream. The shortcoming is that you will get duplicated results for the old rules.
>
> If you cannot tolerate that shortcoming: 1) get the offset of the latest-processed message for the old rules; 2) in your new task, ignore messages before that offset for the old rules; 3) bootstrap.
>
> Hope this helps. Maybe your use case is more complicated?
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <athomewithagroove...@gmail.com> wrote:
>
> > So, I want to use Samza for a project I'm working on, but I keep running into a problem with bootstrapping.
> >
> > Let's say there's a Kafka topic called Numbers that I want to consume with Samza.
> > Let's say each message has a single integer in it, and I want to classify it as even or odd. So I have two topics that I'm using for output, one called Even and one called Odd. I write a simple stream task called Classifier that consumes the Numbers topic, examines each incoming integer, and writes it back out to Even or Odd.
> >
> > Now, let's say I want to be able to add classifications dynamically, like "divisible by three", "divisible by four", or "numbers that appear in my date of birth". And let's say I have an API I can query that gives me all the assignment rules, such as "when a number is divisible by 3, write it out to a topic called 'divisible_by_three'", or "when a number appears in the string 12/12/1981, write it to the 'my_birthday' topic". So now I rewrite my stream task to query this API for assignment rules. It reads integers from the Numbers topic and writes them back out to one or more output topics, according to the assignment rules.
> >
> > Now, let's make this even more complicated. When I add a new classification, I want to go back to the very beginning of the Numbers topic and classify those integers accordingly. Once we've consumed all the old "historical" integers, I want to apply this classification to new integers as they come in.
> >
> > And this is where I get stuck.
> >
> > One thing I can do: when I want to add a new classification, I can create a bootstrap job by setting the "systems.kafka.streams.numbers.samza.offset.default" property to "oldest". That's great, but the problem is, once I've "caught up", I'd like to kill the bootstrap job and just let the Classifier handle this new assignment. So I'd want to do some kind of handover from the bootstrap job to the Classifier job. But how do I do this?
> >
> > So, the question I must ask is this: Is Samza even an appropriate way to solve this problem?
> > Has this problem ever come up for anybody else? How have they solved it? I would really like to use Samza because it seems like an appropriate technology, and I'd really, really, really, really like to avoid re-inventing the wheel.
> >
> > A couple of solutions I came up with:
> >
> > 1) The simple solution. Have a separate Samza job for each classification. If I want to add a new classification, I create a new job and set it up as a bootstrap job. This would solve the problem. However, we may want to have many, many classifications. It could be as many as 1,000,000, which would mean up to 1,000,000 simultaneously running jobs. This could create a lot of overhead for YARN and Kafka.
> >
> > 2) My overly complicated workaround solution. Each assignment rule has an "isnew" flag. If it's a new classification that hasn't fully bootstrapped yet, the "isnew" flag is set to TRUE. When my classifier queries the API for assignment rules, it ignores any rule with the "isnew" flag set. When I want to add a new classification, I create a new bootstrap job for that classification. Every so often, maybe every few days, if all of my bootstrap jobs have "caught up", I kill all of the bootstrap jobs and classifier jobs, set all the "isnew" flags to FALSE, and restart the classifier job. This is kind of an ugly solution, and I'm not even sure it would work. For one thing, I'd need some way of knowing if a bootstrap job has "caught up". Secondly, I'd essentially be restarting the classifier job periodically, which just seems ugly. I don't like it.
> >
> > 3) Some other kind of really complicated solution I haven't thought of yet, probably involving locks, transactions, concurrency, and interprocess communication.
> >
> > Thanks for reading this whole thing. Please let me know if you have any suggestions.
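To make the trade-off in this thread concrete, here is a minimal sketch of the rule-driven Classifier restarted the way Yan suggests: the new task replays Numbers from the oldest offset, applies the new rules to every message, and skips the old rules for messages the old task already handled. It is plain Java with no Samza dependency; the `OffsetGatedClassifier` class, the `Rule` record, `classify`, the offset value 1000, and the reading of the birthday rule as "the number's decimal string is a substring of 12/12/1981" are all hypothetical. Old rules are skipped at or below Last-Processed-Old-Rules, on the assumption that the message at that offset was itself already processed. A real job would presumably do this inside its StreamTask's `process` method, taking the offset from the incoming message envelope.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical sketch: per-rule offset gating for a restarted, bootstrapping Classifier.
class OffsetGatedClassifier {

    // Hypothetical assignment rule: output topic, predicate, and whether the old task already ran it.
    record Rule(String outputTopic, LongPredicate matches, boolean isOld) {}

    private final List<Rule> rules;
    private final long lastProcessedOldRules; // offset of the last message the old task handled

    OffsetGatedClassifier(List<Rule> rules, long lastProcessedOldRules) {
        this.rules = rules;
        this.lastProcessedOldRules = lastProcessedOldRules;
    }

    // Returns the output topics that the message at this offset should be written to.
    List<String> classify(long offset, long number) {
        List<String> topics = new ArrayList<>();
        for (Rule rule : rules) {
            // Old rules already covered offsets up to lastProcessedOldRules; skip to avoid duplicates.
            if (rule.isOld() && offset <= lastProcessedOldRules) {
                continue;
            }
            if (rule.matches().test(number)) {
                topics.add(rule.outputTopic());
            }
        }
        return topics;
    }

    // Assumed interpretation of the birthday rule: n's decimal string occurs in "12/12/1981".
    static boolean appearsInBirthday(long n) {
        return "12/12/1981".contains(Long.toString(n));
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(
                new Rule("Even", n -> n % 2 == 0, true),
                new Rule("Odd", n -> n % 2 != 0, true),
                new Rule("divisible_by_three", n -> n % 3 == 0, false),
                new Rule("my_birthday", OffsetGatedClassifier::appearsInBirthday, false));
        // Hypothetical: the old Classifier had processed Numbers up to offset 1000.
        OffsetGatedClassifier classifier = new OffsetGatedClassifier(rules, 1000L);
        System.out.println(classifier.classify(5, 12));    // [divisible_by_three, my_birthday]
        System.out.println(classifier.classify(1001, 12)); // [Even, divisible_by_three, my_birthday]
    }
}
```

For every offset up to 1000, `classify` never emits to Even or Odd. That is the stall described at the top of the thread: consumers watching the old rules' topics see nothing new until the replay passes Last-Processed-Old-Rules.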
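Workaround 2 in the original message hinges on knowing when a bootstrap job has "caught up". One possible definition, assumed here rather than taken from Samza, is that for every partition of the Numbers topic the bootstrap job's current offset is at or past the live Classifier's. A minimal plain-Java sketch follows; the `CatchUpCheck` class and `hasCaughtUp` method are hypothetical, and the per-partition offsets (for example, read from each job's checkpoints) are supplied by the caller.

```java
import java.util.Map;

// Hypothetical "caught up" test for the isnew-flag handover; offsets are keyed by partition id.
class CatchUpCheck {

    // True when, for every partition the Classifier reads, the bootstrap job is at or past it.
    static boolean hasCaughtUp(Map<Integer, Long> bootstrapOffsets,
                               Map<Integer, Long> classifierOffsets) {
        for (Map.Entry<Integer, Long> entry : classifierOffsets.entrySet()) {
            Long bootstrap = bootstrapOffsets.get(entry.getKey());
            // A missing partition, or one still behind, means the bootstrap job has not caught up.
            if (bootstrap == null || bootstrap < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Long> classifier = Map.of(0, 500L, 1, 480L);
        System.out.println(hasCaughtUp(Map.of(0, 500L, 1, 470L), classifier)); // false
        System.out.println(hasCaughtUp(Map.of(0, 510L, 1, 480L), classifier)); // true
    }
}
```

Because the Classifier keeps advancing, a `true` result only means the gap was closed at the moment of the check; messages arriving between the check and the restart still have to be accounted for, which is part of why the workaround feels fragile.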