Hi Jeremy,

To reach this goal, we have to assume that the job with the new rules can
always catch up with the job running the old rules. Otherwise, I think we
have no choice but to run a lot of jobs simultaneously.

Under this assumption, job1 is running with the old rules, and we add job2,
which includes both the old rules and the new rules. Job2 frequently checks
the Last-Processed-Old-Rules offset from job1 (because job1 is still
running), and it applies only the new rules to the data until it catches up
with the Last-Processed-Old-Rules offset. Then it sends a signal to job1 to
shut it down, and from that point on applies all rules to the stream.
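
As a rough illustration of the handover described above, here is a minimal
simulation in plain Python. It does not use the Samza API. Job2, job1_offset,
and shutdown_job1 are hypothetical names, and I am assuming that
shutdown_job1 returns job1's final processed offset, so that job2 does not
duplicate old-rule output if job1 advanced between the check and the
shutdown.

```python
from typing import Callable, Dict, List, Optional, Tuple

Rule = Callable[[int], bool]  # returns True when a number matches the rule


class Job2:
    """Hypothetical stand-in for job2; this is not a Samza StreamTask."""

    def __init__(self, old_rules: Dict[str, Rule], new_rules: Dict[str, Rule],
                 job1_offset: Callable[[], int],
                 shutdown_job1: Callable[[], int]) -> None:
        self.old_rules = old_rules
        self.new_rules = new_rules
        self.job1_offset = job1_offset      # assumed: Last-Processed-Old-Rules
        self.shutdown_job1 = shutdown_job1  # assumed: stops job1, returns its final offset
        self.handover_offset: Optional[int] = None  # set once job1 is stopped
        self.output: List[Tuple[str, int]] = []     # (topic, value) pairs

    def process(self, offset: int, value: int) -> None:
        # Caught up: this message is past everything job1 has processed.
        if self.handover_offset is None and offset > self.job1_offset():
            self.handover_offset = self.shutdown_job1()

        # New rules always apply; old rules only apply past the handover point.
        rules = dict(self.new_rules)
        if self.handover_offset is not None and offset > self.handover_offset:
            rules.update(self.old_rules)

        for topic, matches in rules.items():
            if matches(value):
                self.output.append((topic, value))


old = {"even": lambda n: n % 2 == 0, "odd": lambda n: n % 2 == 1}
new = {"divisible_by_three": lambda n: n % 3 == 0}
stopped: List[bool] = []


def shutdown_job1() -> int:
    stopped.append(True)  # in a real setup this might be a control-stream message
    return 3              # job1's final processed offset in this toy run


job2 = Job2(old, new, job1_offset=lambda: 3, shutdown_job1=shutdown_job1)
for offset, value in enumerate([1, 2, 3, 4, 5, 6]):
    job2.process(offset, value)
print(job2.output)
```

In this toy run job1 has already handled offsets 0 through 3, so job2 emits
only "divisible_by_three" results for those messages and starts emitting
even/odd results from offset 4 onward, after job1 has been stopped.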

As for how to shut down job1, here is one solution
<http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E>
provided by Chris: you can use a control stream to tell job1 to shut
down. Samza will provide this kind of stream after SAMZA-348
<https://issues.apache.org/jira/browse/SAMZA-348>, which is under active
development.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <athomewithagroove...@gmail.com>
wrote:

> Hello Yan,
>
> Thank you for the suggestion!  I think your solution would work; however, I
> am afraid it would create a performance problem for our users.
>
> Let's say we kill the Classifier task, and create a new Classifier task
> with both the existing rules and new rules. We get the offset of the
> latest-processed message for the old rules.  Let's call this offset
> Last-Processed-Old-Rules.  We ignore messages
> before Last-Processed-Old-Rules for the old rules.  We configure the new
> Classifier task to be a bootstrap task.
>
> Let's say we have users who are watching the output topics, and they are
> expecting near-realtime updates.  They won't see any updates for the old
> rules until our task has passed the Last-Processed-Old-Rules offset.  If we
> have a lot of messages in that topic, that could take a long time.  This is
> why I was hoping there would be a way to bootstrap the new rules while
> we're still processing the old rules.  Do you think there is a way to do
> that?
>
> On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <yanfang...@gmail.com> wrote:
>
> > Hi Jeremy,
> >
> > If my understanding is correct, whenever you add a new rule, you want to
> > apply this rule to the historical data. Right?
> >
> > If you do not care about duplication, you can create a new task that
> > contains existing rules and new rules. Configure bootstrap. This will
> > apply all the rules from the beginning of the input stream. The
> > shortcoming is you will get duplicated results for old rules.
> >
> > If you cannot tolerate the shortcoming, 1) get the offset of the
> > latest-processed message of old rules. 2) In your new task, ignore
> > messages before that offset for the old rules. 3) bootstrap.
> >
> > Hope this helps. Maybe your use case is more complicated?
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <athomewithagroove...@gmail.com>
> > wrote:
> >
> > > So, I'm wanting to use Samza for a project I'm working on, but I keep
> > > running into a problem with bootstrapping.
> > >
> > > Let's say there's a Kafka topic called Numbers that I want to consume
> > > with Samza.  Let's say each message has a single integer in it, and I
> > > want to classify it as even or odd.  So I have two topics that I'm using
> > > for output, one called Even and one called Odd.  I write a simple stream
> > > task called Classifier that consumes the Numbers topic, examines each
> > > incoming integer and writes it back out to Even or Odd.
> > >
> > > Now, let's say I want to be able to add classifications dynamically,
> > > like: "divisible by three", "divisible by four", or "numbers that appear
> > > in my date of birth".  And let's say I have an API I can query that gives
> > > me all the assignment rules, such as "when a number is divisible by 3,
> > > write it out to a topic called 'divisible_by_three'", or "when a number
> > > appears in the string 12/12/1981, write it to the 'my_birthday' topic".
> > > So now I rewrite my stream task to query this API for assignment rules.
> > > It reads integers from the Numbers topic and writes them back out to one
> > > or more output topics, according to the assignment rules.
> > >
> > > Now, let's make this even more complicated.  When I add a new
> > > classification, I want to go back to the very beginning of the Numbers
> > > topic and classify the integers accordingly.  Once we've consumed all the
> > > old "historical" integers, I want to apply this classification to new
> > > integers as they come in.
> > >
> > > And this is where I get stuck.
> > >
> > > One thing I can do: when I want to add a new classification, I can
> > > create a bootstrap job by setting the
> > > "systems.kafka.streams.numbers.samza.offset.default" property to
> > > "oldest".  And that's great, but the problem is, once I've "caught up",
> > > I'd like to kill the bootstrap job and just let the Classifier handle
> > > this new assignment.  So, I'd want to do some kind of handover from the
> > > bootstrap job to the Classifier job.  But how to do this?
> > >
> > > So, the question I must ask is this: Is Samza even an appropriate way to
> > > solve this problem?  Has this problem ever come up for anybody else?  How
> > > have they solved it?  I would really like to use Samza because it seems
> > > like an appropriate technology, and I'd really really really really like
> > > to avoid re-inventing the wheel.
> > >
> > > A couple of solutions I came up with:
> > >
> > > 1) The simple solution.  Have a separate Samza job for each
> > > classification.  If I want to add a new classification, I create a new
> > > job and set it up as a bootstrap job.  This would solve the problem.
> > > However, we may want to have many, many classifications.  It could be as
> > > many as 1,000,000, which would mean up to 1,000,000 simultaneously
> > > running jobs.  This could create a lot of overhead for YARN and Kafka.
> > >
> > > 2) My overly-complicated workaround solution.  Each assignment rule has
> > > an "isnew" flag.  If it's a new classification that hasn't fully
> > > bootstrapped yet, the "isnew" flag is set to TRUE.  When my classifier
> > > queries the API for assignment rules, it ignores any rule whose "isnew"
> > > flag is TRUE.  When I want to add a new classification, I create a new
> > > bootstrap job for that classification.  Every so often, maybe every few
> > > days or so, if all of my bootstrap jobs have "caught up", I kill all of
> > > the bootstrap jobs and classifier jobs.  I set all the "isnew" flags to
> > > FALSE.  Then I restart the classifier job.  This is kind of an ugly
> > > solution, and I'm not even sure it would work.  For one thing, I'd need
> > > some way of knowing if a bootstrap job has "caught up".  Secondly, I'd
> > > essentially be restarting the classifier job periodically, which just
> > > seems like an ugly solution.  I don't like it.
> > >
> > > 3) Some other kind of really complicated solution I haven't thought of
> > > yet, probably involving locks, transactions, concurrency, and
> > > interprocess communication.
> > >
> > > Thanks for reading this whole thing.  Please let me know if you have any
> > > suggestions.
> > >
> >
>
