You are able to call coordinator.shutdown to shut the job down once it reaches that offset.
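For example, here is a rough sketch of what that could look like. It assumes a Kafka input whose offsets parse as longs, a single input partition, and a made-up "classifier.stop.at.offset" property carrying job1's checkpointed offset:

import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class StopAtOffsetTask implements StreamTask, InitableTask {
  // Hypothetical property holding job1's last-processed ("checkpointed") offset.
  private long stopAtOffset;

  @Override
  public void init(Config config, TaskContext context) {
    stopAtOffset = config.getLong("classifier.stop.at.offset");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    // ... apply the new rule to envelope.getMessage() and send the results ...

    // Kafka offsets are numeric strings, so a plain comparison works here.
    if (Long.parseLong(envelope.getOffset()) >= stopAtOffset) {
      coordinator.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
    }
  }
}

With multiple input partitions you would need one target offset per SystemStreamPartition rather than a single number, but the idea is the same.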
Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> I feel Ben's solution is a bit simpler: you just need to restart your current job with both rules at the checkpointed offset, and start a new job from offset 0 with only the new rule, which will stop at the checkpointed offset. But of course it requires the second job to be able to shut itself down upon reaching some specific offset, which I am not sure is already supported.
>
> Guozhang
>
> On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang <yanfang...@gmail.com> wrote:
>
> > Hi Jeremy,
> >
> > In order to reach this goal, we have to assume that the job with new rules can always catch up with the one with old rules. Otherwise, I think we have no choice but to run a lot of jobs simultaneously.
> >
> > Under that assumption, we have job1 with the old rules running, and now add job2, which integrates the old rules and the new rules. Job2 frequently checks the Last-Processed-Old-Rules offset from job1 (because job1 is still running too), and it only applies the new rule to the data until it catches up with the Last-Processed-Old-Rules offset. Then it sends a signal to shut down job1, and applies all the rules to the stream.
> >
> > In terms of how to shut down job1, here is one solution <http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E> provided by Chris - e.g. you can have a control stream to get job1 shut down. Samza will provide this kind of stream after SAMZA-348 <https://issues.apache.org/jira/browse/SAMZA-348>, which is under active development.
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <athomewithagroove...@gmail.com> wrote:
> >
> > > Hello Yan,
> > >
> > > Thank you for the suggestion! I think your solution would work; however, I am afraid it would create a performance problem for our users.
> > >
> > > Let's say we kill the Classifier task, and create a new Classifier task with both the existing rules and the new rules. We get the offset of the latest-processed message for the old rules. Let's call this offset Last-Processed-Old-Rules. We ignore messages before Last-Processed-Old-Rules for the old rules. We configure the new Classifier task to be a bootstrap task.
> > >
> > > Let's say we have users who are watching the output topics, and they are expecting near-realtime updates. They won't see any updates for the old rules until our task has passed the Last-Processed-Old-Rules offset. If we have a lot of messages in that topic, that could take a long time. This is why I was hoping there would be a way to bootstrap the new rules while we're still processing the old rules. Do you think there is a way to do that?
> > >
> > > On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <yanfang...@gmail.com> wrote:
> > >
> > > > Hi Jeremy,
> > > >
> > > > If my understanding is correct, whenever you add a new rule, you want to apply this rule to the historical data. Right?
> > > >
> > > > If you do not care about duplication, you can create a new task that contains the existing rules and the new rules, and configure it as a bootstrap task. This will apply all the rules from the beginning of the input stream. The shortcoming is that you will get duplicated results for the old rules.
> > > >
> > > > If you cannot tolerate that shortcoming: 1) get the offset of the latest-processed message of the old rules; 2) in your new task, ignore messages before that offset for the old rules; 3) bootstrap.
> > > >
> > > > Hope this helps. Maybe your use case is more complicated?
> > > >
> > > > Thanks,
> > > >
> > > > Fang, Yan
> > > > yanfang...@gmail.com
> > > >
> > > > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <athomewithagroove...@gmail.com> wrote:
> > > >
> > > > > So, I'm wanting to use Samza for a project I'm working on, but I keep running into a problem with bootstrapping.
> > > > >
> > > > > Let's say there's a Kafka topic called Numbers that I want to consume with Samza. Let's say each message has a single integer in it, and I want to classify it as even or odd. So I have two topics that I'm using for output, one called Even and one called Odd. I write a simple stream task called Classifier that consumes the Numbers topic, examines each incoming integer, and writes it back out to Even or Odd.
> > > > >
> > > > > Now, let's say I want to be able to add classifications dynamically, like: "divisible by three", "divisible by four", or "numbers that appear in my date of birth". And let's say I have an API I can query that gives me all the assignment rules, such as "when a number is divisible by 3, write it out to a topic called 'divisible_by_three'", or "when a number appears in the string 12/12/1981, write it to the 'my_birthday' topic". So now I rewrite my stream task to query this API for assignment rules. It reads integers from the Numbers topic and writes them back out to one or more output topics, according to the assignment rules.
> > > > >
> > > > > Now, let's make this even more complicated. When I add a new classification, I want to go back to the very beginning of the Numbers topic and classify those numbers accordingly. Once we've consumed all the old "historical" integers, I want to apply this classification to new integers as they come in.
> > > > >
> > > > > And this is where I get stuck.
> > > > >
> > > > > One thing I can do: when I want to add a new classification, I can create a bootstrap job by setting the "systems.kafka.streams.numbers.samza.offset.default" property to "oldest". And that's great, but the problem is, once I've "caught up", I'd like to kill the bootstrap job and just let the Classifier handle this new assignment. So, I'd want to do some kind of handover from the bootstrap job to the Classifier job. But how do I do this?
> > > > >
> > > > > So, the question I must ask is this: Is Samza even an appropriate way to solve this problem? Has this problem ever come up for anybody else? How have they solved it? I would really like to use Samza because it seems like an appropriate technology, and I'd really, really like to avoid re-inventing the wheel.
> > > > >
> > > > > A couple of solutions I came up with:
> > > > >
> > > > > 1) The simple solution. Have a separate Samza job for each classification. If I want to add a new classification, I create a new job and set it up as a bootstrap job. This would solve the problem. However, we may want to have many, many classifications. It could be as many as 1,000,000, which would mean up to 1,000,000 simultaneously running jobs. This could create a lot of overhead for YARN and Kafka.
> > > > >
> > > > > 2) My overly-complicated workaround solution. Each assignment rule has an "isnew" flag. If it's a new classification that hasn't fully bootstrapped yet, the "isnew" flag is set to TRUE. When my classifier queries the API for assignment rules, it ignores any rule with an "isnew" flag. When I want to add a new classification, I create a new bootstrap job for that classification. Every so often, maybe every few days or so, if all of my bootstrap jobs have "caught up", I kill all of the bootstrap jobs and classifier jobs, set all the "isnew" flags to FALSE, and then restart the classifier job. This is kind of an ugly solution, and I'm not even sure it would work. For one thing, I'd need some way of knowing whether a bootstrap job has "caught up". Secondly, I'd essentially be restarting the classifier job periodically, which just seems ugly. I don't like it.
> > > > >
> > > > > 3) Some other kind of really complicated solution I haven't thought of yet, probably involving locks, transactions, concurrency, and interprocess communication.
> > > > >
> > > > > Thanks for reading this whole thing. Please let me know if you have any suggestions.
>
> --
> -- Guozhang