Cool, I agree with all that.

I agree about the need for a rebalancing callback.

Totally agree about record handler.

It would be great to see if a prototype of this is workable.
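
To make that concrete, here is a rough sketch of what such a prototype might look like, folding in the points from Joel's list below: poll with a timeout equal to the commit interval, flush and commit from the rebalance callback, and pass each record through a handler. The `SourceConsumer` and `TargetProducer` traits, `Record`, `MirrorLoop` and the injectable `clock` are hypothetical stand-ins made up for illustration; they are not the actual new consumer or producer API, and this is a sketch under those assumptions rather than a proposed implementation.

```scala
// Hypothetical stand-ins for the new consumer and producer. These are NOT the
// real Kafka client interfaces, just the minimum the loop needs.
final case class Record(topic: String, key: Array[Byte], value: Array[Byte])

trait SourceConsumer {
  def poll(timeoutMs: Long): Seq[Record] // may return an empty batch on timeout
  def commit(): Unit                     // commit offsets of everything polled so far
}

trait TargetProducer {
  def send(rec: Record, onError: Throwable => Unit): Unit
  def flush(): Unit                      // block until outstanding sends are acked
}

class MirrorLoop(consumer: SourceConsumer,
                 producer: TargetProducer,
                 handler: Record => Seq[Record], // empty result filters the record out
                 commitIntervalMs: Long,
                 clock: () => Long = () => System.currentTimeMillis()) {

  private var lastCommit = clock()

  // Flush before commit, so an offset is only committed after the producer
  // has finished sending every record read up to that point.
  def flushAndCommit(): Unit = {
    producer.flush()
    consumer.commit()
    lastCommit = clock()
  }

  // Meant to be called from the consumer's rebalance callback, before
  // partitions are given up, to reduce duplicates.
  def onPartitionsRevoked(): Unit = flushAndCommit()

  // One iteration of the while(true) loop.
  def runOnce(): Unit = {
    // Bound the poll by the commit interval so low-volume pipelines still commit.
    val recs = consumer.poll(commitIntervalMs)
    for (rec <- recs; out <- handler(rec))
      producer.send(out, e => System.err.println(s"send failed: $e"))
    if (clock() - lastCommit >= commitIntervalMs)
      flushAndCommit()
  }
}
```

The only thing this relies on is ordering: flush() has to return before commit() is called, otherwise an offset could be committed for a record the producer has not yet had acked.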

Thanks guys!

-Jay

On Wed, Feb 11, 2015 at 12:36 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

> Hey Jay,
>
> Guozhang, Becket and I got together to discuss this and we think:
>
> - It seems that your proposal based on the new consumer and flush call
>   should work.
> - We would likely need to call the poll with a timeout that matches
>   the offset commit interval in order to deal with low volume
>   mirroring pipelines.
> - We will still need a rebalance callback to reduce duplicates - the
>   rebalance callback would need to flush and commit offsets.
> - The only remaining question is if the overall throughput is
>   sufficient. I think someone at LinkedIn (I don't remember who) did
>   some experiments with data channel size == 1 and ran into issues.
>   That was not thoroughly investigated though.
> - The addition of flush may actually make this solution viable for the
>   current mirror-maker (with the old consumer). We can prototype that
>   offline and if it works out well we can redo KAFKA-1650 (i.e.,
>   refactor the current mirror maker). The flush call and the new
>   consumer didn't exist at the time we did KAFKA-1650 so this did not
>   occur to us.
> - We think the RecordHandler is still a useful small addition for the
>   use-cases mentioned earlier in this thread.
>
> Thanks,
>
> Joel
>
> On Wed, Feb 11, 2015 at 09:05:39AM -0800, Jay Kreps wrote:
> > Guozhang, I agree with 1-3. I do think what I was proposing was simpler,
> > but perhaps there are gaps in that?
> >
> > Hey Joel--Here was a sketch of what I was proposing. I do think this gets
> > rid of manual offset tracking, especially doing so across threads with
> > dedicated commit threads, which I think is pretty complex.
> >
> > while(true) {
> >     val recs = consumer.poll(Long.MaxValue);
> >     for (rec <- recs)
> >         producer.send(rec, logErrorCallback)
> >     if(System.currentTimeMillis - lastCommit > commitInterval) {
> >         producer.flush()
> >         consumer.commit()
> >         lastCommit = System.currentTimeMillis
> >     }
> > }
> >
> > (See the previous email for details). I think the question is: is there
> > any reason--performance, correctness, etc--that this won't work? Basically
> > I think you guys have thought about this more so I may be missing
> > something. If so let's flag it while we still have leeway on the consumer.
> >
> > If we think that will work, well I do think it is conceptually a lot
> > simpler than the current code, though I suppose one could disagree on
> > that.
> >
> > -Jay
> >
> > On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >
> > > Hi Jay,
> > >
> > > > The data channels are actually a big part of the complexity of the
> > > > zero data loss design, though, right? Because then you need some
> > > > reverse channel to flow the acks back to the consumer based on where
> > > > you are versus just acking what you have read and written (as in the
> > > > code snippet I put up).
> > >
> > > I'm not sure if we are on the same page. Even if the data channel was
> > > not there the current handling for zero data loss would remain very
> > > similar - you would need to maintain lists of unacked source offsets.
> > > I'm wondering if the KIP needs more detail on how it is currently
> > > implemented; or are you suggesting a different approach (in which case I
> > > have not fully understood). I'm not sure what you mean by flowing acks
> > > back to the consumer - the MM commits offsets after the producer ack
> > > has been received. There is some additional complexity introduced in
> > > reducing duplicates on a rebalance - this is actually optional (since
> > > duplicates are currently a given). The reason that was done anyway is
> > > that with the auto-commit turned off duplicates are almost guaranteed
> > > on a rebalance.
> > >
> > > > I think the point that Neha and I were trying to make was that the
> > > > motivation to embed stuff into MM kind of is related to how complex a
> > > > simple "consume and produce" with good throughput will be. If it is
> > > > simple to write such a thing in a few lines, the pain of embedding a
> > > > bunch of stuff won't be worth it; if it has to be as complex as the
> > > > current mm then of course we will need all kinds of plug ins because no
> > > > one will be able to write such a thing. I don't have a huge concern
> > > > with a simple plug-in but I think if it turns into something more
> > > > complex with filtering and aggregation or whatever we really need to
> > > > stop and think a bit about the design.
> > >
> > > I agree - I don't think there is a use-case for any complex plug-in.
> > > It is pretty much what Becket has described currently for the message
> > > handler - i.e., take an incoming record and return a list of outgoing
> > > records (which could be empty if you filter).
> > >
> > > So here is my take on the MM:
> > > - Bare bones: simple consumer - producer pairs (0.7 style). This is
> > >   ideal, but does not guarantee no data loss.
> > > - Above plus support for no data loss. This actually adds quite a bit of
> > >   complexity.
> > > - Above plus the message handler. This is a trivial addition I think
> > >   that makes the MM usable in a few other mirroring-like applications.
> > >
> > > Joel
> > >
> > > > > On Tue, Feb 10, 2015 at 12:31 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > >
> > > > >
> > > > >
> > > > > On Tue, Feb 10, 2015 at 12:13:46PM -0800, Neha Narkhede wrote:
> > > > > > > I think all of us agree that we want to design MirrorMaker for 0
> > > > > > > data loss. With the absence of the data channel, 0 data loss will
> > > > > > > be much simpler to implement.
> > > > >
> > > > > > The data channel is irrelevant to the implementation of zero data
> > > > > > loss. The complexity in the implementation of no data loss that you
> > > > > > are seeing in mirror-maker affects all consume-then-produce patterns
> > > > > > whether or not there is a data channel. You still need to maintain a
> > > > > > list of unacked offsets. What I meant earlier is that we can
> > > > > > brainstorm completely different approaches to supporting no data
> > > > > > loss, but the current implementation is the only solution we are
> > > > > > aware of.
> > > > >
> > > > > >
> > > > > > > My arguments for adding a message handler are that:
> > > > > > > > 1. It is more efficient to do something in common for all the
> > > > > > > > clients in pipeline than letting each client do the same thing
> > > > > > > > many times. And there are concrete use cases for the message
> > > > > > > > handler already.
> > > > > > >
> > > > > >
> > > > > > What are the concrete use cases?
> > > > >
> > > > > > I think Becket already described a couple of use cases earlier in
> > > > > > the thread.
> > > > >
> > > > > <quote>
> > > > >
> > > > > > 1. Format conversion. We have a use case where clients of source
> > > > > > cluster use an internal schema and clients of target cluster use a
> > > > > > different public schema.
> > > > > > 2. Message filtering: For the messages published to source cluster,
> > > > > > some messages are private to source cluster clients and should not
> > > > > > be exposed to target cluster clients. It would be difficult to
> > > > > > publish those messages into different partitions because they need
> > > > > > to be ordered.
> > > > > >
> > > > > > I agree that we can always filter/convert messages after they are
> > > > > > copied to the target cluster, but that costs network bandwidth
> > > > > > unnecessarily, especially if that is a cross colo mirror. With the
> > > > > > handler, we can co-locate the mirror maker with source cluster and
> > > > > > save that cost. Also, if there are many downstream consumers
> > > > > > consuming from the target cluster, filtering/reformatting the
> > > > > > messages before they reach the target cluster is much more
> > > > > > efficient than having each of the consumers do this individually on
> > > > > > their own.
> > > > >
> > > > > </quote>
> > > > >
> > > > > >
> > > > > > > Also the KIP still refers to the data channel in a few places
> > > > > > > (Motivation and "On consumer rebalance" sections). Can you update
> > > > > > > the wiki so it is easier to review the new design, especially the
> > > > > > > data loss part?
> > > > > >
> > > > > >
> > > > > > > On Tue, Feb 10, 2015 at 10:36 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > > >
> > > > > > > > I think the message handler adds little to no complexity to the
> > > > > > > > mirror maker. Jay/Neha, the MM became scary due to the
> > > > > > > > rearchitecture we did for 0.8 due to performance issues compared
> > > > > > > > with 0.7 - we should remove the data channel if it can match the
> > > > > > > > current throughput. I agree it is worth prototyping and testing
> > > > > > > > that so the MM architecture is simplified.
> > > > > > >
> > > > > > > > The MM became a little scarier in KAFKA-1650 in order to support
> > > > > > > > no data loss. I think the implementation for no data loss will
> > > > > > > > remain about the same even in the new model (even without the
> > > > > > > > data channel) - we can probably brainstorm more if there is a
> > > > > > > > better/simpler way to do it (maybe there is in the absence of
> > > > > > > > the data channel) but at the time it was the best we (i.e.,
> > > > > > > > Becket, myself, Jun and Guozhang who participated on the review)
> > > > > > > > could come up with.
> > > > > > >
> > > > > > > > So I'm definitely +1 on whatever it takes to support no data
> > > > > > > > loss. I think most people would want that out of the box.
> > > > > > >
> > > > > > > > As for the message handler, as Becket wrote and I agree with, it
> > > > > > > > is really a trivial addition that would benefit at least some
> > > > > > > > users (perhaps not most). So I'm personally +1 on that as well.
> > > > > > > > That said, I'm also okay with it not being there. I think the MM
> > > > > > > > is fairly stand-alone and simple enough that it is entirely
> > > > > > > > reasonable and absolutely feasible for companies to
> > > > > > > > fork/re-implement the mirror maker for their own needs.
> > > > > > >
> > > > > > > So in summary, I'm +1 on the KIP.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Joel
> > > > > > >
> > > > > > > On Mon, Feb 09, 2015 at 09:19:57PM +0000, Jiangjie Qin wrote:
> > > > > > > > > I just updated the KIP page and incorporated Jay and Neha’s
> > > > > > > > > suggestions. As a brief summary of where we are:
> > > > > > > >
> > > > > > > > Consensus reached:
> > > > > > > > > Have N independent mirror maker threads, each of which has its
> > > > > > > > > own consumer but shares a producer. The mirror maker threads
> > > > > > > > > will be responsible for decompression, compression and offset
> > > > > > > > > commit. No data channel or separate offset commit thread is
> > > > > > > > > needed. Consumer rebalance callback will be used to avoid
> > > > > > > > > duplicates on rebalance.
> > > > > > > >
> > > > > > > > Still under discussion:
> > > > > > > > Whether message handler is needed.
> > > > > > > >
> > > > > > > > > My arguments for adding a message handler are that:
> > > > > > > > > 1. It is more efficient to do something in common for all the
> > > > > > > > > clients in pipeline than letting each client do the same thing
> > > > > > > > > many times. And there are concrete use cases for the message
> > > > > > > > > handler already.
> > > > > > > > > 2. It is not a big complicated add-on to mirror maker.
> > > > > > > > > 3. Without a message handler, customers who need it have to
> > > > > > > > > re-implement all the logic of mirror maker by themselves just
> > > > > > > > > in order to add this handling in pipeline.
> > > > > > > >
> > > > > > > > Any thoughts?
> > > > > > > >
> > > > > > > > Thanks.
> > > > > > > >
> > > > > > > > ―Jiangjie (Becket) Qin
> > > > > > > >
> > > > > > > > > On 2/8/15, 6:35 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:
> > > > > > > >
> > > > > > > > >Hi Jay, thanks a lot for the comments.
> > > > > > > > > >I think this solution is better. We probably don’t need data
> > > > > > > > > >channel anymore. It can be replaced with a list of producers if
> > > > > > > > > >we need more sender threads.
> > > > > > > > >I’ll update the KIP page.
> > > > > > > > >
> > > > > > > > > >The reasoning about message handler is mainly for efficiency
> > > > > > > > > >purpose. I’m thinking that if something can be done in pipeline
> > > > > > > > > >for all the clients such as filtering/reformatting, it is
> > > > > > > > > >probably better to do it in the pipeline than asking 100
> > > > > > > > > >clients to do the same thing 100 times.
> > > > > > > > >
> > > > > > > > >―Jiangjie (Becket) Qin
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >On 2/8/15, 4:59 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > >>Yeah, I second Neha's comments. The current mm code has taken
> > > > > > > > > >>something pretty simple and made it pretty scary with callbacks
> > > > > > > > > >>and wait/notify stuff. Do we believe this works? I can't tell
> > > > > > > > > >>by looking at it, which is kind of bad for something important
> > > > > > > > > >>like this. I don't mean this as criticism, I know the history:
> > > > > > > > > >>we added in memory queues to help with other performance
> > > > > > > > > >>problems without thinking about correctness, then we added
> > > > > > > > > >>stuff to work around the in-memory queues so as not to lose
> > > > > > > > > >>data, and so on.
> > > > > > > > >>
> > > > > > > > > >>Can we instead do the opposite exercise and start with the
> > > > > > > > > >>basics of what mm should do and think about what deficiencies
> > > > > > > > > >>prevent this approach from working? Then let's make sure the
> > > > > > > > > >>currently in-flight work will remove these deficiencies. After
> > > > > > > > > >>all mm is kind of the prototypical kafka use case so if we
> > > > > > > > > >>can't make our clients do this probably no one else can.
> > > > > > > > >>
> > > > > > > > > >>I think mm should just be N independent threads each of which
> > > > > > > > > >>has their own consumer but share a producer and each of which
> > > > > > > > > >>looks like this:
> > > > > > > > >>
> > > > > > > > >>while(true) {
> > > > > > > > >>    val recs = consumer.poll(Long.MaxValue);
> > > > > > > > >>    for (rec <- recs)
> > > > > > > > >>        producer.send(rec, logErrorCallback)
> > > > > > > > > >>    if(System.currentTimeMillis - lastCommit > commitInterval) {
> > > > > > > > >>        producer.flush()
> > > > > > > > >>        consumer.commit()
> > > > > > > > >>        lastCommit = System.currentTimeMillis
> > > > > > > > >>    }
> > > > > > > > >>}
> > > > > > > > >>
> > > > > > > > > >>This will depend on setting the retry count in the producer to
> > > > > > > > > >>something high with a largish backoff so that a failed send
> > > > > > > > > >>attempt doesn't drop data.
> > > > > > > > >>
> > > > > > > > > >>We will need to use the callback to force a flush and offset
> > > > > > > > > >>commit on rebalance.
> > > > > > > > >>
> > > > > > > > > >>This approach may have a few more TCP connections due to using
> > > > > > > > > >>multiple consumers but I think it is a lot easier to reason
> > > > > > > > > >>about and the total number of mm instances is always going to
> > > > > > > > > >>be small.
> > > > > > > > >>
> > > > > > > > > >>Let's talk about where this simple approach falls short, I
> > > > > > > > > >>think that will help us understand your motivations for
> > > > > > > > > >>additional elements.
> > > > > > > > >>
> > > > > > > > > >>Another advantage of this is that it is so simple I don't think
> > > > > > > > > >>we really even need to bother making mm extensible because
> > > > > > > > > >>writing your own code that does custom processing or
> > > > > > > > > >>transformation is just ten lines and no plug in system is going
> > > > > > > > > >>to make it simpler.
> > > > > > > > >>
> > > > > > > > >>-Jay
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > > >>On Sun, Feb 8, 2015 at 2:40 PM, Neha Narkhede <n...@confluent.io> wrote:
> > > > > > > > >>
> > > > > > > > >>> Few comments -
> > > > > > > > >>>
> > > > > > > > > >>> 1. Why do we need the message handler? Do you have concrete
> > > > > > > > > >>> use cases in mind? If not, we should consider adding it in
> > > > > > > > > >>> the future when/if we do have use cases for it. The purpose
> > > > > > > > > >>> of the mirror maker is to be a simple tool for setting up
> > > > > > > > > >>> Kafka cluster replicas. I don't see why we need to include a
> > > > > > > > > >>> message handler for doing stream transformations or
> > > > > > > > > >>> filtering. You can always write a simple process for doing
> > > > > > > > > >>> that once the data is copied as is in the target cluster.
> > > > > > > > > >>> 2. Why keep both designs? We should prefer the simpler design
> > > > > > > > > >>> unless it is not feasible due to the performance issue that
> > > > > > > > > >>> we previously had. Did you get a chance to run some tests to
> > > > > > > > > >>> see if that is really still a problem or not? It will be
> > > > > > > > > >>> easier to think about the design and also make the KIP
> > > > > > > > > >>> complete if we make a call on the design first.
> > > > > > > > > >>> 3. Can you explain the need for keeping a list of unacked
> > > > > > > > > >>> offsets per partition? Consider adding a section on retries
> > > > > > > > > >>> and how you plan to handle the case when the producer runs
> > > > > > > > > >>> out of all retries.
> > > > > > > > >>>
> > > > > > > > >>> Thanks,
> > > > > > > > >>> Neha
> > > > > > > > >>>
> > > > > > > > > >>> On Sun, Feb 8, 2015 at 2:06 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > > > > > > > >>>
> > > > > > > > >>> > Hi Neha,
> > > > > > > > >>> >
> > > > > > > > > >>> > Yes, I’ve updated the KIP so the entire KIP is based on new
> > > > > > > > > >>> > consumer now. I’ve put both designs with and without data
> > > > > > > > > >>> > channel in the KIP as I still feel we might need the data
> > > > > > > > > >>> > channel to provide more flexibility, especially after
> > > > > > > > > >>> > message handler is introduced. I’ve put my thinking of the
> > > > > > > > > >>> > pros and cons of the two designs in the KIP as well. It’ll
> > > > > > > > > >>> > be great if you can give a review and comment.
> > > > > > > > >>> >
> > > > > > > > >>> > Thanks.
> > > > > > > > >>> >
> > > > > > > > >>> > Jiangjie (Becket) Qin
> > > > > > > > >>> >
> > > > > > > > > >>> > On 2/6/15, 7:30 PM, "Neha Narkhede" <n...@confluent.io> wrote:
> > > > > > > > >>> >
> > > > > > > > >>> > >Hey Becket,
> > > > > > > > >>> > >
> > > > > > > > > >>> > >What are the next steps on this KIP? As per your comment
> > > > > > > > > >>> > >earlier on the thread -
> > > > > > > > >>> > >
> > > > > > > > > >>> > >> I do agree it makes more sense
> > > > > > > > > >>> > >> to avoid duplicate effort and plan based on new
> > > > > > > > > >>> > >> consumer. I’ll modify the KIP.
> > > > > > > > >>> > >
> > > > > > > > >>> > >
> > > > > > > > > >>> > >Did you get a chance to think about the simplified design
> > > > > > > > > >>> > >that we proposed earlier? Do you plan to update the KIP
> > > > > > > > > >>> > >with that proposal?
> > > > > > > > >>> > >
> > > > > > > > >>> > >Thanks,
> > > > > > > > >>> > >Neha
> > > > > > > > >>> > >
> > > > > > > > > >>> > >On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > > > > > > > >>> > >
> > > > > > > > > >>> > >> In mirror maker we do not do de-serialization on the
> > > > > > > > > >>> > >> messages. Mirror maker uses source TopicPartition hash
> > > > > > > > > >>> > >> to choose a producer to send messages from the same
> > > > > > > > > >>> > >> source partition. The partition those messages end up
> > > > > > > > > >>> > >> in is decided by the Partitioner class in KafkaProducer
> > > > > > > > > >>> > >> (assuming you are using the new producer), which uses
> > > > > > > > > >>> > >> hash code of bytes[].
> > > > > > > > >>> > >>
> > > > > > > > > >>> > >> If deserialization is needed, it has to be done in
> > > > > > > > > >>> > >> message handler.
> > > > > > > > >>> > >>
> > > > > > > > >>> > >> Thanks.
> > > > > > > > >>> > >>
> > > > > > > > >>> > >> Jiangjie (Becket) Qin
> > > > > > > > >>> > >>
> > > > > > > > > >>> > >> On 2/4/15, 11:33 AM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
> > > > > > > > >>> > >>
> > > > > > > > >>> > >> >Hi Jiangjie,
> > > > > > > > >>> > >> >
> > > > > > > > > >>> > >> >Thanks for entertaining my questions so far. The last
> > > > > > > > > >>> > >> >question I have is about serialization of the message
> > > > > > > > > >>> > >> >key. If the key de-serialization (Class) is not present
> > > > > > > > > >>> > >> >at the MM instance, then does it use the raw byte
> > > > > > > > > >>> > >> >hashcode to determine the partition? How are you going
> > > > > > > > > >>> > >> >to address the situation where the key needs to be
> > > > > > > > > >>> > >> >de-serialized and the actual hashcode needs to be
> > > > > > > > > >>> > >> >computed?
> > > > > > > > >>> > >> >
> > > > > > > > >>> > >> >
> > > > > > > > >>> > >> >Thanks,
> > > > > > > > >>> > >> >
> > > > > > > > >>> > >> >Bhavesh
> > > > > > > > >>> > >> >
> > > > > > > > > >>> > >> >On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > > > > > > > >>> > >> >
> > > > > > > > >>> > >> >> Hi Bhavesh,
> > > > > > > > >>> > >> >>
> > > > > > > > >>> > >> >> Please see inline comments.
> > > > > > > > >>> > >> >>
> > > > > > > > >>> > >> >> Jiangjie (Becket) Qin
> > > > > > > > >>> > >> >>
> > > > > > > > > >>> > >> >> On 1/29/15, 7:00 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
> > > > > > > > >>> > >> >>
> > > > > > > > >>> > >> >> >Hi Jiangjie,
> > > > > > > > >>> > >> >> >
> > > > > > > > >>> > >> >> >Thanks for the input.
> > > > > > > > >>> > >> >> >
> > > > > > > > > >>> > >> >> >a) Will the MM producer ack be attached to the
> > > > > > > > > >>> > >> >> >Producer Instance or be per topic? The use case is
> > > > > > > > > >>> > >> >> >that one instance of MM needs to handle both strong
> > > > > > > > > >>> > >> >> >ack and also ack=0 for some topics. Or would it be
> > > > > > > > > >>> > >> >> >better to set up another instance of MM?
> > > > > > > > > >>> > >> >> The acks setting is a producer level setting instead
> > > > > > > > > >>> > >> >> of a topic level setting. In this case you probably
> > > > > > > > > >>> > >> >> need to set up another instance.
> > > > > instance.
> > > > > > > > >>> > >> >> >
> > > > > > > > > >>> > >> >> >b) Regarding TCP connections, why is the number of
> > > > > > > > > >>> > >> >> >producer instances tied to TCP connections? Is it
> > > > > > > > > >>> > >> >> >possible to use a broker connection TCP pool, so that
> > > > > > > > > >>> > >> >> >a producer will just check out a TCP connection to a
> > > > > > > > > >>> > >> >> >broker? Then the number of producer instances would
> > > > > > > > > >>> > >> >> >not correlate to broker connections. Is this
> > > > > > > > > >>> > >> >> >possible?
> > > > > > > > > >>> > >> >> In new producer, each producer maintains a connection
> > > > > > > > > >>> > >> >> to each broker within the producer instance. Making
> > > > > > > > > >>> > >> >> producer instances share the TCP connections is a
> > > > > > > > > >>> > >> >> very big change to the current design, so I suppose
> > > > > > > > > >>> > >> >> we won’t be able to do that.
> > > > > > > > >>> > >> >> >
> > > > > > > > >>> > >> >> >
> > > > > > > > >>> > >> >> >Thanks,
> > > > > > > > >>> > >> >> >
> > > > > > > > >>> > >> >> >Bhavesh
> > > > > > > > >>> > >> >> >
> > > > > > > > > >>> > >> >> >On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > > > > > > > >>> > >> >> >
> > > > > > > > >>> > >> >> >> Hi Bhavesh,
> > > > > > > > >>> > >> >> >>
> > > > > > > > > >>> > >> >> >> I think it is the right discussion to have when we
> > > > > > > > > >>> > >> >> >> are talking about the new design for MM.
> > > > > > > > >>> > >> >> >> Please see the inline comments.
> > > > > > > > >>> > >> >> >>
> > > > > > > > >>> > >> >> >> Jiangjie (Becket) Qin
> > > > > > > > >>> > >> >> >>
> > > > > > > > > >>> > >> >> >> On 1/28/15, 10:48 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
> > > > > > > > >>> > >> >> >>
> > > > > > > > >>> > >> >> >> >Hi Jiangjie,
> > > > > > > > >>> > >> >> >> >
> > > > > > > > > >>> > >> >> >> >I just wanted to let you know about our use case
> > > > > > > > > >>> > >> >> >> >and stress the point that the local data center
> > > > > > > > > >>> > >> >> >> >broker cluster has fewer partitions than the
> > > > > > > > > >>> > >> >> >> >destination offline broker cluster. This is because
> > > > > > > > > >>> > >> >> >> >we do the batch pull from CAMUS and need to drain
> > > > > > > > > >>> > >> >> >> >data faster than the injection rate (from four DCs
> > > > > > > > > >>> > >> >> >> >for the same topic).
> > > > > > > > > >>> > >> >> >> Keeping the same partition number in source and
> > > > > > > > > >>> > >> >> >> target cluster will be an option but will not be
> > > > > > > > > >>> > >> >> >> enforced by default.
> > > > > > > > >>> > >> >> >> >
> > > > > > > > > >>> > >> >> >> >We are facing the following issues (probably due to
> > > > > > > > > >>> > >> >> >> >configuration):
> > > > > > > > > >>> > >> >> >> >
> > > > > > > > > >>> > >> >> >> >1)      We occasionally lose data because the
> > > > > > > > > >>> > >> >> >> >message batch size is too large (2MB) on target
> > > > > > > > > >>> > >> >> >> >data (we are using the old producer but I think the
> > > > > > > > > >>> > >> >> >> >new producer will solve this problem to some
> > > > > > > > > >>> > >> >> >> >extent).
> > > > > > > > > >>> > >> >> >> We do see this issue in LinkedIn as well. New
> > > > > > > > > >>> > >> >> >> producer also might have this issue. There are some
> > > > > > > > > >>> > >> >> >> proposed solutions, but no real work has started
> > > > > > > > > >>> > >> >> >> yet. For now, as a workaround, setting a more
> > > > > > > > > >>> > >> >> >> aggressive batch size on producer side should work.
> > > > > > > > >>> > >> >> >> >2)      Since only one instance is set to MM
> > > data,
> > > > > we
> > > > > > > are
> > > > > > > > >>>not
> > > > > > > > >>> > >>able
> > > > > > > > >>> > >> >>to
> > > > > > > > >>> > >> >> >> >set-up ack per topic instead ack is attached
> to
> > > > > producer
> > > > > > > > >>> > >>instance.
> > > > > > > > >>> > >> >> >> I don’t quite get the question here.
> > > > > > > > >>> > >> >> >> >3)      How are you going to address two
> phase
> > > commit
> > > > > > > > >>>problem
> > > > > > > > >>> if
> > > > > > > > >>> > >> >>ack is
> > > > > > > > >>> > >> >> >> >set
> > > > > > > > >>> > >> >> >> >to strongest, but auto commit is on for
> consumer
> > > > > (meaning
> > > > > > > > >>> > >>producer
> > > > > > > > >>> > >> >>does
> > > > > > > > >>> > >> >> >> >not
> > > > > > > > >>> > >> >> >> >get ack,  but consumer auto committed offset
> that
> > > > > > > message).
> > > > > > > > >>> Is
> > > > > > > > >>> > >> >>there
> > > > > > > > >>> > >> >> >> >transactional (Kafka transaction is in
> process)
> > > > > based ack
> > > > > > > > >>>and
> > > > > > > > >>> > >>commit
> > > > > > > > >>> > >> >> >> >offset
> > > > > > > > >>> > >> >> >> >?
> > > > > > > > > >>> > >> >> >> Auto offset commit should be turned off in this
> > > > > > > > > >>> > >> >> >> case. The offset will only be committed once by the
> > > > > > > > > >>> > >> >> >> offset commit thread. So there is no two phase
> > > > > > > > > >>> > >> >> >> commit.
> > > > > > > > >>> > >> >> >> >4)      How are you planning to avoid
> duplicated
> > > > > message?
> > > > > > > > >>>( Is
> > > > > > > > >>> > >> >> >> >brokergoing
> > > > > > > > >>> > >> >> >> >have moving window of message collected and
> > > de-dupe
> > > > > ?)
> > > > > > > > >>> > >>Possibly, we
> > > > > > > > >>> > >> >> >>get
> > > > > > > > >>> > >> >> >> >this from retry set to 5…?
> > > > > > > > > >>> > >> >> >> We are not trying to completely avoid duplicates.
> > > > > > > > > >>> > >> >> >> The duplicates will still be there if:
> > > > > > > > > >>> > >> >> >> 1. Producer retries on failure.
> > > > > > > > > >>> > >> >> >> 2. Mirror maker is hard killed.
> > > > > > > > > >>> > >> >> >> Currently, dedup is expected to be done by user if
> > > > > > > > > >>> > >> >> >> necessary.
> > > > > > > > >>> > >> >> >> >5)      Last, is there any warning or any
> thing
> > > you
> > > > > can
> > > > > > > > >>>provide
> > > > > > > > >>> > >> >>insight
> > > > > > > > >>> > >> >> >> >from MM component about data injection rate
> into
> > > > > > > > >>>destination
> > > > > > > > >>> > >> >> >>partitions is
> > > > > > > > >>> > >> >> >> >NOT evenly distributed regardless  of  keyed
> or
> > > > > non-keyed
> > > > > > > > >>> message
> > > > > > > > >>> > >> >> >>(Hence
> > > > > > > > >>> > >> >> >> >there is ripple effect such as data not
> arriving
> > > > > late, or
> > > > > > > > >>>data
> > > > > > > > >>> is
> > > > > > > > >>> > >> >> >>arriving
> > > > > > > > >>> > >> >> >> >out of order in  intern of time stamp  and
> early
> > > some
> > > > > > > time,
> > > > > > > > >>>and
> > > > > > > > >>> > >> >>CAMUS
> > > > > > > > >>> > >> >> >> >creates huge number of file count on HDFS
> due to
> > > > > uneven
> > > > > > > > >>> injection
> > > > > > > > >>> > >> >>rate
> > > > > > > > >>> > >> >> >>.
> > > > > > > > >>> > >> >> >> >Camus Job is  configured to run every 3
> minutes.)
> > > > > > > > >>> > >> >> >> I think uneven data distribution is typically
> > > caused
> > > > > by
> > > > > > > > >>>server
> > > > > > > > >>> > >>side
> > > > > > > > >>> > >> >> >> unbalance, instead of something mirror maker
> could
> > > > > > > control.
> > > > > > > > >>>In
> > > > > > > > >>> new
> > > > > > > > >>> > >> >> >>mirror
> > > > > > > > >>> > >> >> >> maker, however, there is a customizable
> message
> > > > > handler,
> > > > > > > > >>>that
> > > > > > > > >>> > >>might
> > > > > > > > >>> > >> >>be
> > > > > > > > >>> > >> >> >> able to help a little bit. In message handler,
> > > you can
> > > > > > > > >>> explicitly
> > > > > > > > >>> > >> >>set a
> > > > > > > > >>> > >> >> >> partition that you want to produce the message
> > > to. So
> > > > > if
> > > > > > > you
> > > > > > > > >>> know
> > > > > > > > >>> > >>the
> > > > > > > > >>> > >> >> >> uneven data distribution in target cluster,
> you
> > > may
> > > > > offset
> > > > > > > > >>>it
> > > > > > > > >>> > >>here.
> > > > > > > > >>> > >> >>But
> > > > > > > > >>> > >> >> >> that probably only works for non-keyed
> messages.
> > > > > > > > >>> > >> >> >> >
> > > > > > > > >>> > >> >> >> >I am not sure if this is right discussion
> form to
> > > > > bring
> > > > > > > > >>>these
> > > > > > > > >>> to
> > > > > > > > >>> > >> >> >> >your/kafka
> > > > > > > > >>> > >> >> >> >Dev team attention.  This might be off track,
> > > > > > > > >>> > >> >> >> >
> > > > > > > > >>> > >> >> >> >
> > > > > > > > >>> > >> >> >> >Thanks,
> > > > > > > > >>> > >> >> >> >
> > > > > > > > >>> > >> >> >> >Bhavesh
> > > > > > > > >>> > >> >> >> >
> > > > > > > > >>> > >> >> >> >On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie
> Qin
> > > > > > > > >>> > >> >> >><j...@linkedin.com.invalid
> > > > > > > > >>> > >> >> >> >
> > > > > > > > >>> > >> >> >> >wrote:
> > > > > > > > >>> > >> >> >> >
> > > > > > > > >>> > >> >> >> >> I’ve updated the KIP page. Feedbacks are
> > > welcome.
> > > > > > > > >>> > >> >> >> >>
> > > > > > > > >>> > >> >> >> >> Regarding the simple mirror maker design. I
> > > thought
> > > > > > > over
> > > > > > > > >>>it
> > > > > > > > >>> and
> > > > > > > > >>> > >> >>have
> > > > > > > > >>> > >> >> >> >>some
> > > > > > > > >>> > >> >> >> >> worries:
> > > > > > > > >>> > >> >> >> >> There are two things that might worth
> thinking:
> > > > > > > > >>> > >> >> >> >> 1. One of the enhancement to mirror maker
> is
> > > > > adding a
> > > > > > > > >>>message
> > > > > > > > >>> > >> >> >>handler to
> > > > > > > > >>> > >> >> >> >> do things like reformatting. I think we
> might
> > > > > > > potentially
> > > > > > > > >>> want
> > > > > > > > >>> > >>to
> > > > > > > > >>> > >> >> >>have
> > > > > > > > >>> > >> >> >> >> more threads processing the messages than
> the
> > > > > number of
> > > > > > > > >>> > >>consumers.
> > > > > > > > >>> > >> >> >>If we
> > > > > > > > >>> > >> >> >> >> follow the simple mirror maker solution, we
> > > lose
> > > > > this
> > > > > > > > >>> > >>flexibility.
> > > > > > > > >>> > >> >> >> >> 2. This might not matter too much, but
> creating
> > > > > more
> > > > > > > > >>> consumers
> > > > > > > > >>> > >> >>means
> > > > > > > > >>> > >> >> >> >>more
> > > > > > > > >>> > >> >> >> >> footprint of TCP connection / memory.
> > > > > > > > >>> > >> >> >> >>
> > > > > > > > >>> > >> >> >> >> Any thoughts on this?
> > > > > > > > >>> > >> >> >> >>
> > > > > > > > >>> > >> >> >> >> Thanks.
> > > > > > > > >>> > >> >> >> >>
> > > > > > > > >>> > >> >> >> >> Jiangjie (Becket) Qin
> > > > > > > > >>> > >> >> >> >>
> > > > > > > > >>> > >> >> >> >> On 1/26/15, 10:35 AM, "Jiangjie Qin" <
> > > > > > > j...@linkedin.com>
> > > > > > > > >>> > wrote:
> > > > > > > > >>> > >> >> >> >>
> > > > > > > > >>> > >> >> >> >> >Hi Jay and Neha,
> > > > > > > > >>> > >> >> >> >> >
> > > > > > > > >>> > >> >> >> >> >Thanks a lot for the reply and
> explanation. I
> > > do
> > > > > agree
> > > > > > > > >>>it
> > > > > > > > >>> > >>makes
> > > > > > > > >>> > >> >>more
> > > > > > > > >>> > >> >> >> >>sense
> > > > > > > > >>> > >> >> >> >> >to avoid duplicate effort and plan based
> on
> > > new
> > > > > > > > >>>consumer.
> > > > > > > > >>> I’ll
> > > > > > > > >>> > >> >> >>modify
> > > > > > > > >>> > >> >> >> >>the
> > > > > > > > >>> > >> >> >> >> >KIP.
> > > > > > > > >>> > >> >> >> >> >
> > > > > > > > >>> > >> >> >> >> >To Jay’s question on message ordering -
> The
> > > data
> > > > > > > channel
> > > > > > > > >>> > >> >>selection
> > > > > > > > >>> > >> >> >> >>makes
> > > > > > > > >>> > >> >> >> >> >sure that the messages from the same
> source
> > > > > partition
> > > > > > > > >>>will
> > > > > > > > >>> > >>sent
> > > > > > > > >>> > >> >>by
> > > > > > > > >>> > >> >> >>the
> > > > > > > > >>> > >> >> >> >> >same producer. So the order of the
> messages is
> > > > > > > > >>>guaranteed
> > > > > > > > >>> with
> > > > > > > > >>> > >> >> >>proper
> > > > > > > > >>> > >> >> >> >> >producer settings
> > > > > > > > >>> > >> >>(MaxInFlightRequests=1,retries=Integer.MaxValue,
> > > > > > > > >>> > >> >> >> >>etc.)
> > > > > > > > >>> > >> >> >> >> >For keyed messages, because they come
> from the
> > > > > same
> > > > > > > > >>>source
> > > > > > > > >>> > >> >>partition
> > > > > > > > >>> > >> >> >> >>and
> > > > > > > > >>> > >> >> >> >> >will end up in the same target partition,
> as
> > > long
> > > > > as
> > > > > > > > >>>they
> > > > > > > > >>> are
> > > > > > > > >>> > >> >>sent
> > > > > > > > >>> > >> >> >>by
> > > > > > > > >>> > >> >> >> >>the
> > > > > > > > >>> > >> >> >> >> >same producer, the order is guaranteed.
> > > > > > > > >>> > >> >> >> >> >For non-keyed messages, the messages
> coming
> > > from
> > > > > the
> > > > > > > > >>>same
> > > > > > > > >>> > >>source
> > > > > > > > >>> > >> >> >> >>partition
> > > > > > > > >>> > >> >> >> >> >might go to different target partitions.
> The
> > > > > order is
> > > > > > > > >>>only
> > > > > > > > >>> > >> >> >>guaranteed
> > > > > > > > >>> > >> >> >> >> >within each partition.
> > > > > > > > >>> > >> >> >> >> >
> > > > > > > > >>> > >> >> >> >> >Anyway, I’ll modify the KIP and data
> channel
> > > will
> > > > > be
> > > > > > > > >>>away.
> > > > > > > > >>> > >> >> >> >> >
> > > > > > > > >>> > >> >> >> >> >Thanks.
> > > > > > > > >>> > >> >> >> >> >
> > > > > > > > >>> > >> >> >> >> >Jiangjie (Becket) Qin
> > > > > > > > >>> > >> >> >> >> >
> > > > > > > > >>> > >> >> >> >> >
> > > > > > > > >>> > >> >> >> >> >On 1/25/15, 4:34 PM, "Neha Narkhede" <
> > > > > > > n...@confluent.io>
> > > > > > > > >>> > >>wrote:
> > > > > > > > >>> > >> >> >> >> >
> > > > > > > > >>> > >> >> >> >> >>I think there is some value in
> investigating
> > > if
> > > > > we
> > > > > > > can
> > > > > > > > >>>go
> > > > > > > > >>> > >>back
> > > > > > > > >>> > >> >>to
> > > > > > > > >>> > >> >> >>the
> > > > > > > > >>> > >> >> >> >> >>simple mirror maker design, as Jay points
> > > out.
> > > > > Here
> > > > > > > you
> > > > > > > > >>> have
> > > > > > > > >>> > >>N
> > > > > > > > >>> > >> >> >> >>threads,
> > > > > > > > >>> > >> >> >> >> >>each has a consumer and a producer.
> > > > > > > > >>> > >> >> >> >> >>
> > > > > > > > >>> > >> >> >> >> >>The reason why we had to move away from
> that
> > > was
> > > > > a
> > > > > > > > >>> > >>combination
> > > > > > > > >>> > >> >>of
> > > > > > > > >>> > >> >> >>the
> > > > > > > > >>> > >> >> >> >> >>difference in throughput between the
> consumer
> > > > > and the
> > > > > > > > >>>old
> > > > > > > > >>> > >> >>producer
> > > > > > > > >>> > >> >> >>and
> > > > > > > > >>> > >> >> >> >> >>the
> > > > > > > > >>> > >> >> >> >> >>deficiency of the consumer rebalancing
> that
> > > > > limits
> > > > > > > the
> > > > > > > > >>> total
> > > > > > > > >>> > >> >> >>number of
> > > > > > > > >>> > >> >> >> >> >>mirror maker threads. So the only option
> > > > > available
> > > > > > > was
> > > > > > > > >>>to
> > > > > > > > >>> > >> >>increase
> > > > > > > > >>> > >> >> >>the
> > > > > > > > >>> > >> >> >> >> >>throughput of the limited # of mirror
> maker
> > > > > threads
> > > > > > > > >>>that
> > > > > > > > >>> > >>could
> > > > > > > > >>> > >> >>be
> > > > > > > > >>> > >> >> >> >> >>deployed.
> > > > > > > > >>> > >> >> >> >> >>Now that queuing design may not make
> sense,
> > > if
> > > > > the
> > > > > > > new
> > > > > > > > >>> > >> >>producer's
> > > > > > > > >>> > >> >> >> >> >>throughput is almost similar to the
> consumer
> > > AND
> > > > > the
> > > > > > > > >>>fact
> > > > > > > > >>> > >>that
> > > > > > > > >>> > >> >>the
> > > > > > > > >>> > >> >> >>new
> > > > > > > > >>> > >> >> >> >> >>round-robin based consumer rebalancing
> can
> > > allow
> > > > > a
> > > > > > > very
> > > > > > > > >>> high
> > > > > > > > >>> > >> >> >>number of
> > > > > > > > >>> > >> >> >> >> >>mirror maker instances to exist.
> > > > > > > > >>> > >> >> >> >> >>
> > > > > > > > >>> > >> >> >> >> >>This is the end state that the mirror
> maker
> > > > > should be
> > > > > > > > >>>in
> > > > > > > > >>> once
> > > > > > > > >>> > >> >>the
> > > > > > > > >>> > >> >> >>new
> > > > > > > > >>> > >> >> >> >> >>consumer is complete, so it wouldn't
> hurt to
> > > see
> > > > > if
> > > > > > > we
> > > > > > > > >>>can
> > > > > > > > >>> > >>just
> > > > > > > > >>> > >> >> >>move
> > > > > > > > >>> > >> >> >> >>to
> > > > > > > > >>> > >> >> >> >> >>that right now.
> > > > > > > > >>> > >> >> >> >> >>
> > > > > > > > >>> > >> >> >> >> >>On Fri, Jan 23, 2015 at 8:40 PM, Jay
> Kreps
> > > > > > > > >>> > >><jay.kr...@gmail.com
> > > > > > > > >>> > >> >
> > > > > > > > >>> > >> >> >> >>wrote:
> > > > > > > > >>> > >> >> >> >> >>
> > > > > > > > >>> > >> >> >> >> >>> QQ: If we ever use a different
> technique
> > > for
> > > > > the
> > > > > > > data
> > > > > > > > >>> > >>channel
> > > > > > > > >>> > >> >> >> >>selection
> > > > > > > > >>> > >> >> >> >> >>> than for the producer partitioning
> won't
> > > that
> > > > > break
> > > > > > > > >>> > >>ordering?
> > > > > > > > >>> > >> >>How
> > > > > > > > >>> > >> >> >> >>can
> > > > > > > > >>> > >> >> >> >> >>>we
> > > > > > > > >>> > >> >> >> >> >>> ensure these things stay in sync?
> > > > > > > > >>> > >> >> >> >> >>>
> > > > > > > > >>> > >> >> >> >> >>> With respect to the new consumer--I
> really
> > > do
> > > > > want
> > > > > > > to
> > > > > > > > >>> > >> >>encourage
> > > > > > > > >>> > >> >> >> >>people
> > > > > > > > >>> > >> >> >> >> >>>to
> > > > > > > > >>> > >> >> >> >> >>> think through how MM will work with
> the new
> > > > > > > consumer.
> > > > > > > > >>>I
> > > > > > > > >>> > >>mean
> > > > > > > > >>> > >> >>this
> > > > > > > > >>> > >> >> >> >>isn't
> > > > > > > > >>> > >> >> >> >> >>> very far off, maybe a few months if we
> > > hustle?
> > > > > I
> > > > > > > > >>>could
> > > > > > > > >>> > >> >>imagine us
> > > > > > > > >>> > >> >> >> >> >>>getting
> > > > > > > > >>> > >> >> >> >> >>> this mm fix done maybe sooner, maybe
> in a
> > > > > month?
> > > > > > > So I
> > > > > > > > >>> guess
> > > > > > > > >>> > >> >>this
> > > > > > > > >>> > >> >> >> >>buys
> > > > > > > > >>> > >> >> >> >> >>>us an
> > > > > > > > >>> > >> >> >> >> >>> extra month before we rip it out and
> throw
> > > it
> > > > > away?
> > > > > > > > >>>Maybe
> > > > > > > > >>> > >>two?
> > > > > > > > >>> > >> >> >>This
> > > > > > > > >>> > >> >> >> >>bug
> > > > > > > > >>> > >> >> >> >> >>>has
> > > > > > > > >>> > >> >> >> >> >>> been there for a while, though, right?
> Is
> > > it
> > > > > worth
> > > > > > > > >>>it?
> > > > > > > > >>> > >> >>Probably
> > > > > > > > >>> > >> >> >>it
> > > > > > > > >>> > >> >> >> >>is,
> > > > > > > > >>> > >> >> >> >> >>>but
> > > > > > > > >>> > >> >> >> >> >>> it still kind of sucks to have the
> > > duplicate
> > > > > > > effort.
> > > > > > > > >>> > >> >> >> >> >>>
> > > > > > > > >>> > >> >> >> >> >>> So anyhow let's definitely think about
> how
> > > > > things
> > > > > > > > >>>will
> > > > > > > > >>> work
> > > > > > > > >>> > >> >>with
> > > > > > > > >>> > >> >> >>the
> > > > > > > > >>> > >> >> >> >> >>>new
> > > > > > > > >>> > >> >> >> >> >>> consumer. I think we can probably just
> > > have N
> > > > > > > > >>>threads,
> > > > > > > > >>> each
> > > > > > > > >>> > >> >> >>thread
> > > > > > > > >>> > >> >> >> >>has
> > > > > > > > >>> > >> >> >> >> >>>a
> > > > > > > > >>> > >> >> >> >> >>> producer and consumer and is internally
> > > single
> > > > > > > > >>>threaded.
> > > > > > > > >>> > >>Any
> > > > > > > > >>> > >> >> >>reason
> > > > > > > > >>> > >> >> >> >> >>>this
> > > > > > > > >>> > >> >> >> >> >>> wouldn't work?
> > > > > > > > >>> > >> >> >> >> >>>
> > > > > > > > >>> > >> >> >> >> >>> -Jay
> > > > > > > > >>> > >> >> >> >> >>>
> > > > > > > > >>> > >> >> >> >> >>>
> > > > > > > > >>> > >> >> >> >> >>> On Wed, Jan 21, 2015 at 5:29 PM,
> Jiangjie
> > > Qin
> > > > > > > > >>> > >> >> >> >> >>><j...@linkedin.com.invalid>
> > > > > > > > >>> > >> >> >> >> >>> wrote:
> > > > > > > > >>> > >> >> >> >> >>>
> > > > > > > > >>> > >> >> >> >> >>> > Hi Jay,
> > > > > > > > >>> > >> >> >> >> >>> >
> > > > > > > > >>> > >> >> >> >> >>> > Thanks for comments. Please see
> inline
> > > > > responses.
> > > > > > > > >>> > >> >> >> >> >>> >
> > > > > > > > >>> > >> >> >> >> >>> > Jiangjie (Becket) Qin
> > > > > > > > >>> > >> >> >> >> >>> >
> > > > > > > > >>> > >> >> >> >> >>> > On 1/21/15, 1:33 PM, "Jay Kreps"
> > > > > > > > >>><jay.kr...@gmail.com>
> > > > > > > > >>> > >> >>wrote:
> > > > > > > > >>> > >> >> >> >> >>> >
> > > > > > > > >>> > >> >> >> >> >>> > >Hey guys,
> > > > > > > > >>> > >> >> >> >> >>> > >
> > > > > > > > >>> > >> >> >> >> >>> > >A couple questions/comments:
> > > > > > > > >>> > >> >> >> >> >>> > >
> > > > > > > > >>> > >> >> >> >> >>> > >1. The callback and user-controlled
> > > commit
> > > > > > > offset
> > > > > > > > >>> > >> >> >>functionality
> > > > > > > > >>> > >> >> >> >>is
> > > > > > > > >>> > >> >> >> >> >>> already
> > > > > > > > >>> > >> >> >> >> >>> > >in the new consumer which we are
> > > working on
> > > > > in
> > > > > > > > >>> parallel.
> > > > > > > > >>> > >> >>If we
> > > > > > > > >>> > >> >> >> >> >>> accelerated
> > > > > > > > >>> > >> >> >> >> >>> > >that work it might help concentrate
> > > > > efforts. I
> > > > > > > > >>>admit
> > > > > > > > >>> > >>this
> > > > > > > > >>> > >> >> >>might
> > > > > > > > >>> > >> >> >> >>take
> > > > > > > > >>> > >> >> >> >> >>> > >slightly longer in calendar time but
> > > could
> > > > > still
> > > > > > > > >>> > >>probably
> > > > > > > > >>> > >> >>get
> > > > > > > > >>> > >> >> >> >>done
> > > > > > > > >>> > >> >> >> >> >>>this
> > > > > > > > >>> > >> >> >> >> >>> > >quarter. Have you guys considered
> that
> > > > > approach?
> > > > > > > > >>> > >> >> >> >> >>> > Yes, I totally agree that ideally we
> > > should
> > > > > put
> > > > > > > > >>>efforts
> > > > > > > > >>> > >>on
> > > > > > > > >>> > >> >>new
> > > > > > > > >>> > >> >> >> >> >>>consumer.
> > > > > > > > >>> > >> >> >> >> >>> > The main reason for still working on
> the
> > > old
> > > > > > > > >>>consumer
> > > > > > > > >>> is
> > > > > > > > >>> > >> >>that
> > > > > > > > >>> > >> >> >>we
> > > > > > > > >>> > >> >> >> >> >>>expect
> > > > > > > > >>> > >> >> >> >> >>> it
> > > > > > > > >>> > >> >> >> >> >>> > would still be used in LinkedIn for
> > > quite a
> > > > > while
> > > > > > > > >>> before
> > > > > > > > >>> > >>the
> > > > > > > > >>> > >> >> >>new
> > > > > > > > >>> > >> >> >> >> >>>consumer
> > > > > > > > >>> > >> >> >> >> >>> > could be fully rolled out. And we
> > > recently
> > > > > > > > >>>suffering a
> > > > > > > > >>> > >>lot
> > > > > > > > >>> > >> >>from
> > > > > > > > >>> > >> >> >> >> >>>mirror
> > > > > > > > >>> > >> >> >> >> >>> > maker data loss issue. So our current
> > > plan is
> > > > > > > > >>>making
> > > > > > > > >>> > >> >>necessary
> > > > > > > > >>> > >> >> >> >> >>>changes to
> > > > > > > > >>> > >> >> >> >> >>> > make current mirror maker stable in
> > > > > production.
> > > > > > > > >>>Then we
> > > > > > > > >>> > >>can
> > > > > > > > >>> > >> >> >>test
> > > > > > > > >>> > >> >> >> >>and
> > > > > > > > >>> > >> >> >> >> >>> > rollout new consumer gradually
> without
> > > > > getting
> > > > > > > > >>>burnt.
> > > > > > > > >>> > >> >> >> >> >>> > >
> > > > > > > > >>> > >> >> >> >> >>> > >2. I think partitioning on the hash
> of
> > > the
> > > > > topic
> > > > > > > > >>> > >>partition
> > > > > > > > >>> > >> >>is
> > > > > > > > >>> > >> >> >> >>not a
> > > > > > > > >>> > >> >> >> >> >>>very
> > > > > > > > >>> > >> >> >> >> >>> > >good idea because that will make the
> > > case of
> > > > > > > going
> > > > > > > > >>> from
> > > > > > > > >>> > >>a
> > > > > > > > >>> > >> >> >>cluster
> > > > > > > > >>> > >> >> >> >> >>>with
> > > > > > > > >>> > >> >> >> >> >>> > >fewer partitions to one with more
> > > > > partitions not
> > > > > > > > >>> work. I
> > > > > > > > >>> > >> >> >>think an
> > > > > > > > >>> > >> >> >> >> >>> > >intuitive
> > > > > > > > >>> > >> >> >> >> >>> > >way to do this would be the
> following:
> > > > > > > > >>> > >> >> >> >> >>> > >a. Default behavior: Just do what
> the
> > > > > producer
> > > > > > > > >>>does.
> > > > > > > > >>> > >>I.e.
> > > > > > > > >>> > >> >>if
> > > > > > > > >>> > >> >> >>you
> > > > > > > > >>> > >> >> >> >> >>> specify a
> > > > > > > > >>> > >> >> >> >> >>> > >key use it for partitioning, if not
> just
> > > > > > > partition
> > > > > > > > >>>in
> > > > > > > > >>> a
> > > > > > > > >>> > >> >> >> >>round-robin
> > > > > > > > >>> > >> >> >> >> >>> > >fashion.
> > > > > > > > >>> > >> >> >> >> >>> > >b. Add a --preserve-partition option
> > > that
> > > > > will
> > > > > > > > >>> > >>explicitly
> > > > > > > > > >>> > >> >> >> >>inherit
> > > > > > > > >>> > >> >> >> >> >>>the
> > > > > > > > >>> > >> >> >> >> >>> > >partition from the source
> irrespective
> > > of
> > > > > > > whether
> > > > > > > > >>> there
> > > > > > > > >>> > >>is
> > > > > > > > >>> > >> >>a
> > > > > > > > >>> > >> >> >>key
> > > > > > > > >>> > >> >> >> >>or
> > > > > > > > >>> > >> >> >> >> >>> which
> > > > > > > > >>> > >> >> >> >> >>> > >partition that key would hash to.
> > > > > > > > >>> > >> >> >> >> >>> > Sorry that I did not explain this
> clear
> > > > > enough.
> > > > > > > The
> > > > > > > > >>> hash
> > > > > > > > >>> > >>of
> > > > > > > > >>> > >> >> >>topic
> > > > > > > > >>> > >> >> >> >> >>> > partition is only used when decide
> which
> > > > > mirror
> > > > > > > > >>>maker
> > > > > > > > >>> > >>data
> > > > > > > > >>> > >> >> >>channel
> > > > > > > > >>> > >> >> >> >> >>>queue
> > > > > > > > >>> > >> >> >> >> >>> > the consumer thread should put
> message
> > > into.
> > > > > It
> > > > > > > > >>>only
> > > > > > > > >>> > >>tries
> > > > > > > > >>> > >> >>to
> > > > > > > > >>> > >> >> >>make
> > > > > > > > >>> > >> >> >> >> >>>sure
> > > > > > > > >>> > >> >> >> >> >>> > the messages from the same partition
> is
> > > sent
> > > > > by
> > > > > > > the
> > > > > > > > >>> same
> > > > > > > > >>> > >> >> >>producer
> > > > > > > > >>> > >> >> >> >> >>>thread
> > > > > > > > >>> > >> >> >> >> >>> > to guarantee the sending order. This
> is
> > > not
> > > > > at
> > > > > > > all
> > > > > > > > >>> > >>related
> > > > > > > > >>> > >> >>to
> > > > > > > > >>> > >> >> >> >>which
> > > > > > > > >>> > >> >> >> >> >>> > partition in target cluster the
> messages
> > > end
> > > > > up.
> > > > > > > > >>>That
> > > > > > > > >>> is
> > > > > > > > >>> > >> >>still
> > > > > > > > >>> > >> >> >> >> >>>decided by
> > > > > > > > >>> > >> >> >> >> >>> > producer.
> > > > > > > > >>> > >> >> >> >> >>> > >
> > > > > > > > >>> > >> >> >> >> >>> > >3. You don't actually give the
> > > > > > > > >>> ConsumerRebalanceListener
> > > > > > > > >>> > >> >> >> >>interface.
> > > > > > > > >>> > >> >> >> >> >>>What
> > > > > > > > >>> > >> >> >> >> >>> > >is
> > > > > > > > >>> > >> >> >> >> >>> > >that going to look like?
> > > > > > > > >>> > >> >> >> >> >>> > Good point! I should have put it in
> the
> > > > > wiki. I
> > > > > > > > >>>just
> > > > > > > > >>> > >>added
> > > > > > > > >>> > >> >>it.
> > > > > > > > >>> > >> >> >> >> >>> > >
> > > > > > > > >>> > >> >> >> >> >>> > >4. What is MirrorMakerRecord? I
> think
> > > > > ideally
> > > > > > > the
> > > > > > > > >>> > >> >> >> >> >>> > >MirrorMakerMessageHandler
> > > > > > > > >>> > >> >> >> >> >>> > >interface would take a
> ConsumerRecord as
> > > > > input
> > > > > > > and
> > > > > > > > >>> > >>return a
> > > > > > > > >>> > >> >> >> >> >>> > >ProducerRecord,
> > > > > > > > >>> > >> >> >> >> >>> > >right? That would allow you to
> > > transform the
> > > > > > > key,
> > > > > > > > >>> value,
> > > > > > > > >>> > >> >> >> >>partition,
> > > > > > > > >>> > >> >> >> >> >>>or
> > > > > > > > >>> > >> >> >> >> >>> > >destination topic...
> > > > > > > > >>> > >> >> >> >> >>> > MirrorMakerRecord is introduced in
> > > > > KAFKA-1650,
> > > > > > > > >>>which is
> > > > > > > > >>> > >> >>exactly
> > > > > > > > >>> > >> >> >> >>the
> > > > > > > > >>> > >> >> >> >> >>>same
> > > > > > > > >>> > >> >> >> >> >>> > as ConsumerRecord in KAFKA-1760.
> > > > > > > > >>> > >> >> >> >> >>> > private[kafka] class
> MirrorMakerRecord
> > > (val
> > > > > > > > >>> sourceTopic:
> > > > > > > > >>> > >> >> >>String,
> > > > > > > > >>> > >> >> >> >> >>> >   val sourcePartition: Int,
> > > > > > > > >>> > >> >> >> >> >>> >   val sourceOffset: Long,
> > > > > > > > >>> > >> >> >> >> >>> >   val key: Array[Byte],
> > > > > > > > >>> > >> >> >> >> >>> >   val value: Array[Byte]) {
> > > > > > > > >>> > >> >> >> >> >>> >   def size = value.length + {if (key
> ==
> > > > > null) 0
> > > > > > > > >>>else
> > > > > > > > >>> > >> >> >>key.length}
> > > > > > > > >>> > >> >> >> >> >>> > }
> > > > > > > > >>> > >> >> >> >> >>> >
> > > > > > > > >>> > >> >> >> >> >>> > However, because source partition and
> > > offset
> > > > > is
> > > > > > > > >>>needed
> > > > > > > > >>> in
> > > > > > > > >>> > >> >> >>producer
> > > > > > > > >>> > >> >> >> >> >>>thread
> > > > > > > > >>> > >> >> >> >> >>> > for consumer offsets bookkeeping, the
> > > record
> > > > > > > > >>>returned
> > > > > > > > >>> by
> > > > > > > > >>> > >> >> >> >> >>> > MirrorMakerMessageHandler needs to
> > > contain
> > > > > those
> > > > > > > > >>> > >> >>information.
> > > > > > > > >>> > >> >> >> >> >>>Therefore
> > > > > > > > >>> > >> >> >> >> >>> > ProducerRecord does not work here. We
> > > could
> > > > > > > > >>>probably
> > > > > > > > >>> let
> > > > > > > > >>> > >> >> >>message
> > > > > > > > >>> > >> >> >> >> >>>handler
> > > > > > > > >>> > >> >> >> >> >>> > take ConsumerRecord for both input
> and
> > > > > output.
> > > > > > > > >>> > >> >> >> >> >>> > >
> > > > > > > > >>> > >> >> >> >> >>> > >5. Have you guys thought about what
> the
> > > > > > > > >>>implementation
> > > > > > > > >>> > >>will
> > > > > > > > >>> > >> >> >>look
> > > > > > > > >>> > >> >> >> >> >>>like in
> > > > > > > > >>> > >> >> >> >> >>> > >terms of threading architecture etc
> with
> > > > > the new
> > > > > > > > >>> > >>consumer?
> > > > > > > > >>> > >> >> >>That
> > > > > > > > >>> > >> >> >> >>will
> > > > > > > > >>> > >> >> >> >> >>>be
> > > > > > > > >>> > >> >> >> >> >>> > >soon so even if we aren't starting
> with
> > > that
> > > > > > > let's
> > > > > > > > >>> make
> > > > > > > > >>> > >> >>sure
> > > > > > > > >>> > >> >> >>we
> > > > > > > > >>> > >> >> >> >>can
> > > > > > > > >>> > >> >> >> >> >>>get
> > > > > > > > >>> > >> >> >> >> >>> > >rid
> > > > > > > > >>> > >> >> >> >> >>> > >of a lot of the current mirror maker
> > > > > accidental
> > > > > > > > >>> > >>complexity
> > > > > > > > >>> > >> >>in
> > > > > > > > >>> > >> >> >> >>terms
> > > > > > > > >>> > >> >> >> >> >>>of
> > > > > > > > >>> > >> >> >> >> >>> > >threads and queues when we move to
> that.
> > > > > > > > > >>> > >> >> >> >> >>> > I haven't thought about it
> thoroughly. The
> > > > > quick
> > > > > > > > >>>idea is
> > > > > > > > >>> > >> >>after
> > > > > > > > >>> > >> >> >> >> >>>migration
> > > > > > > > >>> > >> >> >> >> >>> to
> > > > > > > > >>> > >> >> >> >> >>> > the new consumer, it is probably
> better
> > > to
> > > > > use a
> > > > > > > > >>>single
> > > > > > > > >>> > >> >> >>consumer
> > > > > > > > >>> > >> >> >> >> >>>thread.
> > > > > > > > >>> > >> >> >> >> >>> > If multithread is needed, decoupling
> > > > > consumption
> > > > > > > > >>>and
> > > > > > > > >>> > >> >>processing
> > > > > > > > >>> > >> >> >> >>might
> > > > > > > > >>> > >> >> >> >> >>>be
> > > > > > > > >>> > >> >> >> >> >>> > used. MirrorMaker definitely needs
> to be
> > > > > changed
> > > > > > > > >>>after
> > > > > > > > >>> > >>new
> > > > > > > > >>> > >> >> >> >>consumer
> > > > > > > > >>> > >> >> >> >> >>>get
> > > > > > > > > >>> > >> >> >> >> >>> > checked in. I'll document the changes
> > > and can
> > > > > > > > >>>submit
> > > > > > > > >>> > >>follow
> > > > > > > > >>> > >> >>up
> > > > > > > > >>> > >> >> >> >> >>>patches
> > > > > > > > >>> > >> >> >> >> >>> > after the new consumer is available.
> > > > > > > > >>> > >> >> >> >> >>> > >
> > > > > > > > >>> > >> >> >> >> >>> > >-Jay
> > > > > > > > >>> > >> >> >> >> >>> > >
> > > > > > > > >>> > >> >> >> >> >>> > >On Tue, Jan 20, 2015 at 4:31 PM,
> > > Jiangjie
> > > > > Qin
> > > > > > > > >>> > >> >> >> >> >>><j...@linkedin.com.invalid
> > > > > > > > >>> > >> >> >> >> >>> >
> > > > > > > > >>> > >> >> >> >> >>> > >wrote:
> > > > > > > > >>> > >> >> >> >> >>> > >
> > > > > > > > >>> > >> >> >> >> >>> > >> Hi Kafka Devs,
> > > > > > > > >>> > >> >> >> >> >>> > >>
> > > > > > > > >>> > >> >> >> >> >>> > >> We are working on Kafka Mirror
> Maker
> > > > > > > > >>>enhancement. A
> > > > > > > > >>> > >>KIP
> > > > > > > > >>> > >> >>is
> > > > > > > > >>> > >> >> >> >>posted
> > > > > > > > >>> > >> >> >> >> >>>to
> > > > > > > > >>> > >> >> >> >> >>> > >> document and discuss on the
> > > followings:
> > > > > > > > >>> > >> >> >> >> >>> > >> 1. KAFKA-1650: No Data loss mirror
> > > maker
> > > > > > > change
> > > > > > > > >>> > >> >> >> >> >>> > >> 2. KAFKA-1839: To allow partition
> > > aware
> > > > > > > mirror.
> > > > > > > > >>> > >> >> >> >> >>> > >> 3. KAFKA-1840: To allow message
> > > > > > > filtering/format
> > > > > > > > >>> > >> >>conversion
> > > > > > > > >>> > >> >> >> >> >>> > >> Feedbacks are welcome. Please let
> us
> > > know
> > > > > if
> > > > > > > you
> > > > > > > > >>> have
> > > > > > > > >>> > >>any
> > > > > > > > >>> > >> >> >> >> >>>questions or
> > > > > > > > >>> > >> >> >> >> >>> > >> concerns.
> > > > > > > > >>> > >> >> >> >> >>> > >>
> > > > > > > > >>> > >> >> >> >> >>> > >> Thanks.
> > > > > > > > >>> > >> >> >> >> >>> > >>
> > > > > > > > >>> > >> >> >> >> >>> > >> Jiangjie (Becket) Qin
> > > > > > > > >>> > >> >> >> >> >>> > >>
> > > > > > > > >>> > >> >> >> >> >>> >
> > > > > > > > >>> > >> >> >> >> >>> >
> > > > > > > > >>> > >> >> >> >> >>>
> > > > > > > > >>> > >> >> >> >> >>
> > > > > > > > >>> > >> >> >> >> >>
> > > > > > > > >>> > >> >> >> >> >>
> > > > > > > > >>> > >> >> >> >> >>--
> > > > > > > > >>> > >> >> >> >> >>Thanks,
> > > > > > > > >>> > >> >> >> >> >>Neha
> > > > > > > > >>> > >> >> >> >> >
> > > > > > > > >>> > >> >> >> >>
> > > > > > > > >>> > >> >> >> >>
> > > > > > > > >>> > >> >> >>
> > > > > > > > >>> > >> >> >>
> > > > > > > > >>> > >> >>
> > > > > > > > >>> > >> >>
> > > > > > > > >>> > >>
> > > > > > > > >>> > >>
> > > > > > > > >>> > >
> > > > > > > > >>> > >
> > > > > > > > >>> > >--
> > > > > > > > >>> > >Thanks,
> > > > > > > > >>> > >Neha
> > > > > > > > >>> >
> > > > > > > > >>> >
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>> --
> > > > > > > > >>> Thanks,
> > > > > > > > >>> Neha
> > > > > > > > >>>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Thanks,
> > > > > > Neha
> > > > >
> > > > >
> > >
> > >
>
>
