You can call coordinator.shutdown to shut the job down once it reaches that
offset.
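
For illustration, here is a minimal sketch of a task that stops itself once it
passes a target offset. The hard-coded TARGET_OFFSET is a placeholder (in
practice you would track one per partition, read from config or an external
store); TaskCoordinator.shutdown and IncomingMessageEnvelope.getOffset are the
relevant Samza APIs.

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    public class StopAtOffsetTask implements StreamTask {
      // Hypothetical target: the offset recorded from the old job's checkpoint.
      private static final long TARGET_OFFSET = 123456L;

      @Override
      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                          TaskCoordinator coordinator) {
        // ... apply the new rule to the message here ...

        // Kafka offsets arrive as strings; compare against the recorded offset.
        long offset = Long.parseLong(envelope.getOffset());
        if (offset >= TARGET_OFFSET) {
          // Ask Samza to shut the container down once the target offset is reached.
          coordinator.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
        }
      }
    }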

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> I feel Ben's solution is a bit simpler: you just need to restart your
> current job with both rules from the checkpointed offset, and start a new
> job from offset 0 with only the new rule, which will stop at the
> checkpointed offset. But of course it requires the second job to be able
> to shut itself down upon reaching a specific offset, which I am not sure
> is already supported.
>
> Guozhang
>
> On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang <yanfang...@gmail.com> wrote:
>
> > Hi Jeremy,
> >
> > In order to reach this goal, we have to assume that the job with new
> > rules can always catch up with the one with old rules. Otherwise, I think
> > we have no choice but to run a lot of jobs simultaneously.
> >
> > Under that assumption, we have job1 with the old rules running, and we
> > add job2, which combines the old rules and the new rules. Job2 frequently
> > checks the Last-Processed-Old-Rules offset from job1 (because job1 is
> > still running), and it applies only the new rules to the data until it
> > catches up with the Last-Processed-Old-Rules offset. Then it sends a
> > signal to job1 to shut it down, and applies all rules to the stream.
> >
> > In terms of how to shut down job1, here is one solution
> > <
> > http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E
> > >
> > provided by Chris - e.g. you can have a control stream that signals job1
> > to shut down. Samza will provide this kind of stream after SAMZA-348
> > <https://issues.apache.org/jira/browse/SAMZA-348>, which is under active
> > development.
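
A control-stream approach could be sketched roughly like this. The topic name
"job1-control" is hypothetical (it would be added to the job's inputs); the
idea is simply that any message arriving on it triggers coordinator.shutdown.

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    public class ControlledClassifierTask implements StreamTask {
      @Override
      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                          TaskCoordinator coordinator) {
        // "job1-control" is a hypothetical control topic consumed alongside the data.
        String stream = envelope.getSystemStreamPartition().getStream();
        if ("job1-control".equals(stream)) {
          // Any message on the control stream asks the container to stop.
          coordinator.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
          return;
        }
        // ... apply the old rules to messages from the data stream ...
      }
    }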
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <athomewithagroove...@gmail.com>
> > wrote:
> >
> > > Hello Yan,
> > >
> > > Thank you for the suggestion!  I think your solution would work;
> > > however, I am afraid it would create a performance problem for our users.
> > >
> > > Let's say we kill the Classifier task, and create a new Classifier task
> > > with both the existing rules and new rules. We get the offset of the
> > > latest-processed message for the old rules.  Let's call this offset
> > > Last-Processed-Old-Rules.  We ignore messages
> > > before Last-Processed-Old-Rules for the old rules.  We configure the
> new
> > > Classifier task to be a bootstrap task.
> > >
> > > Let's say we have users who are watching the output topics, and they
> are
> > > expecting near-realtime updates.  They won't see any updates for the
> old
> > > rules until our task has passed the Last-Processed-Old-Rules offset.
> If
> > we
> > > have a lot of messages in that topic, that could take a long time.
> This
> > is
> > > why I was hoping there would be a way to bootstrap the new rules while
> > > we're still processing the old rules.  Do you think there is a way to
> do
> > > that?
> > >
> > > On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <yanfang...@gmail.com> wrote:
> > >
> > > > Hi Jeremy,
> > > >
> > > > If my understanding is correct, whenever you add a new rule, you want
> > to
> > > > apply this rule to the historical data. Right?
> > > >
> > > > If you do not care about duplication, you can create a new task that
> > > > contains existing rules and new rules. Configure bootstrap. This will
> > > apply
> > > > all the rules from the beginning of the input stream. The shortcoming
> > > > is that you will get duplicated results for the old rules.
> > > >
> > > > If you cannot tolerate that shortcoming: 1) get the offset of the
> > > > latest-processed message of the old rules. 2) In your new task, ignore
> > > > messages before that offset for the old rules. 3) bootstrap.
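
A rough sketch of step 2, assuming the recorded offset is supplied somehow
(shown here as a hard-coded constant for illustration) and that the
applyOldRules/applyNewRules helpers are placeholders for the rule logic:

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    public class CombinedRulesTask implements StreamTask {
      // Hypothetical: the Last-Processed-Old-Rules offset recorded from the old job.
      private static final long LAST_PROCESSED_OLD_RULES = 123456L;

      @Override
      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                          TaskCoordinator coordinator) {
        long offset = Long.parseLong(envelope.getOffset());

        // New rules see every message, including the bootstrapped history.
        applyNewRules(envelope, collector);

        // Old rules skip everything at or before the recorded offset to avoid duplicates.
        if (offset > LAST_PROCESSED_OLD_RULES) {
          applyOldRules(envelope, collector);
        }
      }

      private void applyNewRules(IncomingMessageEnvelope envelope, MessageCollector collector) { /* ... */ }
      private void applyOldRules(IncomingMessageEnvelope envelope, MessageCollector collector) { /* ... */ }
    }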
> > > >
> > > > Hope this helps. Maybe your use case is more complicated?
> > > >
> > > > Thanks,
> > > >
> > > > Fang, Yan
> > > > yanfang...@gmail.com
> > > >
> > > > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p
> > > > <athomewithagroove...@gmail.com> wrote:
> > > >
> > > > > So, I'm wanting to use Samza for a project I'm working on, but I
> keep
> > > > > running into a problem with bootstrapping.
> > > > >
> > > > > Let's say there's a Kafka topic called Numbers that I want to
> consume
> > > > with
> > > > > Samza.  Let's say each message has a single integer in it, and I
> want
> > > to
> > > > > classify it as even or odd.  So I have two topics that I'm using
> for
> > > > > output, one called Even and one called Odd.  I write a simple
> stream
> > > task
> > > > > called Classifier that consumes the Numbers topic, examines each
> > > incoming
> > > > > integer and writes it back out to Even or Odd.
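
For reference, a minimal sketch of such a Classifier task; the topic names
Even/Odd and the assumption that each message is an integer serialized as a
plain string are taken from the description above, everything else is
illustrative.

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    public class ClassifierTask implements StreamTask {
      private static final SystemStream EVEN = new SystemStream("kafka", "Even");
      private static final SystemStream ODD  = new SystemStream("kafka", "Odd");

      @Override
      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                          TaskCoordinator coordinator) {
        // Assumes each message is the integer serialized as a string.
        int number = Integer.parseInt((String) envelope.getMessage());
        SystemStream out = (number % 2 == 0) ? EVEN : ODD;
        collector.send(new OutgoingMessageEnvelope(out, String.valueOf(number)));
      }
    }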
> > > > >
> > > > > Now, let's say I want to be able to add classifications
> dynamically,
> > > > like :
> > > > > "divisible by three", "divisible by four", or "numbers that appear
> in
> > > my
> > > > > date of birth".  And let's say I have an API I can query that gives
> > me
> > > > all
> > > > > the assignment rules, such as "when a number is divisible by 3,
> write
> > it
> > > > out
> > > > > to a topic called 'divisible_by_three'", or "when a number appears
> in
> > > the
> > > > > string 12/12/1981, write it to the 'my_birthday' topic".  So now I
> > > > rewrite
> > > > > my stream task to query this API for assignment rules.  It reads
> > > integers
> > > > > from the Numbers topic and writes them back out to one or more
> output
> > > > > topics, according to the assignment rules.
> > > > >
> > > > > Now, let's make this even more complicated.  When I add a new
> > > > > classification, I want to go back to the very beginning of the Numbers
> > > > > topic and classify the existing integers accordingly.  Once we've
> > > > > consumed all the old "historical" integers, I want to apply this
> > > > > classification to new integers as they come in.
> > > > >
> > > > > And this is where I get stuck.
> > > > >
> > > > > One thing I can do : when I want to add a new classification, I can
> > > > create
> > > > > a bootstrap job by setting the
> > > > > "systems.kafka.streams.numbers.samza.offset.default" property to
> > > > "oldest".
> > > > > And that's great, but the problem is, once I've "caught up", I'd
> like
> > > to
> > > > > kill the bootstrap job and just let the Classifier handle this new
> > > > > assignment.  So, I'd want to do some kind of handover from the
> > > bootstrap
> > > > > job to the Classifier job.  But how to do this?
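
The bootstrap-job configuration being referred to is roughly the following
snippet; the offset.default key is quoted from the message above, and the
bootstrap flag is shown under the assumption that the same 0.9-era property
naming applies.

    # Re-read the Numbers stream from the beginning, and treat it as a bootstrap
    # stream so it is fully caught up before other input is processed.
    systems.kafka.streams.numbers.samza.offset.default=oldest
    systems.kafka.streams.numbers.samza.bootstrap=true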
> > > > >
> > > > > So, the question I must ask is this: Is Samza even an appropriate
> way
> > > to
> > > > > solve this problem?  Has this problem ever come up for anybody
> else?
> > > How
> > > > > have they solved it?  I would really like to use Samza because it
> > seems
> > > > > like an appropriate technology, and I'd really really really really
> > like
> > > > to
> > > > > avoid re-inventing the wheel.
> > > > >
> > > > > A couple solutions I came up with :
> > > > >
> > > > > 1) The simple solution.  Have a separate Samza job for each
> > > > > classification.  If I want to add a new classification, I create a
> > new
> > > > job
> > > > > and set it up as a bootstrap job.  This would solve the problem.
> > > > However,
> > > > > we may want to have many, many classifications.  It could be as
> many
> > as
> > > > > 1,000,000, which would mean up to 1,000,000 simultaneously running
> > > jobs.
> > > > > This could create a lot of overhead for YARN and Kafka.
> > > > >
> > > > > 2) My overly-complicated workaround solution.  Each assignment rule
> > has
> > > > an
> > > > > "isnew" flag.  If it's a new classification that hasn't fully
> > > > bootstrapped
> > > > > yet, the "isnew" flag is set to TRUE.  When my classifier queries
> the
> > > API
> > > > > for assignment rules, it ignores any rule with an "isnew" flag.
> > When I
> > > > > want to add a new classification, I create a new bootstrap job for
> > that
> > > > > classification.  Every so often, maybe every few days or so, if all
> > of
> > > my
> > > > > bootstrap jobs have "caught up", I kill all of the bootstrap jobs
> and
> > > > > classifier jobs.  I set all the "isnew" flags to FALSE.  Then I
> > restart
> > > > the
> > > > > classifier job.  This is kind of an ugly solution, and I'm not even
> > > sure
> > > > it
> > > > > would work.  For one thing, I'd need some way of knowing if a
> > bootstrap
> > > > job
> > > > > has "caught up".  Secondly, I'd essentially be restarting the
> > > classifier
> > > > > job periodically, which just seems like an ugly solution.  I don't
> > like
> > > > it.
> > > > >
> > > > > 3) Some other kind of really complicated solution I haven't thought
> > of
> > > > yet,
> > > > > probably involving locks, transactions, concurrency, and
> interprocess
> > > > > communication.
> > > > >
> > > > > Thanks for reading this whole thing.  Please let me know if you
> have
> > > any
> > > > > suggestions.
> > > > >
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
