If you need to maintain ordering of a sequence of messages, those messages
should all be written to the same partition. If you are concerned with
global ordering of all messages in a topic then kafka is likely not going
to be what you want. Ordering guarantees are strictly per partition. samza
is built on this principle by having a tasks work from a single partition.
If your jobs require global coordination between tasks, again, you might
reconsider either your architecture or your use of kafka.

Not trying to harsh your mellow here. High scale systems like kafka require
you match your architecture to them. To do otherwise produces bad times.

On Thu, Apr 16, 2015 at 1:51 PM, jeremy p <athomewithagroove...@gmail.com>
wrote:

> Thank you for the response.  Does this mean the Old-Rules-Job would need to
> maintain a Last-Processed-Old-Rules offset for each partition?
>
> On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black <b...@b3k.us> wrote:
>
> > Offsets are per partition. The alternative would have poor scaling
> behavior
> > for both brokers and consumers.
> >
> > On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <
> athomewithagroove...@gmail.com>
> > wrote:
> >
> > > Thanks to everybody for the responses!
> > >
> > > Yi : The queue must be processed in order, which means that I cannot
> use
> > > Ben and Guozhang's approach.
> > >
> > > However, it is not necessary that all rules be processed at the same
> > offset
> > > and at the same speed.  This is why I considered a solution where we
> had
> > a
> > > separate job for each rule.  The problem with that solution is that we
> > > could have thousands of these rules, which would mean thousands of
> jobs.
> > > These jobs would be really lightweight and would require very few
> system
> > > resources.  However, I don't know if having thousands of jobs would
> break
> > > YARN.
> > >
> > > For now, it sounds like Yan's solution would be the best. However, I
> > have a
> > > few questions about it.  For now, let's call the original job the
> > > Old-Rules-Job, and the boostrap job the All-Rules-Job. This is the
> > > solution, as I understand it :
> > >
> > > The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.  We
> start
> > > the All-Rules-Job.  The All-Rules-Job will only apply new rules until
> it
> > > gets to the Last-Processed-Old-Rules offset.  Once the All-Rules-Job
> gets
> > > to the Last-Processed-Old-Rules offset, it sends a kill signal to
> > > Old-Rules-Job along a control stream.  Old-Rules-Job terminates itself.
> > > Then the All-Rules-Job applies both old and new rules to every message
> > that
> > > comes in.
> > >
> > > My questions :
> > >
> > > Does the Old-Rules-Job update the Last-Processed-Old-Rules offset every
> > > time it processes a message?  How does the Old-Rules-Job expose the
> > > Last-Processed-Rules offset to the All-Rules-Job?  Would the
> > > Last-Processed-Rules offset be the absolute offset within a topic, and
> > not
> > > the offset within a partition?  Is there a way to find out a message's
> > > absolute offset within a topic?
> > >
> > > Thanks again for all the help!
> > >
> > > --Jeremy
> > >
> > >
> > > On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan <nickpa...@gmail.com> wrote:
> > >
> > > > Hi, Jeremy,
> > > >
> > > > I saw the following requirements from your use case:
> > > >
> > > > 1) New rules need to be dynamically added w/ creating too many Samza
> > jobs
> > > > (e.g. 1 Samza job per new rule is too much)
> > > > 2) Old rules need to continue processing when new rules are added
> > > >
> > > > I want to ask a few more questions regarding to your requirements:
> > > >
> > > > Q.1) Is it required that for a new rule, the bootstrap processing of
> > > > messages from offset 0 to Last-Processed-Old-Rules has to be done
> > before
> > > > the new rules can be applied to messages from offset
> > > > Last-Processed-Old-Rules?
> > > > Q.2) Is it required that after bootstrap, all rules are processing
> the
> > > > message at the same offset w/ the same speed?
> > > >
> > > > If the answers to both questions (i.e. Q.1 and Q.2) above are yes, we
> > > will
> > > > have to slow down or stop the jobs for the old rules s.t. the jobs
> > > running
> > > > both new and old rules can catch up, as Yan pointed out. If answers
> to
> > > both
> > > > questions above are no (which I doubt since you need to build-up
> > certain
> > > > "history" for the new rule before you can apply it to later
> messages),
> > > you
> > > > can take Ben/Guozhang's approach w/o coordination between the two
> jobs.
> > > >
> > > > Now the interesting case is that your answer to Q.1 is yes, and to
> Q.2
> > is
> > > > no, which essentially post a request that your job will need to keep
> > > > multiple independent consumer offsets per rule and let them move w/
> > their
> > > > own speed. Or, at least one bootstrap consumer, and one normal
> > processing
> > > > consumer on the same system stream partition within a single job. I
> > don't
> > > > think that Samza support this now. And the only work around is Yan's
> > > > solution which requires coordination between two jobs.
> > > >
> > > > -Yi
> > > >
> > > > On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang <yanfang...@gmail.com>
> > wrote:
> > > >
> > > > > you are able to call coordinator.shutdown to shut the job down
> after
> > it
> > > > > reaches the offset.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Fang, Yan
> > > > > yanfang...@gmail.com
> > > > >
> > > > > On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang <wangg...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > I feel Ben's solution a bit simpler that you just need to restart
> > > your
> > > > > > current job with both rules on the check pointed offset, and
> start
> > a
> > > > new
> > > > > > job from offset 0 with only the new rule and it will stop at the
> > > > checkout
> > > > > > pointed offset. But of course it requires the second job to be
> able
> > > to
> > > > > > shutdown itself upon some specific offset which I am not sure if
> it
> > > is
> > > > > > already supported.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang <yanfang...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hi Jeremy,
> > > > > > >
> > > > > > > In order to reach this goal, we have to assume that the job
> with
> > > new
> > > > > > rules
> > > > > > > can always catch up with the one with old rules. Otherwise, I
> > think
> > > > we
> > > > > do
> > > > > > > not have the choice but running a lot of jobs simultaneously.
> > > > > > >
> > > > > > > Under our assumption, we have job1 with old rules running, and
> > now
> > > > add
> > > > > > job2
> > > > > > > which integrates old rules and new rules to run. Job2
> frequently
> > > > > > > checks the Last-Processed-Old-Rules
> > > > > > > offset from job1 (because job1 is running too), and it only
> > applies
> > > > new
> > > > > > > rule to the data until catch up with the
> Last-Processed-Old-Rules
> > > > > offset.
> > > > > > > Then it sends signal to the job1 and shutdown job1, and applies
> > all
> > > > > rules
> > > > > > > to the stream.
> > > > > > >
> > > > > > > In terms of how to shutdown the job1, here is one solution
> > > > > > > <
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E
> > > > > > > >
> > > > > > > provided by Chris - e.g. you can have a control stream to get
> > job1
> > > > > > > shutdown. Samza will provide this kind of stream after
> SAMZA-348
> > > > > > > <https://issues.apache.org/jira/browse/SAMZA-348>, which is
> > under
> > > > > active
> > > > > > > development.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Fang, Yan
> > > > > > > yanfang...@gmail.com
> > > > > > >
> > > > > > > On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <
> > > > > > athomewithagroove...@gmail.com
> > > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Yan,
> > > > > > > >
> > > > > > > > Thank you for the suggestion!  I think your solution would
> > work,
> > > > > > > however, I
> > > > > > > > am afraid it would create a performance problem for our
> users.
> > > > > > > >
> > > > > > > > Let's say we kill the Classifier task, and create a new
> > > Classifier
> > > > > task
> > > > > > > > with both the existing rules and new rules. We get the offset
> > of
> > > > the
> > > > > > > > latest-processed message for the old rules.  Let's call this
> > > offset
> > > > > > > > Last-Processed-Old-Rules.  We ignore messages
> > > > > > > > before Last-Processed-Old-Rules for the old rules.  We
> > configure
> > > > the
> > > > > > new
> > > > > > > > Classifier task to be a bootstrap task.
> > > > > > > >
> > > > > > > > Let's say we have users who are watching the output topics,
> and
> > > > they
> > > > > > are
> > > > > > > > expecting near-realtime updates.  They won't see any updates
> > for
> > > > the
> > > > > > old
> > > > > > > > rules until our task has passed the Last-Processed-Old-Rules
> > > > offset.
> > > > > > If
> > > > > > > we
> > > > > > > > have a lot of messages in that topic, that could take a long
> > > time.
> > > > > > This
> > > > > > > is
> > > > > > > > why I was hoping there would be a way to bootstrap the new
> > rules
> > > > > while
> > > > > > > > we're still processing the old rules.  Do you think there is
> a
> > > way
> > > > to
> > > > > > do
> > > > > > > > that?
> > > > > > > >
> > > > > > > > On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang <
> > yanfang...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jeremy,
> > > > > > > > >
> > > > > > > > > If my understanding is correct, whenever you add a new
> rule,
> > > you
> > > > > want
> > > > > > > to
> > > > > > > > > apply this rule to the historical data. Right?
> > > > > > > > >
> > > > > > > > > If you do not care about duplication, you can create a new
> > task
> > > > > that
> > > > > > > > > contains existing rules and new rules. Configure bootstrap.
> > > This
> > > > > will
> > > > > > > > apply
> > > > > > > > > all the rules from the beginning of the input stream. The
> > > > > shortcoming
> > > > > > > is
> > > > > > > > > you will get duplicated results for old rules.
> > > > > > > > >
> > > > > > > > > If you can not tolerate the shortcoming, 1) get the offset
> of
> > > the
> > > > > > > > > latest-processed message of old rules. 2) In your new task,
> > > > ignore
> > > > > > > > messages
> > > > > > > > > before that offset for the old rules. 3) bootstrap.
> > > > > > > > >
> > > > > > > > > Hope this helps. Maybe your use case is more complicated?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Fang, Yan
> > > > > > > > > yanfang...@gmail.com
> > > > > > > > >
> > > > > > > > > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <
> > > > > > > > athomewithagroove...@gmail.com
> > > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > So, I'm wanting to use Samza for a project I'm working
> on,
> > > but
> > > > I
> > > > > > keep
> > > > > > > > > > running into a problem with bootstrapping.
> > > > > > > > > >
> > > > > > > > > > Let's say there's a Kafka topic called Numbers that I
> want
> > to
> > > > > > consume
> > > > > > > > > with
> > > > > > > > > > Samza.  Let's say each message has a single integer in
> it,
> > > and
> > > > I
> > > > > > want
> > > > > > > > to
> > > > > > > > > > classify it as even or odd.  So I have two topics that
> I'm
> > > > using
> > > > > > for
> > > > > > > > > > output, one called Even and one called Odd.  I write a
> > simple
> > > > > > stream
> > > > > > > > task
> > > > > > > > > > called Classifier that consumes the Numbers topic,
> examines
> > > > each
> > > > > > > > incoming
> > > > > > > > > > integer and writes it back out to Even or Odd.
> > > > > > > > > >
> > > > > > > > > > Now, let's say I want to be able to add classifications
> > > > > > dynamically,
> > > > > > > > > like :
> > > > > > > > > > "divisible by three", "divisible by four", or "numbers
> that
> > > > > appear
> > > > > > in
> > > > > > > > my
> > > > > > > > > > date of birth".  And let's say I have an API I can query
> > that
> > > > > gives
> > > > > > > me
> > > > > > > > > all
> > > > > > > > > > the assignment rules, such as "when a number is divisble
> by
> > > 3,
> > > > > > write
> > > > > > > it
> > > > > > > > > out
> > > > > > > > > > to a topic called 'divisible_by_three'", or "when a
> number
> > > > > appears
> > > > > > in
> > > > > > > > the
> > > > > > > > > > string 12/12/1981, write it to the 'my_birthday' topic".
> > So
> > > > now
> > > > > I
> > > > > > > > > rewrite
> > > > > > > > > > my stream task to query this API for assignment rules.
> It
> > > > reads
> > > > > > > > integers
> > > > > > > > > > from the Numbers topic and writes them back out to one or
> > > more
> > > > > > output
> > > > > > > > > > topics, according to the assignment rules.
> > > > > > > > > >
> > > > > > > > > > Now, let's make this even more complicated.  When I add a
> > new
> > > > > > > > > > classification, I want to go back to the very beginning
> of
> > > the
> > > > > > > Numbers
> > > > > > > > > > topic and classify them accordingly.  Once we've consumed
> > all
> > > > the
> > > > > > old
> > > > > > > > > > "historical" integers, I want to apply this
> classification
> > > new
> > > > > > > integers
> > > > > > > > > as
> > > > > > > > > > they come in.
> > > > > > > > > >
> > > > > > > > > > And this is where I get stuck.
> > > > > > > > > >
> > > > > > > > > > One thing I can do : when I want to add a new
> > > classification, I
> > > > > can
> > > > > > > > > create
> > > > > > > > > > a bootstrap job by setting the
> > > > > > > > > > "systems.kafka.streams.numbers.samza.offset.default"
> > property
> > > > to
> > > > > > > > > "oldest".
> > > > > > > > > > And that's great, but the problem is, once I've "caught
> > up",
> > > > I'd
> > > > > > like
> > > > > > > > to
> > > > > > > > > > kill the bootstrap job and just let the Classifier handle
> > > this
> > > > > new
> > > > > > > > > > assignment.  So, I'd want to do some kind of handover
> from
> > > the
> > > > > > > > bootstrap
> > > > > > > > > > job to the Classifier job.  But how to do this?
> > > > > > > > > >
> > > > > > > > > > So, the question I must ask is this : Is Samza even an
> > > > appopriate
> > > > > > way
> > > > > > > > to
> > > > > > > > > > solve this problem?  Has this problem ever come up for
> > > anybody
> > > > > > else?
> > > > > > > > How
> > > > > > > > > > have they solved it?  I would really like to use Samza
> > > because
> > > > it
> > > > > > > seems
> > > > > > > > > > like an appopriate technology, and I'd really really
> really
> > > > > really
> > > > > > > like
> > > > > > > > > to
> > > > > > > > > > avoid re-inventing the wheel.
> > > > > > > > > >
> > > > > > > > > > A couple solutions I came up with :
> > > > > > > > > >
> > > > > > > > > > 1) The simple solution.  Have a separate Samza job for
> each
> > > > > > > > > > classification.  If I want to add a new classification, I
> > > > create
> > > > > a
> > > > > > > new
> > > > > > > > > job
> > > > > > > > > > and set it up as a bootstrap job.  This would solve the
> > > > problem.
> > > > > > > > > However,
> > > > > > > > > > we may want to have many, many classifications.  It could
> > be
> > > as
> > > > > > many
> > > > > > > as
> > > > > > > > > > 1,000,000, which would mean up to 1,000,000
> simultaneously
> > > > > running
> > > > > > > > jobs.
> > > > > > > > > > This could create a lot of overhead for YARN and Kafka.
> > > > > > > > > >
> > > > > > > > > > 2) My overly-complicated workaround solution.  Each
> > > assignment
> > > > > rule
> > > > > > > has
> > > > > > > > > an
> > > > > > > > > > "isnew" flag.  If it's a new classification that hasn't
> > fully
> > > > > > > > > bootstrapped
> > > > > > > > > > yet, the "isnew" flag is set to TRUE.  When my classifier
> > > > queries
> > > > > > the
> > > > > > > > API
> > > > > > > > > > for assignment rules, it ignores any rule with an "isnew"
> > > flag.
> > > > > > > When I
> > > > > > > > > > want to add a new classification, I create a new
> bootstrap
> > > job
> > > > > for
> > > > > > > that
> > > > > > > > > > classification.  Every so often, maybe every few days or
> > so,
> > > if
> > > > > all
> > > > > > > of
> > > > > > > > my
> > > > > > > > > > bootstrap jobs have "caught up", I kill all of the
> > bootstrap
> > > > jobs
> > > > > > and
> > > > > > > > > > classifier jobs.  I set all the "isnew" flags to FALSE.
> > > Then I
> > > > > > > restart
> > > > > > > > > the
> > > > > > > > > > classifier job.  This is kind of an ugly solution, and
> I'm
> > > not
> > > > > even
> > > > > > > > sure
> > > > > > > > > it
> > > > > > > > > > would work.  For one thing, I'd need some way of knowing
> > if a
> > > > > > > boostrap
> > > > > > > > > job
> > > > > > > > > > has "caught up".  Secondly, I'd essentially be restarting
> > the
> > > > > > > > classifier
> > > > > > > > > > job periodically, which just seems like an ugly solution.
> > I
> > > > > don't
> > > > > > > like
> > > > > > > > > it.
> > > > > > > > > >
> > > > > > > > > > 3) Some other kind of really complicated solution I
> haven't
> > > > > thought
> > > > > > > of
> > > > > > > > > yet,
> > > > > > > > > > probably involving locks, transactions, concurrancy, and
> > > > > > interprocess
> > > > > > > > > > communication.
> > > > > > > > > >
> > > > > > > > > > Thanks for reading this whole thing.  Please let me know
> if
> > > you
> > > > > > have
> > > > > > > > any
> > > > > > > > > > suggestions.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to