Hi, Jeremy,

I saw the following requirements from your use case:

1) New rules need to be dynamically added w/ creating too many Samza jobs
(e.g. 1 Samza job per new rule is too much)
2) Old rules need to continue processing when new rules are added

I want to ask a few more questions regarding to your requirements:

Q.1) Is it required that for a new rule, the bootstrap processing of
messages from offset 0 to Last-Processed-Old-Rules has to be done before
the new rules can be applied to messages from offset
Last-Processed-Old-Rules?
Q.2) Is it required that after bootstrap, all rules are processing the
message at the same offset w/ the same speed?

If the answers to both questions (i.e. Q.1 and Q.2) above are yes, we will
have to slow down or stop the jobs for the old rules s.t. the jobs running
both new and old rules can catch up, as Yan pointed out. If answers to both
questions above are no (which I doubt since you need to build-up certain
"history" for the new rule before you can apply it to later messages), you
can take Ben/Guozhang's approach w/o coordination between the two jobs.

Now the interesting case is that your answer to Q.1 is yes, and to Q.2 is
no, which essentially post a request that your job will need to keep
multiple independent consumer offsets per rule and let them move w/ their
own speed. Or, at least one bootstrap consumer, and one normal processing
consumer on the same system stream partition within a single job. I don't
think that Samza support this now. And the only work around is Yan's
solution which requires coordination between two jobs.

-Yi

On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang <yanfang...@gmail.com> wrote:

> you are able to call coordinator.shutdown to shut the job down after it
> reaches the offset.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I feel Ben's solution a bit simpler that you just need to restart your
> > current job with both rules on the check pointed offset, and start a new
> > job from offset 0 with only the new rule and it will stop at the checkout
> > pointed offset. But of course it requires the second job to be able to
> > shutdown itself upon some specific offset which I am not sure if it is
> > already supported.
> >
> > Guozhang
> >
> > On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang <yanfang...@gmail.com> wrote:
> >
> > > Hi Jeremy,
> > >
> > > In order to reach this goal, we have to assume that the job with new
> > rules
> > > can always catch up with the one with old rules. Otherwise, I think we
> do
> > > not have the choice but running a lot of jobs simultaneously.
> > >
> > > Under our assumption, we have job1 with old rules running, and now add
> > job2
> > > which integrates old rules and new rules to run. Job2 frequently
> > > checks the Last-Processed-Old-Rules
> > > offset from job1 (because job1 is running too), and it only applies new
> > > rule to the data until catch up with the Last-Processed-Old-Rules
> offset.
> > > Then it sends signal to the job1 and shutdown job1, and applies all
> rules
> > > to the stream.
> > >
> > > In terms of how to shutdown the job1, here is one solution
> > > <
> > >
> >
> http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E
> > > >
> > > provided by Chris - e.g. you can have a control stream to get job1
> > > shutdown. Samza will provide this kind of stream after SAMZA-348
> > > <https://issues.apache.org/jira/browse/SAMZA-348>, which is under
> active
> > > development.
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <
> > athomewithagroove...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hello Yan,
> > > >
> > > > Thank you for the suggestion!  I think your solution would work,
> > > however, I
> > > > am afraid it would create a performance problem for our users.
> > > >
> > > > Let's say we kill the Classifier task, and create a new Classifier
> task
> > > > with both the existing rules and new rules. We get the offset of the
> > > > latest-processed message for the old rules.  Let's call this offset
> > > > Last-Processed-Old-Rules.  We ignore messages
> > > > before Last-Processed-Old-Rules for the old rules.  We configure the
> > new
> > > > Classifier task to be a bootstrap task.
> > > >
> > > > Let's say we have users who are watching the output topics, and they
> > are
> > > > expecting near-realtime updates.  They won't see any updates for the
> > old
> > > > rules until our task has passed the Last-Processed-Old-Rules offset.
> > If
> > > we
> > > > have a lot of messages in that topic, that could take a long time.
> > This
> > > is
> > > > why I was hoping there would be a way to bootstrap the new rules
> while
> > > > we're still processing the old rules.  Do you think there is a way to
> > do
> > > > that?
> > > >
> > > > On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <yanfang...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Jeremy,
> > > > >
> > > > > If my understanding is correct, whenever you add a new rule, you
> want
> > > to
> > > > > apply this rule to the historical data. Right?
> > > > >
> > > > > If you do not care about duplication, you can create a new task
> that
> > > > > contains existing rules and new rules. Configure bootstrap. This
> will
> > > > apply
> > > > > all the rules from the beginning of the input stream. The
> shortcoming
> > > is
> > > > > you will get duplicated results for old rules.
> > > > >
> > > > > If you can not tolerate the shortcoming, 1) get the offset of the
> > > > > latest-processed message of old rules. 2) In your new task, ignore
> > > > messages
> > > > > before that offset for the old rules. 3) bootstrap.
> > > > >
> > > > > Hope this helps. Maybe your use case is more complicated?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Fang, Yan
> > > > > yanfang...@gmail.com
> > > > >
> > > > > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <
> > > > athomewithagroove...@gmail.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > So, I'm wanting to use Samza for a project I'm working on, but I
> > keep
> > > > > > running into a problem with bootstrapping.
> > > > > >
> > > > > > Let's say there's a Kafka topic called Numbers that I want to
> > consume
> > > > > with
> > > > > > Samza.  Let's say each message has a single integer in it, and I
> > want
> > > > to
> > > > > > classify it as even or odd.  So I have two topics that I'm using
> > for
> > > > > > output, one called Even and one called Odd.  I write a simple
> > stream
> > > > task
> > > > > > called Classifier that consumes the Numbers topic, examines each
> > > > incoming
> > > > > > integer and writes it back out to Even or Odd.
> > > > > >
> > > > > > Now, let's say I want to be able to add classifications
> > dynamically,
> > > > > like :
> > > > > > "divisible by three", "divisible by four", or "numbers that
> appear
> > in
> > > > my
> > > > > > date of birth".  And let's say I have an API I can query that
> gives
> > > me
> > > > > all
> > > > > > the assignment rules, such as "when a number is divisble by 3,
> > write
> > > it
> > > > > out
> > > > > > to a topic called 'divisible_by_three'", or "when a number
> appears
> > in
> > > > the
> > > > > > string 12/12/1981, write it to the 'my_birthday' topic".  So now
> I
> > > > > rewrite
> > > > > > my stream task to query this API for assignment rules.  It reads
> > > > integers
> > > > > > from the Numbers topic and writes them back out to one or more
> > output
> > > > > > topics, according to the assignment rules.
> > > > > >
> > > > > > Now, let's make this even more complicated.  When I add a new
> > > > > > classification, I want to go back to the very beginning of the
> > > Numbers
> > > > > > topic and classify them accordingly.  Once we've consumed all the
> > old
> > > > > > "historical" integers, I want to apply this classification new
> > > integers
> > > > > as
> > > > > > they come in.
> > > > > >
> > > > > > And this is where I get stuck.
> > > > > >
> > > > > > One thing I can do : when I want to add a new classification, I
> can
> > > > > create
> > > > > > a bootstrap job by setting the
> > > > > > "systems.kafka.streams.numbers.samza.offset.default" property to
> > > > > "oldest".
> > > > > > And that's great, but the problem is, once I've "caught up", I'd
> > like
> > > > to
> > > > > > kill the bootstrap job and just let the Classifier handle this
> new
> > > > > > assignment.  So, I'd want to do some kind of handover from the
> > > > bootstrap
> > > > > > job to the Classifier job.  But how to do this?
> > > > > >
> > > > > > So, the question I must ask is this : Is Samza even an appopriate
> > way
> > > > to
> > > > > > solve this problem?  Has this problem ever come up for anybody
> > else?
> > > > How
> > > > > > have they solved it?  I would really like to use Samza because it
> > > seems
> > > > > > like an appopriate technology, and I'd really really really
> really
> > > like
> > > > > to
> > > > > > avoid re-inventing the wheel.
> > > > > >
> > > > > > A couple solutions I came up with :
> > > > > >
> > > > > > 1) The simple solution.  Have a separate Samza job for each
> > > > > > classification.  If I want to add a new classification, I create
> a
> > > new
> > > > > job
> > > > > > and set it up as a bootstrap job.  This would solve the problem.
> > > > > However,
> > > > > > we may want to have many, many classifications.  It could be as
> > many
> > > as
> > > > > > 1,000,000, which would mean up to 1,000,000 simultaneously
> running
> > > > jobs.
> > > > > > This could create a lot of overhead for YARN and Kafka.
> > > > > >
> > > > > > 2) My overly-complicated workaround solution.  Each assignment
> rule
> > > has
> > > > > an
> > > > > > "isnew" flag.  If it's a new classification that hasn't fully
> > > > > bootstrapped
> > > > > > yet, the "isnew" flag is set to TRUE.  When my classifier queries
> > the
> > > > API
> > > > > > for assignment rules, it ignores any rule with an "isnew" flag.
> > > When I
> > > > > > want to add a new classification, I create a new bootstrap job
> for
> > > that
> > > > > > classification.  Every so often, maybe every few days or so, if
> all
> > > of
> > > > my
> > > > > > bootstrap jobs have "caught up", I kill all of the bootstrap jobs
> > and
> > > > > > classifier jobs.  I set all the "isnew" flags to FALSE.  Then I
> > > restart
> > > > > the
> > > > > > classifier job.  This is kind of an ugly solution, and I'm not
> even
> > > > sure
> > > > > it
> > > > > > would work.  For one thing, I'd need some way of knowing if a
> > > boostrap
> > > > > job
> > > > > > has "caught up".  Secondly, I'd essentially be restarting the
> > > > classifier
> > > > > > job periodically, which just seems like an ugly solution.  I
> don't
> > > like
> > > > > it.
> > > > > >
> > > > > > 3) Some other kind of really complicated solution I haven't
> thought
> > > of
> > > > > yet,
> > > > > > probably involving locks, transactions, concurrancy, and
> > interprocess
> > > > > > communication.
> > > > > >
> > > > > > Thanks for reading this whole thing.  Please let me know if you
> > have
> > > > any
> > > > > > suggestions.
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Reply via email to