Re: How to deal with bootstrapping

2015-04-17 Thread jeremy p
I think that solution will work!  Thank you, Yan.

And thanks to everybody in this thread!  You all were very helpful.  I look
forward to using Samza and being a part of the community.

--Jeremy

On Fri, Apr 17, 2015 at 2:06 AM, Yan Fang  wrote:

> Hi Jeremy,
>
> Benjamin is right, New-Rules-Job will need to know the map of partitions to
> offsets. Samza's checkpoint stream has the mapping. The doc is here:
> http://samza.apache.org/learn/documentation/0.9/container/checkpointing.html
>
> However, on second thought, I do not recommend using the default
> checkpoint stream, because 1) it was initially designed for restarting jobs
> and is not very friendly for other uses -- a lot of mapping and config work
> is involved. You can check CheckpointTool
> (https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala)
> to get a feel for how to read that stream. 2) This is an important point:
> your code will be incompatible with newer Samza -- after SAMZA-465, there
> will be a coordinator stream instead of a checkpoint stream.
>
> Even though you may not use the system's checkpoint stream, you can easily
> create a simple-checkpoint stream and send the latest offsets to it. The
> mapping problem raised by Benjamin is still solvable. For example, if your
> input stream Number has 10 partitions, you write to a 10-partition
> simple-checkpoint stream; the simple-checkpoint stream's partition count is
> always the same as Number's. So when you process partition #1 of Number, you
> write to partition #1 of the simple-checkpoint stream. When you bring up the
> New-Rules-Job, it accepts two streams: Number and the simple-checkpoint
> stream. The latter has the latest offsets of the Old-Rules-Job. You just
> need to guarantee that the same partition # of Number and the
> simple-checkpoint stream goes to the same container. By default, it does.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Thu, Apr 16, 2015 at 2:16 PM, Benjamin Black  wrote:
>
> > New-Rules-Job will need to know the complete map of partitions to
> offsets.
> >
> > On Thu, Apr 16, 2015 at 2:06 PM, jeremy p <
> athomewithagroove...@gmail.com>
> > wrote:
> >
> > > Ben : I think we are talking about different things here.  I'm not
> trying
> > > to maintain ordering across a topic.  I know that is not what Kafka and
> > > Samza are meant for.  What I'm trying to do here is give my
> > Old-Rules-Job a
> > > way of telling New-Rules-Job, "Once you hit this offset, start applying
> > > both old and new rules."  So is that a single absolute offset that I
> want
> > > to pass from Old-Rules-Job to New-Rules-Job?  Or a set of offsets, one
> > for
> > > each partition.
> > >
> > > On Thu, Apr 16, 2015 at 4:58 PM, Benjamin Black  wrote:
> > >
> > > > If you need to maintain ordering of a sequence of messages, those
> > > messages
> > > > should all be written to the same partition. If you are concerned
> with
> > > > global ordering of all messages in a topic then kafka is likely not
> > going
> > > > to be what you want. Ordering guarantees are strictly per partition.
> > > samza
> > > > is built on this principle by having a tasks work from a single
> > > partition.
> > > > If your jobs require global coordination between tasks, again, you
> > might
> > > > reconsider either your architecture or your use of kafka.
> > > >
> > > > Not trying to harsh your mellow here. High scale systems like kafka
> > > require
> > > > you match your architecture to them. To do otherwise produces bad
> > times.
> > > >
> > > > On Thu, Apr 16, 2015 at 1:51 PM, jeremy p <
> > > athomewithagroove...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thank you for the response.  Does this mean the Old-Rules-Job would
> > > need
> > > > to
> > > > > maintain a Last-Processed-Old-Rules offset for each partition?
> > > > >
> > > > > On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black  wrote:
> > > > >
> > > > > > Offsets are per partition. The alternative would have poor
> scaling
> > > > > behavior
> > > > > > for both brokers and consumers.
> > > > > >
> > > > > > On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <
> > > > > athomewithagroove...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks to everybody for the responses!
> > > > > > >
> > > > > > > Yi : The queue must be processed in order, which means that I
> > > cannot
> > > > > use
> > > > > > > Ben and Guozhang's approach.
> > > > > > >
> > > > > > > However, it is not necessary that all rules be processed at the
> > > same
> > > > > > offset
> > > > > > > and at the same speed.  This is why I considered a solution
> where
> > > we
> > > > > had
> > > > > > a
> > > > > > > separate job for each rule.  The problem with that solution is
> > that
> > > > we
> > > > > > > could have thousands of these rules, which would mean thousands
> > of
> > > > > jobs.
> > > > > > > These jobs would be really lightweight and would require very few system resources.

Re: How to deal with bootstrapping

2015-04-17 Thread Yan Fang
Hi Jeremy,

Benjamin is right, New-Rules-Job will need to know the map of partitions to
offsets. Samza's checkpoint stream has the mapping. The doc is here:
http://samza.apache.org/learn/documentation/0.9/container/checkpointing.html

However, on second thought, I do not recommend using the default
checkpoint stream, because 1) it was initially designed for restarting jobs
and is not very friendly for other uses -- a lot of mapping and config work
is involved. You can check CheckpointTool
(https://github.com/apache/samza/blob/0.9.0/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala)
to get a feel for how to read that stream. 2) This is an important point:
your code will be incompatible with newer Samza -- after SAMZA-465, there
will be a coordinator stream instead of a checkpoint stream.

Even though you may not use the system's checkpoint stream, you can easily
create a simple-checkpoint stream and send the latest offsets to it. The
mapping problem raised by Benjamin is still solvable. For example, if your
input stream Number has 10 partitions, you write to a 10-partition
simple-checkpoint stream; the simple-checkpoint stream's partition count is
always the same as Number's. So when you process partition #1 of Number, you
write to partition #1 of the simple-checkpoint stream. When you bring up the
New-Rules-Job, it accepts two streams: Number and the simple-checkpoint
stream. The latter has the latest offsets of the Old-Rules-Job. You just
need to guarantee that the same partition # of Number and the
simple-checkpoint stream goes to the same container. By default, it does.
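
For illustration, a rough sketch of what the Old-Rules-Job side of that could
look like. The class, system, and topic names are made up for the example; it
assumes a Kafka system named "kafka", a simple-checkpoint topic created with
the same partition count as Number, and a producer partitioner that routes the
partition-id key to the matching partition number.

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    // Sketch only: apply the old rules, then record how far this Number
    // partition has been processed in a simple-checkpoint topic that has the
    // same partition count as Number.
    public class OldRulesTask implements StreamTask {
      private static final SystemStream SIMPLE_CHECKPOINT =
          new SystemStream("kafka", "simple-checkpoint");

      @Override
      public void process(IncomingMessageEnvelope envelope,
                          MessageCollector collector,
                          TaskCoordinator coordinator) {
        // ... apply the old rules to envelope.getMessage() and emit results ...

        // Partition id and offset of the Number message we just processed.
        Integer partition =
            envelope.getSystemStreamPartition().getPartition().getPartitionId();
        String offset = envelope.getOffset();

        // Key by the partition id so offsets for Number partition N land in
        // simple-checkpoint partition N (assumption: the partitioner maps this
        // key to that partition number; if not, plug in one that does).
        collector.send(
            new OutgoingMessageEnvelope(SIMPLE_CHECKPOINT, partition, offset));
      }
    }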

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Apr 16, 2015 at 2:16 PM, Benjamin Black  wrote:

> New-Rules-Job will need to know the complete map of partitions to offsets.
>
> On Thu, Apr 16, 2015 at 2:06 PM, jeremy p 
> wrote:
>
> > Ben : I think we are talking about different things here.  I'm not trying
> > to maintain ordering across a topic.  I know that is not what Kafka and
> > Samza are meant for.  What I'm trying to do here is give my
> Old-Rules-Job a
> > way of telling New-Rules-Job, "Once you hit this offset, start applying
> > both old and new rules."  So is that a single absolute offset that I want
> > to pass from Old-Rules-Job to New-Rules-Job?  Or a set of offsets, one
> for
> > each partition.
> >
> > On Thu, Apr 16, 2015 at 4:58 PM, Benjamin Black  wrote:
> >
> > > If you need to maintain ordering of a sequence of messages, those
> > messages
> > > should all be written to the same partition. If you are concerned with
> > > global ordering of all messages in a topic then kafka is likely not
> going
> > > to be what you want. Ordering guarantees are strictly per partition.
> > samza
> > > is built on this principle by having a tasks work from a single
> > partition.
> > > If your jobs require global coordination between tasks, again, you
> might
> > > reconsider either your architecture or your use of kafka.
> > >
> > > Not trying to harsh your mellow here. High scale systems like kafka
> > require
> > > you match your architecture to them. To do otherwise produces bad
> times.
> > >
> > > On Thu, Apr 16, 2015 at 1:51 PM, jeremy p <
> > athomewithagroove...@gmail.com>
> > > wrote:
> > >
> > > > Thank you for the response.  Does this mean the Old-Rules-Job would
> > need
> > > to
> > > > maintain a Last-Processed-Old-Rules offset for each partition?
> > > >
> > > > On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black  wrote:
> > > >
> > > > > Offsets are per partition. The alternative would have poor scaling
> > > > behavior
> > > > > for both brokers and consumers.
> > > > >
> > > > > On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <
> > > > athomewithagroove...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Thanks to everybody for the responses!
> > > > > >
> > > > > > Yi : The queue must be processed in order, which means that I
> > cannot
> > > > use
> > > > > > Ben and Guozhang's approach.
> > > > > >
> > > > > > However, it is not necessary that all rules be processed at the
> > same
> > > > > offset
> > > > > > and at the same speed.  This is why I considered a solution where
> > we
> > > > had
> > > > > a
> > > > > > separate job for each rule.  The problem with that solution is
> that
> > > we
> > > > > > could have thousands of these rules, which would mean thousands
> of
> > > > jobs.
> > > > > > These jobs would be really lightweight and would require very few
> > > > system
> > > > > > resources.  However, I don't know if having thousands of jobs
> would
> > > > break
> > > > > > YARN.
> > > > > >
> > > > > > For now, it sounds like Yan's solution would be the best.
> However,
> > I
> > > > > have a
> > > > > > few questions about it.  For now, let's call the original job the
> > > > > > Old-Rules-Job, and the boostrap job the All-Rules-Job. This is
> the
> > > > > > solution, as I understand it :
> > > > > >
> > > > > > The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.

Re: How to deal with bootstrapping

2015-04-16 Thread Benjamin Black
New-Rules-Job will need to know the complete map of partitions to offsets.

On Thu, Apr 16, 2015 at 2:06 PM, jeremy p 
wrote:

> Ben : I think we are talking about different things here.  I'm not trying
> to maintain ordering across a topic.  I know that is not what Kafka and
> Samza are meant for.  What I'm trying to do here is give my Old-Rules-Job a
> way of telling New-Rules-Job, "Once you hit this offset, start applying
> both old and new rules."  So is that a single absolute offset that I want
> to pass from Old-Rules-Job to New-Rules-Job?  Or a set of offsets, one for
> each partition.
>
> On Thu, Apr 16, 2015 at 4:58 PM, Benjamin Black  wrote:
>
> > If you need to maintain ordering of a sequence of messages, those
> messages
> > should all be written to the same partition. If you are concerned with
> > global ordering of all messages in a topic then kafka is likely not going
> > to be what you want. Ordering guarantees are strictly per partition.
> samza
> > is built on this principle by having a tasks work from a single
> partition.
> > If your jobs require global coordination between tasks, again, you might
> > reconsider either your architecture or your use of kafka.
> >
> > Not trying to harsh your mellow here. High scale systems like kafka
> require
> > you match your architecture to them. To do otherwise produces bad times.
> >
> > On Thu, Apr 16, 2015 at 1:51 PM, jeremy p <
> athomewithagroove...@gmail.com>
> > wrote:
> >
> > > Thank you for the response.  Does this mean the Old-Rules-Job would
> need
> > to
> > > maintain a Last-Processed-Old-Rules offset for each partition?
> > >
> > > On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black  wrote:
> > >
> > > > Offsets are per partition. The alternative would have poor scaling
> > > behavior
> > > > for both brokers and consumers.
> > > >
> > > > On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <
> > > athomewithagroove...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks to everybody for the responses!
> > > > >
> > > > > Yi : The queue must be processed in order, which means that I
> cannot
> > > use
> > > > > Ben and Guozhang's approach.
> > > > >
> > > > > However, it is not necessary that all rules be processed at the
> same
> > > > offset
> > > > > and at the same speed.  This is why I considered a solution where
> we
> > > had
> > > > a
> > > > > separate job for each rule.  The problem with that solution is that
> > we
> > > > > could have thousands of these rules, which would mean thousands of
> > > jobs.
> > > > > These jobs would be really lightweight and would require very few
> > > system
> > > > > resources.  However, I don't know if having thousands of jobs would
> > > break
> > > > > YARN.
> > > > >
> > > > > For now, it sounds like Yan's solution would be the best. However,
> I
> > > > have a
> > > > > few questions about it.  For now, let's call the original job the
> > > > > Old-Rules-Job, and the boostrap job the All-Rules-Job. This is the
> > > > > solution, as I understand it :
> > > > >
> > > > > The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.  We
> > > start
> > > > > the All-Rules-Job.  The All-Rules-Job will only apply new rules
> until
> > > it
> > > > > gets to the Last-Processed-Old-Rules offset.  Once the
> All-Rules-Job
> > > gets
> > > > > to the Last-Processed-Old-Rules offset, it sends a kill signal to
> > > > > Old-Rules-Job along a control stream.  Old-Rules-Job terminates
> > itself.
> > > > > Then the All-Rules-Job applies both old and new rules to every
> > message
> > > > that
> > > > > comes in.
> > > > >
> > > > > My questions :
> > > > >
> > > > > Does the Old-Rules-Job update the Last-Processed-Old-Rules offset
> > every
> > > > > time it processes a message?  How does the Old-Rules-Job expose the
> > > > > Last-Processed-Rules offset to the All-Rules-Job?  Would the
> > > > > Last-Processed-Rules offset be the absolute offset within a topic,
> > and
> > > > not
> > > > > the offset within a partition?  Is there a way to find out a
> > message's
> > > > > absolute offset within a topic?
> > > > >
> > > > > Thanks again for all the help!
> > > > >
> > > > > --Jeremy
> > > > >
> > > > >
> > > > > On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan 
> wrote:
> > > > >
> > > > > > Hi, Jeremy,
> > > > > >
> > > > > > I saw the following requirements from your use case:
> > > > > >
> > > > > > 1) New rules need to be dynamically added w/ creating too many
> > Samza
> > > > jobs
> > > > > > (e.g. 1 Samza job per new rule is too much)
> > > > > > 2) Old rules need to continue processing when new rules are added
> > > > > >
> > > > > > I want to ask a few more questions regarding to your
> requirements:
> > > > > >
> > > > > > Q.1) Is it required that for a new rule, the bootstrap processing
> > of
> > > > > > messages from offset 0 to Last-Processed-Old-Rules has to be done
> > > > before
> > > > > > the new rules can be applied to messages from offset
> > > > > > Last-Processed-Old-Rules?
> > > > > > Q.2) Is it r

Re: How to deal with bootstrapping

2015-04-16 Thread jeremy p
Yan : It sounds like the checkpoint stream might help me!  I would like to
learn more about how New-Rules-Job can access the checkpoint stream for
Old-Rules-Job.  Can you please give me an example of how I would do this?
Or could you please point me to some documentation or an article where I
can learn how to do this?

Thank you!

On Thu, Apr 16, 2015 at 5:06 PM, jeremy p 
wrote:

> Ben : I think we are talking about different things here.  I'm not trying
> to maintain ordering across a topic.  I know that is not what Kafka and
> Samza are meant for.  What I'm trying to do here is give my Old-Rules-Job a
> way of telling New-Rules-Job, "Once you hit this offset, start applying
> both old and new rules."  So is that a single absolute offset that I want
> to pass from Old-Rules-Job to New-Rules-Job?  Or a set of offsets, one for
> each partition.
>
> On Thu, Apr 16, 2015 at 4:58 PM, Benjamin Black  wrote:
>
>> If you need to maintain ordering of a sequence of messages, those messages
>> should all be written to the same partition. If you are concerned with
>> global ordering of all messages in a topic then kafka is likely not going
>> to be what you want. Ordering guarantees are strictly per partition. samza
>> is built on this principle by having a tasks work from a single partition.
>> If your jobs require global coordination between tasks, again, you might
>> reconsider either your architecture or your use of kafka.
>>
>> Not trying to harsh your mellow here. High scale systems like kafka
>> require
>> you match your architecture to them. To do otherwise produces bad times.
>>
>> On Thu, Apr 16, 2015 at 1:51 PM, jeremy p > >
>> wrote:
>>
>> > Thank you for the response.  Does this mean the Old-Rules-Job would
>> need to
>> > maintain a Last-Processed-Old-Rules offset for each partition?
>> >
>> > On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black  wrote:
>> >
>> > > Offsets are per partition. The alternative would have poor scaling
>> > behavior
>> > > for both brokers and consumers.
>> > >
>> > > On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <
>> > athomewithagroove...@gmail.com>
>> > > wrote:
>> > >
>> > > > Thanks to everybody for the responses!
>> > > >
>> > > > Yi : The queue must be processed in order, which means that I cannot
>> > use
>> > > > Ben and Guozhang's approach.
>> > > >
>> > > > However, it is not necessary that all rules be processed at the same
>> > > offset
>> > > > and at the same speed.  This is why I considered a solution where we
>> > had
>> > > a
>> > > > separate job for each rule.  The problem with that solution is that
>> we
>> > > > could have thousands of these rules, which would mean thousands of
>> > jobs.
>> > > > These jobs would be really lightweight and would require very few
>> > system
>> > > > resources.  However, I don't know if having thousands of jobs would
>> > break
>> > > > YARN.
>> > > >
>> > > > For now, it sounds like Yan's solution would be the best. However, I
>> > > have a
>> > > > few questions about it.  For now, let's call the original job the
>> > > > Old-Rules-Job, and the boostrap job the All-Rules-Job. This is the
>> > > > solution, as I understand it :
>> > > >
>> > > > The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.  We
>> > start
>> > > > the All-Rules-Job.  The All-Rules-Job will only apply new rules
>> until
>> > it
>> > > > gets to the Last-Processed-Old-Rules offset.  Once the All-Rules-Job
>> > gets
>> > > > to the Last-Processed-Old-Rules offset, it sends a kill signal to
>> > > > Old-Rules-Job along a control stream.  Old-Rules-Job terminates
>> itself.
>> > > > Then the All-Rules-Job applies both old and new rules to every
>> message
>> > > that
>> > > > comes in.
>> > > >
>> > > > My questions :
>> > > >
>> > > > Does the Old-Rules-Job update the Last-Processed-Old-Rules offset
>> every
>> > > > time it processes a message?  How does the Old-Rules-Job expose the
>> > > > Last-Processed-Rules offset to the All-Rules-Job?  Would the
>> > > > Last-Processed-Rules offset be the absolute offset within a topic,
>> and
>> > > not
>> > > > the offset within a partition?  Is there a way to find out a
>> message's
>> > > > absolute offset within a topic?
>> > > >
>> > > > Thanks again for all the help!
>> > > >
>> > > > --Jeremy
>> > > >
>> > > >
>> > > > On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan 
>> wrote:
>> > > >
>> > > > > Hi, Jeremy,
>> > > > >
>> > > > > I saw the following requirements from your use case:
>> > > > >
>> > > > > 1) New rules need to be dynamically added w/ creating too many
>> Samza
>> > > jobs
>> > > > > (e.g. 1 Samza job per new rule is too much)
>> > > > > 2) Old rules need to continue processing when new rules are added
>> > > > >
>> > > > > I want to ask a few more questions regarding to your requirements:
>> > > > >
>> > > > > Q.1) Is it required that for a new rule, the bootstrap processing
>> of
>> > > > > messages from offset 0 to Last-Processed-Old-Rules has to be done
>> > > before
>> > > > > the new rules

Re: How to deal with bootstrapping

2015-04-16 Thread jeremy p
Ben : I think we are talking about different things here.  I'm not trying
to maintain ordering across a topic.  I know that is not what Kafka and
Samza are meant for.  What I'm trying to do here is give my Old-Rules-Job a
way of telling New-Rules-Job, "Once you hit this offset, start applying
both old and new rules."  So is that a single absolute offset that I want
to pass from Old-Rules-Job to New-Rules-Job?  Or a set of offsets, one for
each partition?

On Thu, Apr 16, 2015 at 4:58 PM, Benjamin Black  wrote:

> If you need to maintain ordering of a sequence of messages, those messages
> should all be written to the same partition. If you are concerned with
> global ordering of all messages in a topic then kafka is likely not going
> to be what you want. Ordering guarantees are strictly per partition. samza
> is built on this principle by having a tasks work from a single partition.
> If your jobs require global coordination between tasks, again, you might
> reconsider either your architecture or your use of kafka.
>
> Not trying to harsh your mellow here. High scale systems like kafka require
> you match your architecture to them. To do otherwise produces bad times.
>
> On Thu, Apr 16, 2015 at 1:51 PM, jeremy p 
> wrote:
>
> > Thank you for the response.  Does this mean the Old-Rules-Job would need
> to
> > maintain a Last-Processed-Old-Rules offset for each partition?
> >
> > On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black  wrote:
> >
> > > Offsets are per partition. The alternative would have poor scaling
> > behavior
> > > for both brokers and consumers.
> > >
> > > On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <
> > athomewithagroove...@gmail.com>
> > > wrote:
> > >
> > > > Thanks to everybody for the responses!
> > > >
> > > > Yi : The queue must be processed in order, which means that I cannot
> > use
> > > > Ben and Guozhang's approach.
> > > >
> > > > However, it is not necessary that all rules be processed at the same
> > > offset
> > > > and at the same speed.  This is why I considered a solution where we
> > had
> > > a
> > > > separate job for each rule.  The problem with that solution is that
> we
> > > > could have thousands of these rules, which would mean thousands of
> > jobs.
> > > > These jobs would be really lightweight and would require very few
> > system
> > > > resources.  However, I don't know if having thousands of jobs would
> > break
> > > > YARN.
> > > >
> > > > For now, it sounds like Yan's solution would be the best. However, I
> > > have a
> > > > few questions about it.  For now, let's call the original job the
> > > > Old-Rules-Job, and the boostrap job the All-Rules-Job. This is the
> > > > solution, as I understand it :
> > > >
> > > > The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.  We
> > start
> > > > the All-Rules-Job.  The All-Rules-Job will only apply new rules until
> > it
> > > > gets to the Last-Processed-Old-Rules offset.  Once the All-Rules-Job
> > gets
> > > > to the Last-Processed-Old-Rules offset, it sends a kill signal to
> > > > Old-Rules-Job along a control stream.  Old-Rules-Job terminates
> itself.
> > > > Then the All-Rules-Job applies both old and new rules to every
> message
> > > that
> > > > comes in.
> > > >
> > > > My questions :
> > > >
> > > > Does the Old-Rules-Job update the Last-Processed-Old-Rules offset
> every
> > > > time it processes a message?  How does the Old-Rules-Job expose the
> > > > Last-Processed-Rules offset to the All-Rules-Job?  Would the
> > > > Last-Processed-Rules offset be the absolute offset within a topic,
> and
> > > not
> > > > the offset within a partition?  Is there a way to find out a
> message's
> > > > absolute offset within a topic?
> > > >
> > > > Thanks again for all the help!
> > > >
> > > > --Jeremy
> > > >
> > > >
> > > > On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan  wrote:
> > > >
> > > > > Hi, Jeremy,
> > > > >
> > > > > I saw the following requirements from your use case:
> > > > >
> > > > > 1) New rules need to be dynamically added w/ creating too many
> Samza
> > > jobs
> > > > > (e.g. 1 Samza job per new rule is too much)
> > > > > 2) Old rules need to continue processing when new rules are added
> > > > >
> > > > > I want to ask a few more questions regarding to your requirements:
> > > > >
> > > > > Q.1) Is it required that for a new rule, the bootstrap processing
> of
> > > > > messages from offset 0 to Last-Processed-Old-Rules has to be done
> > > before
> > > > > the new rules can be applied to messages from offset
> > > > > Last-Processed-Old-Rules?
> > > > > Q.2) Is it required that after bootstrap, all rules are processing
> > the
> > > > > message at the same offset w/ the same speed?
> > > > >
> > > > > If the answers to both questions (i.e. Q.1 and Q.2) above are yes,
> we
> > > > will
> > > > > have to slow down or stop the jobs for the old rules s.t. the jobs
> > > > running
> > > > > both new and old rules can catch up, as Yan pointed out. If answers
> > to
> > > > both

Re: How to deal with bootstrapping

2015-04-16 Thread Benjamin Black
If you need to maintain ordering of a sequence of messages, those messages
should all be written to the same partition. If you are concerned with
global ordering of all messages in a topic then kafka is likely not going
to be what you want. Ordering guarantees are strictly per partition. samza
is built on this principle by having each task work from a single partition.
If your jobs require global coordination between tasks, again, you might
reconsider either your architecture or your use of kafka.

Not trying to harsh your mellow here. High scale systems like kafka require
you match your architecture to them. To do otherwise produces bad times.
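
Not from the thread, just a rough sketch of the usual way to get that
per-partition ordering: give messages that must stay ordered the same key, so
the default partitioner sends them to the same partition. The topic name,
broker address, and key below are made up for the example.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch only: records with the same key go to the same partition, and
    // Kafka preserves their relative order within that partition.
    public class OrderedProducerExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
          // Same key => same partition => ordered relative to each other.
          producer.send(new ProducerRecord<>("Number", "sequence-42", "1"));
          producer.send(new ProducerRecord<>("Number", "sequence-42", "2"));
          producer.send(new ProducerRecord<>("Number", "sequence-42", "3"));
        }
      }
    }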

On Thu, Apr 16, 2015 at 1:51 PM, jeremy p 
wrote:

> Thank you for the response.  Does this mean the Old-Rules-Job would need to
> maintain a Last-Processed-Old-Rules offset for each partition?
>
> On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black  wrote:
>
> > Offsets are per partition. The alternative would have poor scaling
> behavior
> > for both brokers and consumers.
> >
> > On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <
> athomewithagroove...@gmail.com>
> > wrote:
> >
> > > Thanks to everybody for the responses!
> > >
> > > Yi : The queue must be processed in order, which means that I cannot
> use
> > > Ben and Guozhang's approach.
> > >
> > > However, it is not necessary that all rules be processed at the same
> > offset
> > > and at the same speed.  This is why I considered a solution where we
> had
> > a
> > > separate job for each rule.  The problem with that solution is that we
> > > could have thousands of these rules, which would mean thousands of
> jobs.
> > > These jobs would be really lightweight and would require very few
> system
> > > resources.  However, I don't know if having thousands of jobs would
> break
> > > YARN.
> > >
> > > For now, it sounds like Yan's solution would be the best. However, I
> > have a
> > > few questions about it.  For now, let's call the original job the
> > > Old-Rules-Job, and the boostrap job the All-Rules-Job. This is the
> > > solution, as I understand it :
> > >
> > > The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.  We
> start
> > > the All-Rules-Job.  The All-Rules-Job will only apply new rules until
> it
> > > gets to the Last-Processed-Old-Rules offset.  Once the All-Rules-Job
> gets
> > > to the Last-Processed-Old-Rules offset, it sends a kill signal to
> > > Old-Rules-Job along a control stream.  Old-Rules-Job terminates itself.
> > > Then the All-Rules-Job applies both old and new rules to every message
> > that
> > > comes in.
> > >
> > > My questions :
> > >
> > > Does the Old-Rules-Job update the Last-Processed-Old-Rules offset every
> > > time it processes a message?  How does the Old-Rules-Job expose the
> > > Last-Processed-Rules offset to the All-Rules-Job?  Would the
> > > Last-Processed-Rules offset be the absolute offset within a topic, and
> > not
> > > the offset within a partition?  Is there a way to find out a message's
> > > absolute offset within a topic?
> > >
> > > Thanks again for all the help!
> > >
> > > --Jeremy
> > >
> > >
> > > On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan  wrote:
> > >
> > > > Hi, Jeremy,
> > > >
> > > > I saw the following requirements from your use case:
> > > >
> > > > 1) New rules need to be dynamically added w/ creating too many Samza
> > jobs
> > > > (e.g. 1 Samza job per new rule is too much)
> > > > 2) Old rules need to continue processing when new rules are added
> > > >
> > > > I want to ask a few more questions regarding to your requirements:
> > > >
> > > > Q.1) Is it required that for a new rule, the bootstrap processing of
> > > > messages from offset 0 to Last-Processed-Old-Rules has to be done
> > before
> > > > the new rules can be applied to messages from offset
> > > > Last-Processed-Old-Rules?
> > > > Q.2) Is it required that after bootstrap, all rules are processing
> the
> > > > message at the same offset w/ the same speed?
> > > >
> > > > If the answers to both questions (i.e. Q.1 and Q.2) above are yes, we
> > > will
> > > > have to slow down or stop the jobs for the old rules s.t. the jobs
> > > running
> > > > both new and old rules can catch up, as Yan pointed out. If answers
> to
> > > both
> > > > questions above are no (which I doubt since you need to build-up
> > certain
> > > > "history" for the new rule before you can apply it to later
> messages),
> > > you
> > > > can take Ben/Guozhang's approach w/o coordination between the two
> jobs.
> > > >
> > > > Now the interesting case is that your answer to Q.1 is yes, and to
> Q.2
> > is
> > > > no, which essentially post a request that your job will need to keep
> > > > multiple independent consumer offsets per rule and let them move w/
> > their
> > > > own speed. Or, at least one bootstrap consumer, and one normal
> > processing
> > > > consumer on the same system stream partition within a single job. I
> > don't
> > > > think that Samza support this now. And the only work around is Yan's solution which requires coordination between two jobs.

Re: How to deal with bootstrapping

2015-04-16 Thread Yan Fang
Hi Jeremy,

Samza already has a checkpoint stream, which records the latest-processed
offset. The new job can reuse the old job's checkpoint stream.

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Apr 16, 2015 at 1:51 PM, jeremy p 
wrote:

> Thank you for the response.  Does this mean the Old-Rules-Job would need to
> maintain a Last-Processed-Old-Rules offset for each partition?
>
> On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black  wrote:
>
> > Offsets are per partition. The alternative would have poor scaling
> behavior
> > for both brokers and consumers.
> >
> > On Thu, Apr 16, 2015 at 1:01 PM, jeremy p <
> athomewithagroove...@gmail.com>
> > wrote:
> >
> > > Thanks to everybody for the responses!
> > >
> > > Yi : The queue must be processed in order, which means that I cannot
> use
> > > Ben and Guozhang's approach.
> > >
> > > However, it is not necessary that all rules be processed at the same
> > offset
> > > and at the same speed.  This is why I considered a solution where we
> had
> > a
> > > separate job for each rule.  The problem with that solution is that we
> > > could have thousands of these rules, which would mean thousands of
> jobs.
> > > These jobs would be really lightweight and would require very few
> system
> > > resources.  However, I don't know if having thousands of jobs would
> break
> > > YARN.
> > >
> > > For now, it sounds like Yan's solution would be the best. However, I
> > have a
> > > few questions about it.  For now, let's call the original job the
> > > Old-Rules-Job, and the boostrap job the All-Rules-Job. This is the
> > > solution, as I understand it :
> > >
> > > The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.  We
> start
> > > the All-Rules-Job.  The All-Rules-Job will only apply new rules until
> it
> > > gets to the Last-Processed-Old-Rules offset.  Once the All-Rules-Job
> gets
> > > to the Last-Processed-Old-Rules offset, it sends a kill signal to
> > > Old-Rules-Job along a control stream.  Old-Rules-Job terminates itself.
> > > Then the All-Rules-Job applies both old and new rules to every message
> > that
> > > comes in.
> > >
> > > My questions :
> > >
> > > Does the Old-Rules-Job update the Last-Processed-Old-Rules offset every
> > > time it processes a message?  How does the Old-Rules-Job expose the
> > > Last-Processed-Rules offset to the All-Rules-Job?  Would the
> > > Last-Processed-Rules offset be the absolute offset within a topic, and
> > not
> > > the offset within a partition?  Is there a way to find out a message's
> > > absolute offset within a topic?
> > >
> > > Thanks again for all the help!
> > >
> > > --Jeremy
> > >
> > >
> > > On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan  wrote:
> > >
> > > > Hi, Jeremy,
> > > >
> > > > I saw the following requirements from your use case:
> > > >
> > > > 1) New rules need to be dynamically added w/ creating too many Samza
> > jobs
> > > > (e.g. 1 Samza job per new rule is too much)
> > > > 2) Old rules need to continue processing when new rules are added
> > > >
> > > > I want to ask a few more questions regarding to your requirements:
> > > >
> > > > Q.1) Is it required that for a new rule, the bootstrap processing of
> > > > messages from offset 0 to Last-Processed-Old-Rules has to be done
> > before
> > > > the new rules can be applied to messages from offset
> > > > Last-Processed-Old-Rules?
> > > > Q.2) Is it required that after bootstrap, all rules are processing
> the
> > > > message at the same offset w/ the same speed?
> > > >
> > > > If the answers to both questions (i.e. Q.1 and Q.2) above are yes, we
> > > will
> > > > have to slow down or stop the jobs for the old rules s.t. the jobs
> > > running
> > > > both new and old rules can catch up, as Yan pointed out. If answers
> to
> > > both
> > > > questions above are no (which I doubt since you need to build-up
> > certain
> > > > "history" for the new rule before you can apply it to later
> messages),
> > > you
> > > > can take Ben/Guozhang's approach w/o coordination between the two
> jobs.
> > > >
> > > > Now the interesting case is that your answer to Q.1 is yes, and to
> Q.2
> > is
> > > > no, which essentially post a request that your job will need to keep
> > > > multiple independent consumer offsets per rule and let them move w/
> > their
> > > > own speed. Or, at least one bootstrap consumer, and one normal
> > processing
> > > > consumer on the same system stream partition within a single job. I
> > don't
> > > > think that Samza support this now. And the only work around is Yan's
> > > > solution which requires coordination between two jobs.
> > > >
> > > > -Yi
> > > >
> > > > On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang 
> > wrote:
> > > >
> > > > > you are able to call coordinator.shutdown to shut the job down
> after
> > it
> > > > > reaches the offset.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Fang, Yan
> > > > > yanfang...@gmail.com
> > > > >
> > > > > On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang  >
> > > > wrote:

Re: How to deal with bootstrapping

2015-04-16 Thread jeremy p
Thank you for the response.  Does this mean the Old-Rules-Job would need to
maintain a Last-Processed-Old-Rules offset for each partition?

On Thu, Apr 16, 2015 at 4:47 PM, Benjamin Black  wrote:

> Offsets are per partition. The alternative would have poor scaling behavior
> for both brokers and consumers.
>
> On Thu, Apr 16, 2015 at 1:01 PM, jeremy p 
> wrote:
>
> > Thanks to everybody for the responses!
> >
> > Yi : The queue must be processed in order, which means that I cannot use
> > Ben and Guozhang's approach.
> >
> > However, it is not necessary that all rules be processed at the same
> offset
> > and at the same speed.  This is why I considered a solution where we had
> a
> > separate job for each rule.  The problem with that solution is that we
> > could have thousands of these rules, which would mean thousands of jobs.
> > These jobs would be really lightweight and would require very few system
> > resources.  However, I don't know if having thousands of jobs would break
> > YARN.
> >
> > For now, it sounds like Yan's solution would be the best. However, I
> have a
> > few questions about it.  For now, let's call the original job the
> > Old-Rules-Job, and the boostrap job the All-Rules-Job. This is the
> > solution, as I understand it :
> >
> > The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.  We start
> > the All-Rules-Job.  The All-Rules-Job will only apply new rules until it
> > gets to the Last-Processed-Old-Rules offset.  Once the All-Rules-Job gets
> > to the Last-Processed-Old-Rules offset, it sends a kill signal to
> > Old-Rules-Job along a control stream.  Old-Rules-Job terminates itself.
> > Then the All-Rules-Job applies both old and new rules to every message
> that
> > comes in.
> >
> > My questions :
> >
> > Does the Old-Rules-Job update the Last-Processed-Old-Rules offset every
> > time it processes a message?  How does the Old-Rules-Job expose the
> > Last-Processed-Rules offset to the All-Rules-Job?  Would the
> > Last-Processed-Rules offset be the absolute offset within a topic, and
> not
> > the offset within a partition?  Is there a way to find out a message's
> > absolute offset within a topic?
> >
> > Thanks again for all the help!
> >
> > --Jeremy
> >
> >
> > On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan  wrote:
> >
> > > Hi, Jeremy,
> > >
> > > I saw the following requirements from your use case:
> > >
> > > 1) New rules need to be dynamically added w/ creating too many Samza
> jobs
> > > (e.g. 1 Samza job per new rule is too much)
> > > 2) Old rules need to continue processing when new rules are added
> > >
> > > I want to ask a few more questions regarding to your requirements:
> > >
> > > Q.1) Is it required that for a new rule, the bootstrap processing of
> > > messages from offset 0 to Last-Processed-Old-Rules has to be done
> before
> > > the new rules can be applied to messages from offset
> > > Last-Processed-Old-Rules?
> > > Q.2) Is it required that after bootstrap, all rules are processing the
> > > message at the same offset w/ the same speed?
> > >
> > > If the answers to both questions (i.e. Q.1 and Q.2) above are yes, we
> > will
> > > have to slow down or stop the jobs for the old rules s.t. the jobs
> > running
> > > both new and old rules can catch up, as Yan pointed out. If answers to
> > both
> > > questions above are no (which I doubt since you need to build-up
> certain
> > > "history" for the new rule before you can apply it to later messages),
> > you
> > > can take Ben/Guozhang's approach w/o coordination between the two jobs.
> > >
> > > Now the interesting case is that your answer to Q.1 is yes, and to Q.2
> is
> > > no, which essentially post a request that your job will need to keep
> > > multiple independent consumer offsets per rule and let them move w/
> their
> > > own speed. Or, at least one bootstrap consumer, and one normal
> processing
> > > consumer on the same system stream partition within a single job. I
> don't
> > > think that Samza support this now. And the only work around is Yan's
> > > solution which requires coordination between two jobs.
> > >
> > > -Yi
> > >
> > > On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang 
> wrote:
> > >
> > > > you are able to call coordinator.shutdown to shut the job down after
> it
> > > > reaches the offset.
> > > >
> > > > Thanks,
> > > >
> > > > Fang, Yan
> > > > yanfang...@gmail.com
> > > >
> > > > On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > I feel Ben's solution a bit simpler that you just need to restart
> > your
> > > > > current job with both rules on the check pointed offset, and start
> a
> > > new
> > > > > job from offset 0 with only the new rule and it will stop at the
> > > checkout
> > > > > pointed offset. But of course it requires the second job to be able
> > to
> > > > > shutdown itself upon some specific offset which I am not sure if it
> > is
> > > > > already supported.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang  wrote:

Re: How to deal with bootstrapping

2015-04-16 Thread Benjamin Black
Offsets are per partition. The alternative would have poor scaling behavior
for both brokers and consumers.

On Thu, Apr 16, 2015 at 1:01 PM, jeremy p 
wrote:

> Thanks to everybody for the responses!
>
> Yi : The queue must be processed in order, which means that I cannot use
> Ben and Guozhang's approach.
>
> However, it is not necessary that all rules be processed at the same offset
> and at the same speed.  This is why I considered a solution where we had a
> separate job for each rule.  The problem with that solution is that we
> could have thousands of these rules, which would mean thousands of jobs.
> These jobs would be really lightweight and would require very few system
> resources.  However, I don't know if having thousands of jobs would break
> YARN.
>
> For now, it sounds like Yan's solution would be the best. However, I have a
> few questions about it.  For now, let's call the original job the
> Old-Rules-Job, and the boostrap job the All-Rules-Job. This is the
> solution, as I understand it :
>
> The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.  We start
> the All-Rules-Job.  The All-Rules-Job will only apply new rules until it
> gets to the Last-Processed-Old-Rules offset.  Once the All-Rules-Job gets
> to the Last-Processed-Old-Rules offset, it sends a kill signal to
> Old-Rules-Job along a control stream.  Old-Rules-Job terminates itself.
> Then the All-Rules-Job applies both old and new rules to every message that
> comes in.
>
> My questions :
>
> Does the Old-Rules-Job update the Last-Processed-Old-Rules offset every
> time it processes a message?  How does the Old-Rules-Job expose the
> Last-Processed-Rules offset to the All-Rules-Job?  Would the
> Last-Processed-Rules offset be the absolute offset within a topic, and not
> the offset within a partition?  Is there a way to find out a message's
> absolute offset within a topic?
>
> Thanks again for all the help!
>
> --Jeremy
>
>
> On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan  wrote:
>
> > Hi, Jeremy,
> >
> > I saw the following requirements from your use case:
> >
> > 1) New rules need to be dynamically added w/ creating too many Samza jobs
> > (e.g. 1 Samza job per new rule is too much)
> > 2) Old rules need to continue processing when new rules are added
> >
> > I want to ask a few more questions regarding to your requirements:
> >
> > Q.1) Is it required that for a new rule, the bootstrap processing of
> > messages from offset 0 to Last-Processed-Old-Rules has to be done before
> > the new rules can be applied to messages from offset
> > Last-Processed-Old-Rules?
> > Q.2) Is it required that after bootstrap, all rules are processing the
> > message at the same offset w/ the same speed?
> >
> > If the answers to both questions (i.e. Q.1 and Q.2) above are yes, we
> will
> > have to slow down or stop the jobs for the old rules s.t. the jobs
> running
> > both new and old rules can catch up, as Yan pointed out. If answers to
> both
> > questions above are no (which I doubt since you need to build-up certain
> > "history" for the new rule before you can apply it to later messages),
> you
> > can take Ben/Guozhang's approach w/o coordination between the two jobs.
> >
> > Now the interesting case is that your answer to Q.1 is yes, and to Q.2 is
> > no, which essentially post a request that your job will need to keep
> > multiple independent consumer offsets per rule and let them move w/ their
> > own speed. Or, at least one bootstrap consumer, and one normal processing
> > consumer on the same system stream partition within a single job. I don't
> > think that Samza support this now. And the only work around is Yan's
> > solution which requires coordination between two jobs.
> >
> > -Yi
> >
> > On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang  wrote:
> >
> > > you are able to call coordinator.shutdown to shut the job down after it
> > > reaches the offset.
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang 
> > wrote:
> > >
> > > > I feel Ben's solution a bit simpler that you just need to restart
> your
> > > > current job with both rules on the check pointed offset, and start a
> > new
> > > > job from offset 0 with only the new rule and it will stop at the
> > checkout
> > > > pointed offset. But of course it requires the second job to be able
> to
> > > > shutdown itself upon some specific offset which I am not sure if it
> is
> > > > already supported.
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang 
> > wrote:
> > > >
> > > > > Hi Jeremy,
> > > > >
> > > > > In order to reach this goal, we have to assume that the job with
> new
> > > > rules
> > > > > can always catch up with the one with old rules. Otherwise, I think
> > we
> > > do
> > > > > not have the choice but running a lot of jobs simultaneously.
> > > > >
> > > > > Under our assumption, we have job1 with old rules running, and now
> > add
> > > > job2

Re: How to deal with bootstrapping

2015-04-16 Thread jeremy p
Thanks to everybody for the responses!

Yi : The queue must be processed in order, which means that I cannot use
Ben and Guozhang's approach.

However, it is not necessary that all rules be processed at the same offset
and at the same speed.  This is why I considered a solution where we had a
separate job for each rule.  The problem with that solution is that we
could have thousands of these rules, which would mean thousands of jobs.
These jobs would be really lightweight and would require very few system
resources.  However, I don't know if having thousands of jobs would break
YARN.

For now, it sounds like Yan's solution would be the best. However, I have a
few questions about it.  For now, let's call the original job the
Old-Rules-Job, and the bootstrap job the All-Rules-Job. This is the
solution, as I understand it :

The Old-Rules-Job exposes the Last-Processed-Old-Rules offset.  We start
the All-Rules-Job.  The All-Rules-Job will only apply new rules until it
gets to the Last-Processed-Old-Rules offset.  Once the All-Rules-Job gets
to the Last-Processed-Old-Rules offset, it sends a kill signal to
Old-Rules-Job along a control stream.  Old-Rules-Job terminates itself.
Then the All-Rules-Job applies both old and new rules to every message that
comes in.
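
For illustration, a rough sketch of the All-Rules-Job side of that hand-off,
per partition. The class and stream names are made up; it assumes Kafka's
numeric offsets (so the offset strings can be compared as longs) and that the
per-partition Last-Processed-Old-Rules offset has already been read, e.g. from
a bootstrapped simple-checkpoint stream.

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    // Sketch only: apply new rules from offset 0; once this partition reaches
    // the Last-Processed-Old-Rules offset, tell the Old-Rules-Job to stop via
    // a control stream and start applying the old rules as well.
    public class AllRulesTask implements StreamTask {
      private static final SystemStream CONTROL = new SystemStream("kafka", "control");

      private long lastProcessedOldRules; // target offset for this task's partition
      private boolean caughtUp = false;

      @Override
      public void process(IncomingMessageEnvelope envelope,
                          MessageCollector collector,
                          TaskCoordinator coordinator) {
        long offset = Long.parseLong(envelope.getOffset());

        if (!caughtUp && offset >= lastProcessedOldRules) {
          caughtUp = true;
          // Kill signal for the Old-Rules-Job, which consumes the control stream.
          collector.send(new OutgoingMessageEnvelope(CONTROL, "shutdown-old-rules-job"));
        }

        applyNewRules(envelope, collector);
        if (caughtUp) {
          applyOldRules(envelope, collector); // old rules only after the hand-off point
        }
      }

      private void applyNewRules(IncomingMessageEnvelope e, MessageCollector c) { /* ... */ }
      private void applyOldRules(IncomingMessageEnvelope e, MessageCollector c) { /* ... */ }
    }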

My questions :

Does the Old-Rules-Job update the Last-Processed-Old-Rules offset every
time it processes a message?  How does the Old-Rules-Job expose the
Last-Processed-Rules offset to the All-Rules-Job?  Would the
Last-Processed-Rules offset be the absolute offset within a topic, and not
the offset within a partition?  Is there a way to find out a message's
absolute offset within a topic?

Thanks again for all the help!

--Jeremy


On Thu, Apr 16, 2015 at 3:06 PM, Yi Pan  wrote:

> Hi, Jeremy,
>
> I saw the following requirements from your use case:
>
> 1) New rules need to be dynamically added w/ creating too many Samza jobs
> (e.g. 1 Samza job per new rule is too much)
> 2) Old rules need to continue processing when new rules are added
>
> I want to ask a few more questions regarding to your requirements:
>
> Q.1) Is it required that for a new rule, the bootstrap processing of
> messages from offset 0 to Last-Processed-Old-Rules has to be done before
> the new rules can be applied to messages from offset
> Last-Processed-Old-Rules?
> Q.2) Is it required that after bootstrap, all rules are processing the
> message at the same offset w/ the same speed?
>
> If the answers to both questions (i.e. Q.1 and Q.2) above are yes, we will
> have to slow down or stop the jobs for the old rules s.t. the jobs running
> both new and old rules can catch up, as Yan pointed out. If answers to both
> questions above are no (which I doubt since you need to build-up certain
> "history" for the new rule before you can apply it to later messages), you
> can take Ben/Guozhang's approach w/o coordination between the two jobs.
>
> Now the interesting case is that your answer to Q.1 is yes, and to Q.2 is
> no, which essentially post a request that your job will need to keep
> multiple independent consumer offsets per rule and let them move w/ their
> own speed. Or, at least one bootstrap consumer, and one normal processing
> consumer on the same system stream partition within a single job. I don't
> think that Samza support this now. And the only work around is Yan's
> solution which requires coordination between two jobs.
>
> -Yi
>
> On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang  wrote:
>
> > you are able to call coordinator.shutdown to shut the job down after it
> > reaches the offset.
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang 
> wrote:
> >
> > > I feel Ben's solution a bit simpler that you just need to restart your
> > > current job with both rules on the check pointed offset, and start a
> new
> > > job from offset 0 with only the new rule and it will stop at the
> checkout
> > > pointed offset. But of course it requires the second job to be able to
> > > shutdown itself upon some specific offset which I am not sure if it is
> > > already supported.
> > >
> > > Guozhang
> > >
> > > On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang 
> wrote:
> > >
> > > > Hi Jeremy,
> > > >
> > > > In order to reach this goal, we have to assume that the job with new
> > > rules
> > > > can always catch up with the one with old rules. Otherwise, I think
> we
> > do
> > > > not have the choice but running a lot of jobs simultaneously.
> > > >
> > > > Under our assumption, we have job1 with old rules running, and now
> add
> > > job2
> > > > which integrates old rules and new rules to run. Job2 frequently
> > > > checks the Last-Processed-Old-Rules
> > > > offset from job1 (because job1 is running too), and it only applies
> new
> > > > rule to the data until catch up with the Last-Processed-Old-Rules
> > offset.
> > > > Then it sends signal to the job1 and shutdown job1, and applies all
> > rules
> > > > to the stream.
> > > >

Re: How to deal with bootstrapping

2015-04-16 Thread Yi Pan
Hi, Jeremy,

I saw the following requirements from your use case:

1) New rules need to be dynamically added w/o creating too many Samza jobs
(e.g. 1 Samza job per new rule is too much)
2) Old rules need to continue processing when new rules are added

I want to ask a few more questions regarding your requirements:

Q.1) Is it required that for a new rule, the bootstrap processing of
messages from offset 0 to Last-Processed-Old-Rules has to be done before
the new rules can be applied to messages from offset
Last-Processed-Old-Rules?
Q.2) Is it required that after bootstrap, all rules are processing the
message at the same offset w/ the same speed?

If the answers to both questions (i.e. Q.1 and Q.2) above are yes, we will
have to slow down or stop the jobs for the old rules s.t. the jobs running
both new and old rules can catch up, as Yan pointed out. If answers to both
questions above are no (which I doubt since you need to build-up certain
"history" for the new rule before you can apply it to later messages), you
can take Ben/Guozhang's approach w/o coordination between the two jobs.

Now the interesting case is that your answer to Q.1 is yes, and to Q.2 is
no, which essentially posts a requirement that your job will need to keep
multiple independent consumer offsets per rule and let them move at their
own speed. Or, at least one bootstrap consumer and one normal processing
consumer on the same system stream partition within a single job. I don't
think that Samza supports this now, and the only workaround is Yan's
solution, which requires coordination between two jobs.

-Yi

On Thu, Apr 16, 2015 at 11:21 AM, Yan Fang  wrote:

> you are able to call coordinator.shutdown to shut the job down after it
> reaches the offset.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang  wrote:
>
> > I feel Ben's solution a bit simpler that you just need to restart your
> > current job with both rules on the check pointed offset, and start a new
> > job from offset 0 with only the new rule and it will stop at the checkout
> > pointed offset. But of course it requires the second job to be able to
> > shutdown itself upon some specific offset which I am not sure if it is
> > already supported.
> >
> > Guozhang
> >
> > On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang  wrote:
> >
> > > Hi Jeremy,
> > >
> > > In order to reach this goal, we have to assume that the job with new
> > rules
> > > can always catch up with the one with old rules. Otherwise, I think we
> do
> > > not have the choice but running a lot of jobs simultaneously.
> > >
> > > Under our assumption, we have job1 with old rules running, and now add
> > job2
> > > which integrates old rules and new rules to run. Job2 frequently
> > > checks the Last-Processed-Old-Rules
> > > offset from job1 (because job1 is running too), and it only applies new
> > > rule to the data until catch up with the Last-Processed-Old-Rules
> offset.
> > > Then it sends signal to the job1 and shutdown job1, and applies all
> rules
> > > to the stream.
> > >
> > > In terms of how to shutdown the job1, here is one solution
> > > <
> > >
> >
> http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E
> > > >
> > > provided by Chris - e.g. you can have a control stream to get job1
> > > shutdown. Samza will provide this kind of stream after SAMZA-348
> > > , which is under
> active
> > > development.
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <
> > athomewithagroove...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hello Yan,
> > > >
> > > > Thank you for the suggestion!  I think your solution would work,
> > > however, I
> > > > am afraid it would create a performance problem for our users.
> > > >
> > > > Let's say we kill the Classifier task, and create a new Classifier
> task
> > > > with both the existing rules and new rules. We get the offset of the
> > > > latest-processed message for the old rules.  Let's call this offset
> > > > Last-Processed-Old-Rules.  We ignore messages
> > > > before Last-Processed-Old-Rules for the old rules.  We configure the
> > new
> > > > Classifier task to be a bootstrap task.
> > > >
> > > > Let's say we have users who are watching the output topics, and they
> > are
> > > > expecting near-realtime updates.  They won't see any updates for the
> > old
> > > > rules until our task has passed the Last-Processed-Old-Rules offset.
> > If
> > > we
> > > > have a lot of messages in that topic, that could take a long time.
> > This
> > > is
> > > > why I was hoping there would be a way to bootstrap the new rules
> while
> > > > we're still processing the old rules.  Do you think there is a way to
> > do
> > > > that?
> > > >
> > > > On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang 
> > wrote:
> > > >
> > > > > Hi Jeremy,
> > > > >
> > > > > If my unde

Re: How to deal with bootstrapping

2015-04-16 Thread Yan Fang
You are able to call coordinator.shutdown to shut the job down after it
reaches the offset.
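
A minimal sketch of that call inside process(): the class name and
TARGET_OFFSET are made up for the example, Long.parseLong assumes Kafka's
numeric offsets, and in practice the target would come from the old job
(e.g. via the simple-checkpoint stream discussed elsewhere in this thread).

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    // Sketch only: stop once this task's input partition has reached a target offset.
    public class StopAtOffsetTask implements StreamTask {
      private static final long TARGET_OFFSET = 123456L;

      @Override
      public void process(IncomingMessageEnvelope envelope,
                          MessageCollector collector,
                          TaskCoordinator coordinator) {
        // ... normal processing of envelope.getMessage() ...

        if (Long.parseLong(envelope.getOffset()) >= TARGET_OFFSET) {
          // Ask Samza to shut this task down; use ALL_TASKS_IN_CONTAINER to
          // stop every task in the container instead.
          coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
        }
      }
    }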

Thanks,

Fang, Yan
yanfang...@gmail.com

On Thu, Apr 16, 2015 at 8:59 AM, Guozhang Wang  wrote:

> I feel Ben's solution a bit simpler that you just need to restart your
> current job with both rules on the check pointed offset, and start a new
> job from offset 0 with only the new rule and it will stop at the checkout
> pointed offset. But of course it requires the second job to be able to
> shutdown itself upon some specific offset which I am not sure if it is
> already supported.
>
> Guozhang
>
> On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang  wrote:
>
> > Hi Jeremy,
> >
> > In order to reach this goal, we have to assume that the job with new
> rules
> > can always catch up with the one with old rules. Otherwise, I think we do
> > not have the choice but running a lot of jobs simultaneously.
> >
> > Under our assumption, we have job1 with old rules running, and now add
> job2
> > which integrates old rules and new rules to run. Job2 frequently
> > checks the Last-Processed-Old-Rules
> > offset from job1 (because job1 is running too), and it only applies new
> > rule to the data until catch up with the Last-Processed-Old-Rules offset.
> > Then it sends signal to the job1 and shutdown job1, and applies all rules
> > to the stream.
> >
> > In terms of how to shutdown the job1, here is one solution
> > <
> >
> http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E
> > >
> > provided by Chris - e.g. you can have a control stream to get job1
> > shutdown. Samza will provide this kind of stream after SAMZA-348
> > , which is under active
> > development.
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Wed, Apr 15, 2015 at 12:17 PM, jeremy p <
> athomewithagroove...@gmail.com
> > >
> > wrote:
> >
> > > Hello Yan,
> > >
> > > Thank you for the suggestion!  I think your solution would work,
> > however, I
> > > am afraid it would create a performance problem for our users.
> > >
> > > Let's say we kill the Classifier task, and create a new Classifier task
> > > with both the existing rules and new rules. We get the offset of the
> > > latest-processed message for the old rules.  Let's call this offset
> > > Last-Processed-Old-Rules.  We ignore messages
> > > before Last-Processed-Old-Rules for the old rules.  We configure the
> new
> > > Classifier task to be a bootstrap task.
> > >
> > > Let's say we have users who are watching the output topics, and they
> are
> > > expecting near-realtime updates.  They won't see any updates for the
> old
> > > rules until our task has passed the Last-Processed-Old-Rules offset.
> If
> > we
> > > have a lot of messages in that topic, that could take a long time.
> This
> > is
> > > why I was hoping there would be a way to bootstrap the new rules while
> > > we're still processing the old rules.  Do you think there is a way to
> do
> > > that?
> > >
> > > On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang 
> wrote:
> > >
> > > > Hi Jeremy,
> > > >
> > > > If my understanding is correct, whenever you add a new rule, you want
> > to
> > > > apply this rule to the historical data. Right?
> > > >
> > > > If you do not care about duplication, you can create a new task that
> > > > contains existing rules and new rules. Configure bootstrap. This will
> > > apply
> > > > all the rules from the beginning of the input stream. The shortcoming
> > is
> > > > you will get duplicated results for old rules.
> > > >
> > > > If you can not tolerate the shortcoming, 1) get the offset of the
> > > > latest-processed message of old rules. 2) In your new task, ignore
> > > messages
> > > > before that offset for the old rules. 3) bootstrap.
> > > >
> > > > Hope this helps. Maybe your use case is more complicated?
> > > >
> > > > Thanks,
> > > >
> > > > Fang, Yan
> > > > yanfang...@gmail.com
> > > >
> > > > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <
> > > athomewithagroove...@gmail.com
> > > > >
> > > > wrote:
> > > >
> > > > > So, I'm wanting to use Samza for a project I'm working on, but I
> keep
> > > > > running into a problem with bootstrapping.
> > > > >
> > > > > Let's say there's a Kafka topic called Numbers that I want to
> consume
> > > > with
> > > > > Samza.  Let's say each message has a single integer in it, and I
> want
> > > to
> > > > > classify it as even or odd.  So I have two topics that I'm using
> for
> > > > > output, one called Even and one called Odd.  I write a simple
> stream
> > > task
> > > > > called Classifier that consumes the Numbers topic, examines each
> > > incoming
> > > > > integer and writes it back out to Even or Odd.
> > > > >
> > > > > Now, let's say I want to be able to add classifications
> dynamically,
> > > > like :
> > > > > "divisible by three", "divisible by four", or "numbers that appear
> in
> > > my
> > > > > date of birth".  And let's say I hav

Re: How to deal with bootstrapping

2015-04-16 Thread Guozhang Wang
I feel Ben's solution is a bit simpler: you just need to restart your
current job with both rules at the checkpointed offset, and start a new job
from offset 0 with only the new rule, which will stop at the checkpointed
offset. But of course it requires the second job to be able to shut itself
down upon reaching a specific offset, which I am not sure is already
supported.
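
As a sketch, that second (catch-up) job could be pointed back to offset 0
with the property Jeremy mentioned. The job name, task class, and stream
name below are placeholders from this example, and only the offset-related
keys are shown:

# Hypothetical config for the temporary catch-up job (new rule only).
job.name=classifier-new-rule-catchup
task.class=samza.examples.NewRuleCatchupTask
task.inputs=kafka.numbers

# Re-read the Numbers topic from the beginning, ignoring any saved checkpoint.
systems.kafka.streams.numbers.samza.offset.default=oldest
systems.kafka.streams.numbers.samza.reset.offset=true

The shutdown-at-offset part would still have to be handled inside the task
itself, as discussed elsewhere in the thread.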

Guozhang

On Wed, Apr 15, 2015 at 5:43 PM, Yan Fang  wrote:

> Hi Jeremy,
>
> In order to reach this goal, we have to assume that the job with new rules
> can always catch up with the one with old rules. Otherwise, I think we do
> not have the choice but running a lot of jobs simultaneously.
>
> Under our assumption, we have job1 with old rules running, and now add job2
> which integrates old rules and new rules to run. Job2 frequently
> checks the Last-Processed-Old-Rules
> offset from job1 (because job1 is running too), and it only applies new
> rule to the data until catch up with the Last-Processed-Old-Rules offset.
> Then it sends signal to the job1 and shutdown job1, and applies all rules
> to the stream.
>
> In terms of how to shutdown the job1, here is one solution
> <
> http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E
> >
> provided by Chris - e.g. you can have a control stream to get job1
> shutdown. Samza will provide this kind of stream after SAMZA-348
> , which is under active
> development.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Wed, Apr 15, 2015 at 12:17 PM, jeremy p  >
> wrote:
>
> > Hello Yan,
> >
> > Thank you for the suggestion!  I think your solution would work,
> however, I
> > am afraid it would create a performance problem for our users.
> >
> > Let's say we kill the Classifier task, and create a new Classifier task
> > with both the existing rules and new rules. We get the offset of the
> > latest-processed message for the old rules.  Let's call this offset
> > Last-Processed-Old-Rules.  We ignore messages
> > before Last-Processed-Old-Rules for the old rules.  We configure the new
> > Classifier task to be a bootstrap task.
> >
> > Let's say we have users who are watching the output topics, and they are
> > expecting near-realtime updates.  They won't see any updates for the old
> > rules until our task has passed the Last-Processed-Old-Rules offset.  If
> we
> > have a lot of messages in that topic, that could take a long time.  This
> is
> > why I was hoping there would be a way to bootstrap the new rules while
> > we're still processing the old rules.  Do you think there is a way to do
> > that?
> >
> > On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang  wrote:
> >
> > > Hi Jeremy,
> > >
> > > If my understanding is correct, whenever you add a new rule, you want
> to
> > > apply this rule to the historical data. Right?
> > >
> > > If you do not care about duplication, you can create a new task that
> > > contains existing rules and new rules. Configure bootstrap. This will
> > apply
> > > all the rules from the beginning of the input stream. The shortcoming
> is
> > > you will get duplicated results for old rules.
> > >
> > > If you can not tolerate the shortcoming, 1) get the offset of the
> > > latest-processed message of old rules. 2) In your new task, ignore
> > messages
> > > before that offset for the old rules. 3) bootstrap.
> > >
> > > Hope this helps. Maybe your use case is more complicated?
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <
> > athomewithagroove...@gmail.com
> > > >
> > > wrote:
> > >
> > > > So, I'm wanting to use Samza for a project I'm working on, but I keep
> > > > running into a problem with bootstrapping.
> > > >
> > > > Let's say there's a Kafka topic called Numbers that I want to consume
> > > with
> > > > Samza.  Let's say each message has a single integer in it, and I want
> > to
> > > > classify it as even or odd.  So I have two topics that I'm using for
> > > > output, one called Even and one called Odd.  I write a simple stream
> > task
> > > > called Classifier that consumes the Numbers topic, examines each
> > incoming
> > > > integer and writes it back out to Even or Odd.
> > > >
> > > > Now, let's say I want to be able to add classifications dynamically,
> > > like :
> > > > "divisible by three", "divisible by four", or "numbers that appear in
> > my
> > > > date of birth".  And let's say I have an API I can query that gives
> me
> > > all
> > > > the assignment rules, such as "when a number is divisble by 3, write
> it
> > > out
> > > > to a topic called 'divisible_by_three'", or "when a number appears in
> > the
> > > > string 12/12/1981, write it to the 'my_birthday' topic".  So now I
> > > rewrite
> > > > my stream task to query this API for assignment rules.  It reads
> > integers
> > > > from the Numbers topic and writes them back out to one or more output
> > > > top

Re: How to deal with bootstrapping

2015-04-15 Thread Yan Fang
Hi Jeremy,

In order to reach this goal, we have to assume that the job with the new
rules can always catch up with the one with the old rules. Otherwise, I
think we have no choice but to run a lot of jobs simultaneously.

Under that assumption, we have job1 with the old rules running, and we now
add job2, which integrates the old rules and the new rules. Job2 frequently
checks the Last-Processed-Old-Rules offset from job1 (because job1 is still
running), and it applies only the new rules to the data until it catches up
with the Last-Processed-Old-Rules offset. Then it sends a signal to job1,
shuts job1 down, and applies all rules to the stream.

In terms of how to shut down job1, here is one solution
<http://mail-archives.apache.org/mod_mbox/samza-dev/201407.mbox/%3ccfe93d17.2d24b%25criccom...@linkedin.com%3E>
provided by Chris - e.g. you can have a control stream that tells job1 to
shut down. Samza will provide this kind of stream after SAMZA-348, which is
under active development.
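
Something like the following, as a rough sketch of job2's process(). The
control-stream name, the "caught-up" message format, and the way the
Last-Processed-Old-Rules offsets are obtained are all assumptions for
illustration:

import java.util.HashMap;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class Job2Task implements StreamTask, InitableTask {

  // Hypothetical control topic that job1 watches for a shutdown signal.
  private static final SystemStream JOB1_CONTROL = new SystemStream("kafka", "job1-control");

  // Last-Processed-Old-Rules offset per partition, as published by job1
  // (how it is published and loaded is not shown here).
  private final Map<SystemStreamPartition, Long> lastProcessedOldRules =
      new HashMap<SystemStreamPartition, Long>();
  private final Map<SystemStreamPartition, Boolean> caughtUp =
      new HashMap<SystemStreamPartition, Boolean>();

  @Override
  public void init(Config config, TaskContext context) {
    // Load job1's latest offsets into lastProcessedOldRules (omitted).
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    SystemStreamPartition ssp = envelope.getSystemStreamPartition();
    long offset = Long.parseLong(envelope.getOffset()); // Kafka offsets are numeric strings

    // New rules run from the very beginning of the topic.
    applyNewRules(envelope, collector);

    Long handover = lastProcessedOldRules.get(ssp);
    boolean pastHandover = handover == null || offset >= handover;
    if (pastHandover && !Boolean.TRUE.equals(caughtUp.get(ssp))) {
      caughtUp.put(ssp, true);
      // Caught up on this partition: tell job1 it can shut down.
      collector.send(new OutgoingMessageEnvelope(JOB1_CONTROL,
          "caught-up:" + ssp.getPartition().getPartitionId()));
    }
    if (Boolean.TRUE.equals(caughtUp.get(ssp))) {
      // From here on, job2 applies the old rules as well, so there is no gap
      // in the old rules' output once job1 stops.
      applyOldRules(envelope, collector);
    }
  }

  private void applyNewRules(IncomingMessageEnvelope envelope, MessageCollector collector) { }
  private void applyOldRules(IncomingMessageEnvelope envelope, MessageCollector collector) { }
}

Since job1 keeps moving while job2 catches up, the lastProcessedOldRules map
would have to be refreshed periodically (e.g. from a WindowableTask's
window() method) rather than read once in init(); this sketch glosses over
that.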

Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Apr 15, 2015 at 12:17 PM, jeremy p 
wrote:

> Hello Yan,
>
> Thank you for the suggestion!  I think your solution would work, however, I
> am afraid it would create a performance problem for our users.
>
> Let's say we kill the Classifier task, and create a new Classifier task
> with both the existing rules and new rules. We get the offset of the
> latest-processed message for the old rules.  Let's call this offset
> Last-Processed-Old-Rules.  We ignore messages
> before Last-Processed-Old-Rules for the old rules.  We configure the new
> Classifier task to be a bootstrap task.
>
> Let's say we have users who are watching the output topics, and they are
> expecting near-realtime updates.  They won't see any updates for the old
> rules until our task has passed the Last-Processed-Old-Rules offset.  If we
> have a lot of messages in that topic, that could take a long time.  This is
> why I was hoping there would be a way to bootstrap the new rules while
> we're still processing the old rules.  Do you think there is a way to do
> that?
>
> On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang  wrote:
>
> > Hi Jeremy,
> >
> > If my understanding is correct, whenever you add a new rule, you want to
> > apply this rule to the historical data. Right?
> >
> > If you do not care about duplication, you can create a new task that
> > contains existing rules and new rules. Configure bootstrap. This will
> apply
> > all the rules from the beginning of the input stream. The shortcoming is
> > you will get duplicated results for old rules.
> >
> > If you can not tolerate the shortcoming, 1) get the offset of the
> > latest-processed message of old rules. 2) In your new task, ignore
> messages
> > before that offset for the old rules. 3) bootstrap.
> >
> > Hope this helps. Maybe your use case is more complicated?
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <
> athomewithagroove...@gmail.com
> > >
> > wrote:
> >
> > > So, I'm wanting to use Samza for a project I'm working on, but I keep
> > > running into a problem with bootstrapping.
> > >
> > > Let's say there's a Kafka topic called Numbers that I want to consume
> > with
> > > Samza.  Let's say each message has a single integer in it, and I want
> to
> > > classify it as even or odd.  So I have two topics that I'm using for
> > > output, one called Even and one called Odd.  I write a simple stream
> task
> > > called Classifier that consumes the Numbers topic, examines each
> incoming
> > > integer and writes it back out to Even or Odd.
> > >
> > > Now, let's say I want to be able to add classifications dynamically,
> > like :
> > > "divisible by three", "divisible by four", or "numbers that appear in
> my
> > > date of birth".  And let's say I have an API I can query that gives me
> > all
> > > the assignment rules, such as "when a number is divisble by 3, write it
> > out
> > > to a topic called 'divisible_by_three'", or "when a number appears in
> the
> > > string 12/12/1981, write it to the 'my_birthday' topic".  So now I
> > rewrite
> > > my stream task to query this API for assignment rules.  It reads
> integers
> > > from the Numbers topic and writes them back out to one or more output
> > > topics, according to the assignment rules.
> > >
> > > Now, let's make this even more complicated.  When I add a new
> > > classification, I want to go back to the very beginning of the Numbers
> > > topic and classify them accordingly.  Once we've consumed all the old
> > > "historical" integers, I want to apply this classification new integers
> > as
> > > they come in.
> > >
> > > And this is where I get stuck.
> > >
> > > One thing I can do : when I want to add a new classification, I can
> > create
> > > a bootstrap job by setting the
> > > "systems.kafka.streams.numbers.samza.offset.default" property to
> > "oldest".
> > > And that's great, but the problem is, on

Re: How to deal with bootstrapping

2015-04-15 Thread Benjamin Black
What about this:

1) Add the new rule to the classifier task
2) Take note of the offset of the first message processed after restart
3) Run a job to process from offset 0 to the offset from #2, after which
the job is deleted

I don't know how to do 2 or 3, but perhaps some of the core Samza folk
could shed some light?
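
That said, something like this might work for step 2 inside the restarted
classifier. The handover-offsets topic is a made-up name, and step 3 would
then be the shut-down-at-offset idea discussed elsewhere in this thread:

import java.util.HashSet;
import java.util.Set;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class ClassifierTask implements StreamTask {

  // Made-up topic where the handover offsets are recorded for the catch-up job.
  private static final SystemStream HANDOVER = new SystemStream("kafka", "handover-offsets");

  private final Set<SystemStreamPartition> noted = new HashSet<SystemStreamPartition>();

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    SystemStreamPartition ssp = envelope.getSystemStreamPartition();
    if (noted.add(ssp)) {
      // First message seen on this partition after the restart: record its
      // offset so the temporary job knows where to stop (step 2).
      collector.send(new OutgoingMessageEnvelope(HANDOVER,
          ssp.getPartition().getPartitionId() + ":" + envelope.getOffset()));
    }

    // ... normal classification with old + new rules continues here ...
  }
}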


b



On Wed, Apr 15, 2015 at 12:17 PM, jeremy p 
wrote:

> Hello Yan,
>
> Thank you for the suggestion!  I think your solution would work, however, I
> am afraid it would create a performance problem for our users.
>
> Let's say we kill the Classifier task, and create a new Classifier task
> with both the existing rules and new rules. We get the offset of the
> latest-processed message for the old rules.  Let's call this offset
> Last-Processed-Old-Rules.  We ignore messages
> before Last-Processed-Old-Rules for the old rules.  We configure the new
> Classifier task to be a bootstrap task.
>
> Let's say we have users who are watching the output topics, and they are
> expecting near-realtime updates.  They won't see any updates for the old
> rules until our task has passed the Last-Processed-Old-Rules offset.  If we
> have a lot of messages in that topic, that could take a long time.  This is
> why I was hoping there would be a way to bootstrap the new rules while
> we're still processing the old rules.  Do you think there is a way to do
> that?
>
> On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang  wrote:
>
> > Hi Jeremy,
> >
> > If my understanding is correct, whenever you add a new rule, you want to
> > apply this rule to the historical data. Right?
> >
> > If you do not care about duplication, you can create a new task that
> > contains existing rules and new rules. Configure bootstrap. This will
> apply
> > all the rules from the beginning of the input stream. The shortcoming is
> > you will get duplicated results for old rules.
> >
> > If you can not tolerate the shortcoming, 1) get the offset of the
> > latest-processed message of old rules. 2) In your new task, ignore
> messages
> > before that offset for the old rules. 3) bootstrap.
> >
> > Hope this helps. Maybe your use case is more complicated?
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang...@gmail.com
> >
> > On Wed, Apr 15, 2015 at 11:19 AM, jeremy p <
> athomewithagroove...@gmail.com
> > >
> > wrote:
> >
> > > So, I'm wanting to use Samza for a project I'm working on, but I keep
> > > running into a problem with bootstrapping.
> > >
> > > Let's say there's a Kafka topic called Numbers that I want to consume
> > with
> > > Samza.  Let's say each message has a single integer in it, and I want
> to
> > > classify it as even or odd.  So I have two topics that I'm using for
> > > output, one called Even and one called Odd.  I write a simple stream
> task
> > > called Classifier that consumes the Numbers topic, examines each
> incoming
> > > integer and writes it back out to Even or Odd.
> > >
> > > Now, let's say I want to be able to add classifications dynamically,
> > like :
> > > "divisible by three", "divisible by four", or "numbers that appear in
> my
> > > date of birth".  And let's say I have an API I can query that gives me
> > all
> > > the assignment rules, such as "when a number is divisble by 3, write it
> > out
> > > to a topic called 'divisible_by_three'", or "when a number appears in
> the
> > > string 12/12/1981, write it to the 'my_birthday' topic".  So now I
> > rewrite
> > > my stream task to query this API for assignment rules.  It reads
> integers
> > > from the Numbers topic and writes them back out to one or more output
> > > topics, according to the assignment rules.
> > >
> > > Now, let's make this even more complicated.  When I add a new
> > > classification, I want to go back to the very beginning of the Numbers
> > > topic and classify them accordingly.  Once we've consumed all the old
> > > "historical" integers, I want to apply this classification new integers
> > as
> > > they come in.
> > >
> > > And this is where I get stuck.
> > >
> > > One thing I can do : when I want to add a new classification, I can
> > create
> > > a bootstrap job by setting the
> > > "systems.kafka.streams.numbers.samza.offset.default" property to
> > "oldest".
> > > And that's great, but the problem is, once I've "caught up", I'd like
> to
> > > kill the bootstrap job and just let the Classifier handle this new
> > > assignment.  So, I'd want to do some kind of handover from the
> bootstrap
> > > job to the Classifier job.  But how to do this?
> > >
> > > So, the question I must ask is this : Is Samza even an appopriate way
> to
> > > solve this problem?  Has this problem ever come up for anybody else?
> How
> > > have they solved it?  I would really like to use Samza because it seems
> > > like an appopriate technology, and I'd really really really really like
> > to
> > > avoid re-inventing the wheel.
> > >
> > > A couple solutions I came up with :
> > >
> > > 1) The simple solution.  Have a separate Samza job for each
> > > classificatio

Re: How to deal with bootstrapping

2015-04-15 Thread jeremy p
Hello Yan,

Thank you for the suggestion!  I think your solution would work; however, I
am afraid it would create a performance problem for our users.

Let's say we kill the Classifier task, and create a new Classifier task
with both the existing rules and new rules. We get the offset of the
latest-processed message for the old rules.  Let's call this offset
Last-Processed-Old-Rules.  We ignore messages
before Last-Processed-Old-Rules for the old rules.  We configure the new
Classifier task to be a bootstrap task.

Let's say we have users who are watching the output topics, and they are
expecting near-realtime updates.  They won't see any updates for the old
rules until our task has passed the Last-Processed-Old-Rules offset.  If we
have a lot of messages in that topic, that could take a long time.  This is
why I was hoping there would be a way to bootstrap the new rules while
we're still processing the old rules.  Do you think there is a way to do
that?

On Wed, Apr 15, 2015 at 2:56 PM, Yan Fang  wrote:

> Hi Jeremy,
>
> If my understanding is correct, whenever you add a new rule, you want to
> apply this rule to the historical data. Right?
>
> If you do not care about duplication, you can create a new task that
> contains existing rules and new rules. Configure bootstrap. This will apply
> all the rules from the beginning of the input stream. The shortcoming is
> you will get duplicated results for old rules.
>
> If you can not tolerate the shortcoming, 1) get the offset of the
> latest-processed message of old rules. 2) In your new task, ignore messages
> before that offset for the old rules. 3) bootstrap.
>
> Hope this helps. Maybe your use case is more complicated?
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Wed, Apr 15, 2015 at 11:19 AM, jeremy p  >
> wrote:
>
> > So, I'm wanting to use Samza for a project I'm working on, but I keep
> > running into a problem with bootstrapping.
> >
> > Let's say there's a Kafka topic called Numbers that I want to consume
> with
> > Samza.  Let's say each message has a single integer in it, and I want to
> > classify it as even or odd.  So I have two topics that I'm using for
> > output, one called Even and one called Odd.  I write a simple stream task
> > called Classifier that consumes the Numbers topic, examines each incoming
> > integer and writes it back out to Even or Odd.
> >
> > Now, let's say I want to be able to add classifications dynamically,
> like :
> > "divisible by three", "divisible by four", or "numbers that appear in my
> > date of birth".  And let's say I have an API I can query that gives me
> all
> > the assignment rules, such as "when a number is divisble by 3, write it
> out
> > to a topic called 'divisible_by_three'", or "when a number appears in the
> > string 12/12/1981, write it to the 'my_birthday' topic".  So now I
> rewrite
> > my stream task to query this API for assignment rules.  It reads integers
> > from the Numbers topic and writes them back out to one or more output
> > topics, according to the assignment rules.
> >
> > Now, let's make this even more complicated.  When I add a new
> > classification, I want to go back to the very beginning of the Numbers
> > topic and classify them accordingly.  Once we've consumed all the old
> > "historical" integers, I want to apply this classification new integers
> as
> > they come in.
> >
> > And this is where I get stuck.
> >
> > One thing I can do : when I want to add a new classification, I can
> create
> > a bootstrap job by setting the
> > "systems.kafka.streams.numbers.samza.offset.default" property to
> "oldest".
> > And that's great, but the problem is, once I've "caught up", I'd like to
> > kill the bootstrap job and just let the Classifier handle this new
> > assignment.  So, I'd want to do some kind of handover from the bootstrap
> > job to the Classifier job.  But how to do this?
> >
> > So, the question I must ask is this : Is Samza even an appopriate way to
> > solve this problem?  Has this problem ever come up for anybody else?  How
> > have they solved it?  I would really like to use Samza because it seems
> > like an appopriate technology, and I'd really really really really like
> to
> > avoid re-inventing the wheel.
> >
> > A couple solutions I came up with :
> >
> > 1) The simple solution.  Have a separate Samza job for each
> > classification.  If I want to add a new classification, I create a new
> job
> > and set it up as a bootstrap job.  This would solve the problem.
> However,
> > we may want to have many, many classifications.  It could be as many as
> > 1,000,000, which would mean up to 1,000,000 simultaneously running jobs.
> > This could create a lot of overhead for YARN and Kafka.
> >
> > 2) My overly-complicated workaround solution.  Each assignment rule has
> an
> > "isnew" flag.  If it's a new classification that hasn't fully
> bootstrapped
> > yet, the "isnew" flag is set to TRUE.  When my classifier queries the API
> > for assignment rules, it

Re: How to deal with bootstrapping

2015-04-15 Thread Yan Fang
Hi Jeremy,

If my understanding is correct, whenever you add a new rule, you want to
apply this rule to the historical data. Right?

If you do not care about duplication, you can create a new task that
contains the existing rules and the new rules, and configure it as a
bootstrap task. This will apply all the rules from the beginning of the
input stream. The shortcoming is that you will get duplicated results for
the old rules.

If you cannot tolerate that, 1) get the offset of the latest-processed
message for the old rules; 2) in your new task, ignore messages before that
offset for the old rules; 3) bootstrap.
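
As a sketch of step 3 in config terms, using the Numbers topic from this
example (the bootstrap and reset keys are from the Samza streams
configuration; step 2 would be an offset check inside process(), as in the
other sketches in this thread):

# Hypothetical config: replay Numbers from the beginning as a bootstrap
# stream, then keep consuming new messages once caught up.
task.inputs=kafka.numbers
systems.kafka.streams.numbers.samza.bootstrap=true
systems.kafka.streams.numbers.samza.offset.default=oldest
systems.kafka.streams.numbers.samza.reset.offset=true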

Hope this helps. Maybe your use case is more complicated?

Thanks,

Fang, Yan
yanfang...@gmail.com

On Wed, Apr 15, 2015 at 11:19 AM, jeremy p 
wrote:

> So, I'm wanting to use Samza for a project I'm working on, but I keep
> running into a problem with bootstrapping.
>
> Let's say there's a Kafka topic called Numbers that I want to consume with
> Samza.  Let's say each message has a single integer in it, and I want to
> classify it as even or odd.  So I have two topics that I'm using for
> output, one called Even and one called Odd.  I write a simple stream task
> called Classifier that consumes the Numbers topic, examines each incoming
> integer and writes it back out to Even or Odd.
>
> Now, let's say I want to be able to add classifications dynamically, like :
> "divisible by three", "divisible by four", or "numbers that appear in my
> date of birth".  And let's say I have an API I can query that gives me all
> the assignment rules, such as "when a number is divisible by 3, write it out
> to a topic called 'divisible_by_three'", or "when a number appears in the
> string 12/12/1981, write it to the 'my_birthday' topic".  So now I rewrite
> my stream task to query this API for assignment rules.  It reads integers
> from the Numbers topic and writes them back out to one or more output
> topics, according to the assignment rules.
>
> Now, let's make this even more complicated.  When I add a new
> classification, I want to go back to the very beginning of the Numbers
> topic and classify them accordingly.  Once we've consumed all the old
> "historical" integers, I want to apply this classification new integers as
> they come in.
>
> And this is where I get stuck.
>
> One thing I can do : when I want to add a new classification, I can create
> a bootstrap job by setting the
> "systems.kafka.streams.numbers.samza.offset.default" property to "oldest".
> And that's great, but the problem is, once I've "caught up", I'd like to
> kill the bootstrap job and just let the Classifier handle this new
> assignment.  So, I'd want to do some kind of handover from the bootstrap
> job to the Classifier job.  But how to do this?
>
> So, the question I must ask is this : Is Samza even an appropriate way to
> solve this problem?  Has this problem ever come up for anybody else?  How
> have they solved it?  I would really like to use Samza because it seems
> like an appropriate technology, and I'd really really really really like to
> avoid re-inventing the wheel.
>
> A couple solutions I came up with :
>
> 1) The simple solution.  Have a separate Samza job for each
> classification.  If I want to add a new classification, I create a new job
> and set it up as a bootstrap job.  This would solve the problem.  However,
> we may want to have many, many classifications.  It could be as many as
> 1,000,000, which would mean up to 1,000,000 simultaneously running jobs.
> This could create a lot of overhead for YARN and Kafka.
>
> 2) My overly-complicated workaround solution.  Each assignment rule has an
> "isnew" flag.  If it's a new classification that hasn't fully bootstrapped
> yet, the "isnew" flag is set to TRUE.  When my classifier queries the API
> for assignment rules, it ignores any rule with an "isnew" flag.  When I
> want to add a new classification, I create a new bootstrap job for that
> classification.  Every so often, maybe every few days or so, if all of my
> bootstrap jobs have "caught up", I kill all of the bootstrap jobs and
> classifier jobs.  I set all the "isnew" flags to FALSE.  Then I restart the
> classifier job.  This is kind of an ugly solution, and I'm not even sure it
> would work.  For one thing, I'd need some way of knowing if a bootstrap job
> has "caught up".  Secondly, I'd essentially be restarting the classifier
> job periodically, which just seems like an ugly solution.  I don't like it.
>
> 3) Some other kind of really complicated solution I haven't thought of yet,
> probably involving locks, transactions, concurrency, and interprocess
> communication.
>
> Thanks for reading this whole thing.  Please let me know if you have any
> suggestions.
>