What about this:

1) Add new rule to the classifier task
2) Take note of offset of the first message processed after restart
3) Run a job to process from offset 0 to the offset from #2, after which
the job is deleted

I don't know how to do 2 or 3, but perhaps some of the core Samza folk
could shed some light?


b



On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <athomewithagroove...@gmail.com>
wrote:

> Hello Yan,
>
> Thank you for the suggestion!  I think your solution would work, however, I
> am afraid it would create a performance problem for our users.
>
> Let's say we kill the Classifier task, and create a new Classifier task
> with both the existing rules and new rules. We get the offset of the
> latest-processed message for the old rules.  Let's call this offset
> Last-Processed-Old-Rules.  We ignore messages
> before Last-Processed-Old-Rules for the old rules.  We configure the new
> Classifier task to be a bootstrap task.
>
> Let's say we have users who are watching the output topics, and they are
> expecting near-realtime updates.  They won't see any updates for the old
> rules until our task has passed the Last-Processed-Old-Rules offset.  If we
> have a lot of messages in that topic, that could take a long time.  This is
> why I was hoping there would be a way to bootstrap the new rules while
> we're still processing the old rules.  Do you think there is a way to do
> that?
>
> On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <yanfang...@gmail.com> wrote:
>
> > Hi Jeremy,
> >
> > If my understanding is correct, whenever you add a new rule, you want to
> > apply this rule to the historical data. Right?
> >
> > If you do not care about duplication, you can create a new task that
> > contains existing rules and new rules. Configure bootstrap. This will
> apply
> > all the rules from the beginning of the input stream. The shortcoming is
> > you will get duplicated results for old rules.
> >
> > If you can not tolerate the shortcoming, 1) get the offset of the
> > latest-processed message of old rules. 2) In your new task, ignore
> messages
> > before that offset for the old rules. 3) bootstrap.
> >
> > Hope this helps. Maybe your use case is more complicated?
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <
> athomewithagroove...@gmail.com
> > >
> > wrote:
> >
> > > So, I'm wanting to use Samza for a project I'm working on, but I keep
> > > running into a problem with bootstrapping.
> > >
> > > Let's say there's a Kafka topic called Numbers that I want to consume
> > with
> > > Samza.  Let's say each message has a single integer in it, and I want
> to
> > > classify it as even or odd.  So I have two topics that I'm using for
> > > output, one called Even and one called Odd.  I write a simple stream
> task
> > > called Classifier that consumes the Numbers topic, examines each
> incoming
> > > integer and writes it back out to Even or Odd.
> > >
> > > Now, let's say I want to be able to add classifications dynamically,
> > like :
> > > "divisible by three", "divisible by four", or "numbers that appear in
> my
> > > date of birth".  And let's say I have an API I can query that gives me
> > all
> > > the assignment rules, such as "when a number is divisble by 3, write it
> > out
> > > to a topic called 'divisible_by_three'", or "when a number appears in
> the
> > > string 12/12/1981, write it to the 'my_birthday' topic".  So now I
> > rewrite
> > > my stream task to query this API for assignment rules.  It reads
> integers
> > > from the Numbers topic and writes them back out to one or more output
> > > topics, according to the assignment rules.
> > >
> > > Now, let's make this even more complicated.  When I add a new
> > > classification, I want to go back to the very beginning of the Numbers
> > > topic and classify them accordingly.  Once we've consumed all the old
> > > "historical" integers, I want to apply this classification new integers
> > as
> > > they come in.
> > >
> > > And this is where I get stuck.
> > >
> > > One thing I can do : when I want to add a new classification, I can
> > create
> > > a bootstrap job by setting the
> > > "systems.kafka.streams.numbers.samza.offset.default" property to
> > "oldest".
> > > And that's great, but the problem is, once I've "caught up", I'd like
> to
> > > kill the bootstrap job and just let the Classifier handle this new
> > > assignment.  So, I'd want to do some kind of handover from the
> bootstrap
> > > job to the Classifier job.  But how to do this?
> > >
> > > So, the question I must ask is this : Is Samza even an appopriate way
> to
> > > solve this problem?  Has this problem ever come up for anybody else?
> How
> > > have they solved it?  I would really like to use Samza because it seems
> > > like an appopriate technology, and I'd really really really really like
> > to
> > > avoid re-inventing the wheel.
> > >
> > > A couple solutions I came up with :
> > >
> > > 1) The simple solution.  Have a separate Samza job for each
> > > classification.  If I want to add a new classification, I create a new
> > job
> > > and set it up as a bootstrap job.  This would solve the problem.
> > However,
> > > we may want to have many, many classifications.  It could be as many as
> > > 1,000,000, which would mean up to 1,000,000 simultaneously running
> jobs.
> > > This could create a lot of overhead for YARN and Kafka.
> > >
> > > 2) My overly-complicated workaround solution.  Each assignment rule has
> > an
> > > "isnew" flag.  If it's a new classification that hasn't fully
> > bootstrapped
> > > yet, the "isnew" flag is set to TRUE.  When my classifier queries the
> API
> > > for assignment rules, it ignores any rule with an "isnew" flag.  When I
> > > want to add a new classification, I create a new bootstrap job for that
> > > classification.  Every so often, maybe every few days or so, if all of
> my
> > > bootstrap jobs have "caught up", I kill all of the bootstrap jobs and
> > > classifier jobs.  I set all the "isnew" flags to FALSE.  Then I restart
> > the
> > > classifier job.  This is kind of an ugly solution, and I'm not even
> sure
> > it
> > > would work.  For one thing, I'd need some way of knowing if a boostrap
> > job
> > > has "caught up".  Secondly, I'd essentially be restarting the
> classifier
> > > job periodically, which just seems like an ugly solution.  I don't like
> > it.
> > >
> > > 3) Some other kind of really complicated solution I haven't thought of
> > yet,
> > > probably involving locks, transactions, concurrancy, and interprocess
> > > communication.
> > >
> > > Thanks for reading this whole thing.  Please let me know if you have
> any
> > > suggestions.
> > >
> >
>

Reply via email to