What about this:

1) Add new rule to the classifier task
2) Take note of offset of the first message processed after restart
3) Run a job to process from offset 0 to the offset from #2, after which
   the job is deleted

I don't know how to do 2 or 3, but perhaps some of the core Samza folk
could shed some light?

b

On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <athomewithagroove...@gmail.com>
wrote:

> Hello Yan,
>
> Thank you for the suggestion! I think your solution would work, however, I
> am afraid it would create a performance problem for our users.
>
> Let's say we kill the Classifier task, and create a new Classifier task
> with both the existing rules and new rules. We get the offset of the
> latest-processed message for the old rules. Let's call this offset
> Last-Processed-Old-Rules. We ignore messages before
> Last-Processed-Old-Rules for the old rules. We configure the new
> Classifier task to be a bootstrap task.
>
> Let's say we have users who are watching the output topics, and they are
> expecting near-realtime updates. They won't see any updates for the old
> rules until our task has passed the Last-Processed-Old-Rules offset. If we
> have a lot of messages in that topic, that could take a long time. This is
> why I was hoping there would be a way to bootstrap the new rules while
> we're still processing the old rules. Do you think there is a way to do
> that?
>
> On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <yanfang...@gmail.com> wrote:
>
> > Hi Jeremy,
> >
> > If my understanding is correct, whenever you add a new rule, you want to
> > apply this rule to the historical data. Right?
> >
> > If you do not care about duplication, you can create a new task that
> > contains existing rules and new rules. Configure bootstrap. This will
> > apply all the rules from the beginning of the input stream. The
> > shortcoming is you will get duplicated results for old rules.
> >
> > If you can not tolerate the shortcoming, 1) get the offset of the
> > latest-processed message of old rules. 2) In your new task, ignore
> > messages before that offset for the old rules. 3) bootstrap.
> >
> > Hope this helps. Maybe your use case is more complicated?
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p
> > <athomewithagroove...@gmail.com> wrote:
> >
> > > So, I'm wanting to use Samza for a project I'm working on, but I keep
> > > running into a problem with bootstrapping.
> > >
> > > Let's say there's a Kafka topic called Numbers that I want to consume
> > > with Samza. Let's say each message has a single integer in it, and I
> > > want to classify it as even or odd. So I have two topics that I'm
> > > using for output, one called Even and one called Odd. I write a simple
> > > stream task called Classifier that consumes the Numbers topic,
> > > examines each incoming integer and writes it back out to Even or Odd.
> > >
> > > Now, let's say I want to be able to add classifications dynamically,
> > > like: "divisible by three", "divisible by four", or "numbers that
> > > appear in my date of birth". And let's say I have an API I can query
> > > that gives me all the assignment rules, such as "when a number is
> > > divisible by 3, write it out to a topic called 'divisible_by_three'",
> > > or "when a number appears in the string 12/12/1981, write it to the
> > > 'my_birthday' topic". So now I rewrite my stream task to query this
> > > API for assignment rules. It reads integers from the Numbers topic and
> > > writes them back out to one or more output topics, according to the
> > > assignment rules.
> > >
> > > Now, let's make this even more complicated. When I add a new
> > > classification, I want to go back to the very beginning of the Numbers
> > > topic and classify them accordingly. Once we've consumed all the old
> > > "historical" integers, I want to apply this classification to new
> > > integers as they come in.
> > >
> > > And this is where I get stuck.
> > >
> > > One thing I can do: when I want to add a new classification, I can
> > > create a bootstrap job by setting the
> > > "systems.kafka.streams.numbers.samza.offset.default" property to
> > > "oldest". And that's great, but the problem is, once I've "caught up",
> > > I'd like to kill the bootstrap job and just let the Classifier handle
> > > this new assignment. So, I'd want to do some kind of handover from the
> > > bootstrap job to the Classifier job. But how to do this?
> > >
> > > So, the question I must ask is this: Is Samza even an appropriate way
> > > to solve this problem? Has this problem ever come up for anybody else?
> > > How have they solved it? I would really like to use Samza because it
> > > seems like an appropriate technology, and I'd really really really
> > > really like to avoid re-inventing the wheel.
> > >
> > > A couple solutions I came up with:
> > >
> > > 1) The simple solution. Have a separate Samza job for each
> > > classification. If I want to add a new classification, I create a new
> > > job and set it up as a bootstrap job. This would solve the problem.
> > > However, we may want to have many, many classifications. It could be
> > > as many as 1,000,000, which would mean up to 1,000,000 simultaneously
> > > running jobs. This could create a lot of overhead for YARN and Kafka.
> > >
> > > 2) My overly-complicated workaround solution. Each assignment rule has
> > > an "isnew" flag. If it's a new classification that hasn't fully
> > > bootstrapped yet, the "isnew" flag is set to TRUE. When my classifier
> > > queries the API for assignment rules, it ignores any rule whose
> > > "isnew" flag is set to TRUE. When I want to add a new classification,
> > > I create a new bootstrap job for that classification. Every so often,
> > > maybe every few days or so, if all of my bootstrap jobs have "caught
> > > up", I kill all of the bootstrap jobs and classifier jobs. I set all
> > > the "isnew" flags to FALSE.
> > > Then I restart the classifier job. This is kind of an ugly solution,
> > > and I'm not even sure it would work. For one thing, I'd need some way
> > > of knowing if a bootstrap job has "caught up". Secondly, I'd
> > > essentially be restarting the classifier job periodically, which just
> > > seems like an ugly solution. I don't like it.
> > >
> > > 3) Some other kind of really complicated solution I haven't thought of
> > > yet, probably involving locks, transactions, concurrency, and
> > > interprocess communication.
> > >
> > > Thanks for reading this whole thing. Please let me know if you have
> > > any suggestions.
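
For reference, the offset-gating idea from Yan's reply (replay the input from the beginning, but have the old rules ignore messages they have already processed) might look roughly like the sketch below. It is plain Python that simulates the logic on a small list, not the Samza API. The `Rule` class, the `skip_before` field, the `classify` function, and the sample offsets and numbers are all hypothetical names and values chosen for illustration.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Rule:
    """One assignment rule (hypothetical shape, not a Samza type)."""
    topic: str                      # output topic that matching numbers go to
    matches: Callable[[int], bool]  # predicate applied to the incoming integer
    skip_before: int = 0            # ignore messages whose offset is below this


def classify(rules: List[Rule], offset: int, number: int) -> List[str]:
    """Return the output topics for one message read at `offset`."""
    return [
        rule.topic
        for rule in rules
        if offset >= rule.skip_before and rule.matches(number)
    ]


# Assumption: the old job had processed offsets 0..4 before it was stopped.
last_processed_old_rules = 4

rules = [
    # Old rules were already applied through offset 4, so they skip those.
    Rule("Even", lambda n: n % 2 == 0, skip_before=last_processed_old_rules + 1),
    Rule("Odd", lambda n: n % 2 == 1, skip_before=last_processed_old_rules + 1),
    # The new rule applies from the very beginning of the topic.
    Rule("divisible_by_three", lambda n: n % 3 == 0),
]

# Replay a simulated Numbers topic from offset 0.
numbers = [3, 4, 6, 7, 9, 10, 12]
for offset, number in enumerate(numbers):
    print(offset, number, classify(rules, offset, number))
```

In this sketch the old rules produce no duplicate output during the replay, but they also produce nothing at all until the replay passes `last_processed_old_rules`, which is the latency concern Jeremy raises above.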