Hey Jiangjie, Let's do an official vote so that we know what we are voting on and we are crisp on what the outcome was. This thread is very long :-)
-Jay On Tue, Feb 24, 2015 at 2:53 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote: > I updated the KIP page based on the discussion we had. > > Should I launch another vote, or can we consider this mail thread to have > already included a vote? > > Jiangjie (Becket) Qin > > On 2/11/15, 5:15 PM, "Neha Narkhede" <n...@confluent.io> wrote: > > >Thanks for the explanation, Joel! Would love to see the results of the > >throughput experiment and I'm a +1 on everything else, including the > >rebalance callback and record handler. > > > >-Neha > > > >On Wed, Feb 11, 2015 at 1:13 PM, Jay Kreps <jay.kr...@gmail.com> wrote: > > > >> Cool, I agree with all that. > >> > >> I agree about the need for a rebalancing callback. > >> > >> Totally agree about record handler. > >> > >> It would be great to see if a prototype of this is workable. > >> > >> Thanks guys! > >> > >> -Jay > >> > >> On Wed, Feb 11, 2015 at 12:36 PM, Joel Koshy <jjkosh...@gmail.com> > >>wrote: > >> > >> > Hey Jay, > >> > > >> > Guozhang, Becket and I got together to discuss this and we think: > >> > > >> > - It seems that your proposal based on the new consumer and flush call > >> > should work. > >> > - We would likely need to call the poll with a timeout that matches > >> > the offset commit interval in order to deal with low volume > >> > mirroring pipelines. > >> > - We will still need a rebalance callback to reduce duplicates - the > >> > rebalance callback would need to flush and commit offsets. > >> > - The only remaining question is whether the overall throughput is > >> > sufficient. I think someone at LinkedIn (I don't remember who) did > >> > some experiments with data channel size == 1 and ran into issues. > >> > That was not thoroughly investigated though. > >> > - The addition of flush may actually make this solution viable for the > >> > current mirror-maker (with the old consumer).
We can prototype that > >> > offline and if it works out well we can redo KAFKA-1650 (i.e., > >> > refactor the current mirror maker). The flush call and the new > >> > consumer didn't exist at the time we did KAFKA-1650 so this did not > >> > occur to us. > >> > - We think the RecordHandler is still a useful small addition for the > >> > use-cases mentioned earlier in this thread. > >> > > >> > Thanks, > >> > > >> > Joel > >> > > >> > On Wed, Feb 11, 2015 at 09:05:39AM -0800, Jay Kreps wrote: > >> > > Guozhang, I agree with 1-3, I do think what I was proposing was > >>simpler > >> > but > >> > > perhaps there are gaps in that? > >> > > > >> > > Hey Joel--Here was a sketch of what I was proposing. I do think this > >> > gets > >> > > rid of manual offset tracking, especially doing so across threads > >>with > >> > > dedicated commit threads, which I think is pretty complex. > >> > > > >> > > while(true) { > >> > > val recs = consumer.poll(Long.MaxValue); > >> > > for (rec <- recs) > >> > > producer.send(rec, logErrorCallback) > >> > > if(System.currentTimeMillis - lastCommit > commitInterval) { > >> > > producer.flush() > >> > > consumer.commit() > >> > > lastCommit = System.currentTimeMillis > >> > > } > >> > > } > >> > > > >> > > (See the previous email for details). I think the question is: is > >>there > >> > any > >> > > reason--performance, correctness, etc--that this won't work? > >>Basically > >> I > >> > > think you guys have thought about this more so I may be missing > >> > something. > >> > > If so let's flag it while we still have leeway on the consumer. > >> > > > >> > > If we think that will work, well I do think it is conceptually a lot > >> > > simpler than the current code, though I suppose one could disagree > >>on > >> > that.
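Jay's loop above is pseudocode against the then-unfinished new consumer API. As a sanity check of the ordering argument, here is a self-contained Python simulation with toy consumer/producer stand-ins (all class and method names here are hypothetical, not the Kafka client API). The invariant the loop depends on is that `flush()` runs before `commit()`, so an offset is never committed until every record at or below it has been acked by the target cluster.

```python
import time

class ToyConsumer:
    """Stand-in for the new consumer: hands out batches of (offset, value)."""
    def __init__(self, records):
        self.records = list(records)
        self.position = 0          # next offset to hand out
        self.committed = -1        # last committed offset

    def poll(self, max_records=3):
        batch = self.records[self.position:self.position + max_records]
        self.position += len(batch)
        return batch

    def commit(self):
        # Commits up to the last polled offset; safe only after a flush().
        self.committed = self.position - 1

class ToyProducer:
    """Stand-in producer: send() buffers, flush() blocks until all are acked."""
    def __init__(self):
        self.buffered, self.acked = [], []

    def send(self, record):
        self.buffered.append(record)

    def flush(self):
        self.acked.extend(self.buffered)
        self.buffered.clear()

def mirror(consumer, producer, commit_interval=0.0):
    last_commit = time.time()
    while True:
        recs = consumer.poll()
        if not recs:
            break                     # toy-only exit; the real loop runs forever
        for rec in recs:
            producer.send(rec)
        if time.time() - last_commit >= commit_interval:
            producer.flush()          # wait until every buffered send is acked...
            consumer.commit()         # ...only then mark the offsets as consumed
            last_commit = time.time()

consumer = ToyConsumer(enumerate("abcdefgh"))
producer = ToyProducer()
mirror(consumer, producer)
# Invariant: nothing is committed beyond what the producer has acked.
assert consumer.committed == len(producer.acked) - 1 == 7
```

If the process dies between `send` and `commit`, the committed offset lags the acked records, so a restart re-mirrors a few records (duplicates) but never skips one.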
> >> > > > >> > > -Jay > >> > > > >> > > On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy <jjkosh...@gmail.com> > >> wrote: > >> > > > >> > > > Hi Jay, > >> > > > > >> > > > > The data channels are actually a big part of the complexity of > >>the > >> > zero > >> > > > > data loss design, though, right? Because then you need some > >>reverse > >> > > > channel > >> > > > > to flow the acks back to the consumer based on where you are > >>versus > >> > just > >> > > > > acking what you have read and written (as in the code snippet I > >>put > >> > up). > >> > > > > >> > > > I'm not sure if we are on the same page. Even if the data channel > >>was > >> > > > not there the current handling for zero data loss would remain > >>very > >> > > > similar - you would need to maintain lists of unacked source > >>offsets. > >> > > > I'm wondering if the KIP needs more detail on how it is currently > >> > > > implemented; or are you suggesting a different approach (in which > >>case I > >> > > > have not fully understood it). I'm not sure what you mean by flowing > >> acks > >> > > > back to the consumer - the MM commits offsets after the producer > >>ack > >> > > > has been received. There is some additional complexity introduced > >>in > >> > > > reducing duplicates on a rebalance - this is actually optional > >>(since > >> > > > duplicates are currently a given). The reason that was done > >>anyway is > >> > > > that with the auto-commit turned off duplicates are almost > >>guaranteed > >> > > > on a rebalance. > >> > > > > >> > > > > I think the point that Neha and I were trying to make was that > >>the > >> > > > > motivation to embed stuff into MM is kind of related to how > >> complex a > >> > > > > simple "consume and produce" with good throughput will be.
If > >>it is > >> > > > simple > >> > > > > to write such a thing in a few lines, the pain of embedding a > >>bunch > >> > of > >> > > > > stuff won't be worth it, if it has to be as complex as the > >>current > >> mm > >> > > > then > >> > > > > of course we will need all kinds of plug ins because no one > >>will be > >> > able > >> > > > to > >> > > > > write such a thing. I don't have a huge concern with a simple > >> plug-in > >> > > > but I > >> > > > > think if it turns into something more complex with filtering and > >> > > > > aggregation or whatever we really need to stop and think a bit > >> about > >> > the > >> > > > > design. > >> > > > > >> > > > I agree - I don't think there is a use-case for any complex > >>plug-in. > >> > > > It is pretty much what Becket has described currently for the > >>message > >> > > > handler - i.e., take an incoming record and return a list of > >>outgoing > >> > > > records (which could be empty if you filter). > >> > > > > >> > > > So here is my take on the MM: > >> > > > - Bare bones: simple consumer - producer pairs (0.7 style). This > >>is > >> > > > ideal, but does not handle no data loss > >> > > > - Above plus support no data loss. This actually adds quite a bit > >>of > >> > > > complexity. > >> > > > - Above plus the message handler. This is a trivial addition I > >>think > >> > > > that makes the MM usable in a few other mirroring-like > >> applications. > >> > > > > >> > > > Joel > >> > > > > >> > > > > On Tue, Feb 10, 2015 at 12:31 PM, Joel Koshy > >><jjkosh...@gmail.com> > >> > > > wrote: > >> > > > > > >> > > > > > > >> > > > > > > >> > > > > > On Tue, Feb 10, 2015 at 12:13:46PM -0800, Neha Narkhede wrote: > >> > > > > > > I think all of us agree that we want to design MirrorMaker > >>for > >> 0 > >> > data > >> > > > > > loss. > >> > > > > > > With the absence of the data channel, 0 data loss will be > >>much > >> > > > simpler to > >> > > > > > > implement. 
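The "lists of unacked source offsets" Joel refers to are the heart of the no-data-loss complexity: produce acks can arrive out of order, and it is only safe to commit up to the smallest offset that is still unacked. A small illustrative tracker for one partition (hypothetical names; this is not the actual mirror-maker code, just the bookkeeping idea):

```python
import heapq

class UnackedOffsetTracker:
    """Tracks in-flight source offsets for one partition. Offsets may be
    acked out of order, but a commit must never pass an unacked offset."""
    def __init__(self):
        self.inflight = []      # min-heap of offsets sent but not yet acked
        self.acked = set()      # acked offsets not yet popped off the heap
        self.high_water = -1    # highest offset ever sent

    def sent(self, offset):
        heapq.heappush(self.inflight, offset)
        self.high_water = max(self.high_water, offset)

    def acked_offset(self, offset):
        self.acked.add(offset)
        # Pop any prefix of the heap that is now fully acked.
        while self.inflight and self.inflight[0] in self.acked:
            self.acked.discard(heapq.heappop(self.inflight))

    def committable(self):
        # Safe commit position: just below the smallest unacked offset,
        # or everything sent if nothing is in flight.
        return self.inflight[0] - 1 if self.inflight else self.high_water

t = UnackedOffsetTracker()
for o in range(5):
    t.sent(o)
t.acked_offset(0); t.acked_offset(2); t.acked_offset(3)
assert t.committable() == 0      # offset 1 is still unacked, so we stop at 0
t.acked_offset(1)
assert t.committable() == 3      # 0-3 all acked; 4 still in flight
t.acked_offset(4)
assert t.committable() == 4
```

This is exactly the state that `producer.flush()` makes unnecessary in Jay's proposal: flushing drains the in-flight set to empty before every commit, so no per-offset tracking survives.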
> >> > > > > > >> > > > > > The data channel is irrelevant to the implementation of zero > >>data > >> > > > > > loss. The complexity in the implementation of no data loss > >>that > >> you > >> > > > > > are seeing in mirror-maker affects all consume-then-produce > >> > patterns > >> > > > > > whether or not there is a data channel. You still need to > >> > maintain a > >> > > > > > list of unacked offsets. What I meant earlier is that we can > >> > > > > > brainstorm completely different approaches to supporting no > >>data > >> > loss, > >> > > > > > but the current implementation is the only solution we are > >>aware > >> > of. > >> > > > > > > >> > > > > > > > >> > > > > > > My arguments for adding a message handler are that: > >> > > > > > > > 1. It is more efficient to do something in common for all > >>the > >> > > > clients > >> > > > > > in > >> > > > > > > > pipeline than letting each client do the same thing for > >>many > >> > > > times. And > >> > > > > > > > there are concrete use cases for the message handler > >>already. > >> > > > > > > > > >> > > > > > > > >> > > > > > > What are the concrete use cases? > >> > > > > > > >> > > > > > I think Becket already described a couple of use cases > >>earlier in > >> > the > >> > > > > > thread. > >> > > > > > > >> > > > > > <quote> > >> > > > > > > >> > > > > > 1. Format conversion. We have a use case where clients of > >>source > >> > > > > > cluster > >> > > > > > use an internal schema and clients of target cluster use a > >> > different > >> > > > > > public schema. > >> > > > > > 2. Message filtering: For the messages published to source > >> cluster, > >> > > > > > there > >> > > > > > are some messages private to source cluster clients and should > >> not > >> > > > > > be exposed > >> > > > > > to target cluster clients. It would be difficult to publish > >>those > >> > > > > > messages > >> > > > > > into different partitions because they need to be ordered.
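Both quoted use-cases reduce to a one-record-in, list-of-records-out hook: an empty list filters the record, and the value can be re-encoded on the way through. A minimal sketch (the interface and names are hypothetical, not the actual API the KIP proposes):

```python
from dataclasses import dataclass

@dataclass
class Record:
    key: bytes
    value: bytes

def handle(record):
    """Hypothetical mirror-maker message handler.

    Returns the list of records to produce to the target cluster:
    an empty list drops the record (use-case 2, filtering), and the
    value can be rewritten in flight (use-case 1, format conversion,
    crudely stood in for by upper-casing here)."""
    if record.key.startswith(b"private."):
        return []                          # private to the source cluster
    converted = record.value.upper()       # stand-in for schema conversion
    return [Record(record.key, converted)]

incoming = [Record(b"private.audit", b"secret"), Record(b"clicks", b"page=1")]
outgoing = [out for rec in incoming for out in handle(rec)]
assert [r.value for r in outgoing] == [b"PAGE=1"]
```

Because filtering happens before the producer, the dropped records never cross the (possibly cross-colo) link, which is the bandwidth argument made below.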
> >> > > > > > I agree that we can always filter/convert messages after they > >>are > >> > > > > > copied > >> > > > > > to the target cluster, but that costs network bandwidth > >> > unnecessarily, > >> > > > > > especially if that is a cross colo mirror. With the handler, > >>we > >> can > >> > > > > > co-locate the mirror maker with source cluster and save that > >> cost. > >> > > > > > Also, > >> > > > > > imagine there are many downstream consumers consuming from the > >> > target > >> > > > > > cluster, filtering/reformatting the messages before the > >>messages > >> > reach > >> > > > > > the > >> > > > > > target cluster is much more efficient than having each of the > >> > > > > > consumers do > >> > > > > > this individually on their own. > >> > > > > > > >> > > > > > </quote> > >> > > > > > > >> > > > > > > > >> > > > > > > Also the KIP still refers to the data channel in a few > >>places > >> > > > (Motivation > >> > > > > > > and "On consumer rebalance" sections). Can you update the > >>wiki > >> > so it > >> > > > is > >> > > > > > > easier to review the new design, especially the data loss > >>part. > >> > > > > > > > >> > > > > > > > >> > > > > > > On Tue, Feb 10, 2015 at 10:36 AM, Joel Koshy < > >> > jjkosh...@gmail.com> > >> > > > > > wrote: > >> > > > > > > > >> > > > > > > > I think the message handler adds little to no complexity > >>to > >> the > >> > > > mirror > >> > > > > > > > maker. Jay/Neha, the MM became scary due to the > >> rearchitecture > >> > we > >> > > > did > >> > > > > > > > for 0.8 due to performance issues compared with 0.7 - we > >> should > >> > > > remove > >> > > > > > > > the data channel if it can match the current throughput. I > >> > agree > >> > > > it is > >> > > > > > > > worth prototyping and testing that so the MM architecture > >>is > >> > > > > > > > simplified. > >> > > > > > > > > >> > > > > > > > The MM became a little scarier in KAFKA-1650 in order to > >> > support no > >> > > > > > > > data loss. 
I think the implementation for no data loss > >>will > >> > remain > >> > > > > > > > about the same even in the new model (even without the > >>data > >> > > > channel) - > >> > > > > > > > we can probably brainstorm more if there is a > >>better/simpler > >> > way > >> > > > to do > >> > > > > > > > it (maybe there is in the absence of the data channel) > >>but at > >> > the > >> > > > time > >> > > > > > > > it was the best we (i.e., Becket, myself, Jun and Guozhang > >> who > >> > > > > > > > participated on the review) could come up with. > >> > > > > > > > > >> > > > > > > > So I'm definitely +1 on whatever it takes to support no > >>data > >> > loss. > >> > > > I > >> > > > > > > > think most people would want that out of the box. > >> > > > > > > > > >> > > > > > > > As for the message handler, as Becket wrote and I agree > >>with, > >> > it is > >> > > > > > > > really a trivial addition that would benefit (perhaps not > >> most, > >> > > > but at > >> > > > > > > > least some). So I'm personally +1 on that as well. That > >>said, > >> > I'm > >> > > > also > >> > > > > > > > okay with it not being there. I think the MM is fairly > >> > stand-alone > >> > > > and > >> > > > > > > > simple enough that it is entirely reasonable and > >>absolutely > >> > > > feasible > >> > > > > > > > for companies to fork/re-implement the mirror maker for > >>their > >> > own > >> > > > > > > > needs. > >> > > > > > > > > >> > > > > > > > So in summary, I'm +1 on the KIP. > >> > > > > > > > > >> > > > > > > > Thanks, > >> > > > > > > > > >> > > > > > > > Joel > >> > > > > > > > > >> > > > > > > > On Mon, Feb 09, 2015 at 09:19:57PM +0000, Jiangjie Qin > >>wrote: > >> > > > > > > > > I just updated the KIP page and incorporated Jay and > >>Neha’s > >> > > > > > suggestion. 
> >> > > > > > > > As > >> > > > > > > > > a brief summary of where we are: > >> > > > > > > > > > >> > > > > > > > > Consensus reached: > >> > > > > > > > > Have N independent mirror maker threads, each of which has its > >>own > >> > > > consumer > >> but > >> > > > > > > > > shares a producer. The mirror maker threads will be > >> > responsible > >> > > > for > >> > > > > > > > > decompression, compression and offset commit. No data > >> > channel or > >> > > > > > > > separate > >> > > > > > > > > offset commit thread is needed. Consumer rebalance > >>callback > >> > will > >> > > > be > >> > > > > > used > >> > > > > > > > > to avoid duplicates on rebalance. > >> > > > > > > > > > >> > > > > > > > > Still under discussion: > >> > > > > > > > > Whether message handler is needed. > >> > > > > > > > > > >> > > > > > > > > My arguments for adding a message handler are that: > >> > > > > > > > > 1. It is more efficient to do something in common for > >>all > >> the > >> > > > > > clients in > >> > > > > > > > > the pipeline than letting each client do the same thing > >> many > >> > > > times. > >> > > > > > And > >> > > > > > > > > there are concrete use cases for the message handler > >> already. > >> > > > > > > > > 2. It is not a big complicated add-on to mirror maker. > >> > > > > > > > > 3. Without a message handler, customers who need it > >>will > >> have > >> > > > to > >> > > > > > > > > re-implement all the logic of mirror maker by > >>themselves > >> > just in > >> > > > > > order > >> > > > > > > > to > >> > > > > > > > > add this handling in the pipeline. > >> > > > > > > > > > >> > > > > > > > > Any thoughts? > >> > > > > > > > > > >> > > > > > > > > Thanks. > >> > > > > > > > > > >> > > > > > > > > ―Jiangjie (Becket) Qin > >> > > > > > > > > > >> > > > > > > > > On 2/8/15, 6:35 PM, "Jiangjie Qin" <j...@linkedin.com> > >> > wrote: > >> > > > > > > > > > >> > > > > > > > > >Hi Jay, thanks a lot for the comments.
> >> > > > > > > > > >I think this solution is better. We probably don’t need > >> data > >> > > > channel > >> > > > > > > > > >anymore. It can be replaced with a list of producers if > >>we > >> > need > >> > > > more > >> > > > > > > > sender > >> > > > > > > > > >threads. > >> > > > > > > > > >I’ll update the KIP page. > >> > > > > > > > > > > >> > > > > > > > > >The reasoning about message handler is mainly for > >> efficiency > >> > > > > > purposes. > >> > > > > > > > I’m > >> > > > > > > > > >thinking that if something can be done in the pipeline for > >>all > >> > the > >> > > > > > clients > >> > > > > > > > > >such as filtering/reformatting, it is probably better > >>to > >> do > >> > it > >> > > > in > >> > > > > > the > >> > > > > > > > > >pipeline than asking 100 clients to do the same thing > >>100 > >> > > > times. > >> > > > > > > > > > > >> > > > > > > > > >―Jiangjie (Becket) Qin > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > >On 2/8/15, 4:59 PM, "Jay Kreps" <jay.kr...@gmail.com> > >> > wrote: > >> > > > > > > > > > > >> > > > > > > > > >>Yeah, I second Neha's comments. The current mm code > >>has > >> > taken > >> > > > > > something > >> > > > > > > > > >>pretty simple and made it pretty scary with callbacks > >>and > >> > > > > > wait/notify > >> > > > > > > > > >>stuff. Do we believe this works? I can't tell by > >>looking > >> > at it, > >> > > > > > which is > >> > > > > > > > > >>kind of bad for something important like this. I don't > >> mean > >> > > > this as > >> > > > > > > > > >>criticism, I know the history: we added in memory > >>queues > >> to > >> > > > help > >> > > > > > with > >> > > > > > > > > >>other > >> > > > > > > > > >>performance problems without thinking about > >>correctness, > >> > then > >> > > > we > >> > > > > > added > >> > > > > > > > > >>stuff to work around the in-memory queues so they would not lose > >>data, > >> > and > >> > > > so on.
> >> > > > > > > > >> > >> > > > > > > > > >>Can we instead do the opposite exercise and start with > >> the > >> > > > basics > >> > > > > > of > >> > > > > > > > what > >> > > > > > > > > >>mm should do and think about what deficiencies > >>prevent > >> > this > >> > > > > > approach > >> > > > > > > > > >>from > >> > > > > > > > > >>working? Then let's make sure the currently in-flight > >> work > >> > will > >> > > > > > remove > >> > > > > > > > > >>these deficiencies. After all mm is kind of the > >> > prototypical > >> > > > kafka > >> > > > > > use > >> > > > > > > > > >>case, > >> > > > > > > > > >>so if we can't make our clients do this probably no > >>one > >> > else > >> > > > can. > >> > > > > > > > > >> > >> > > > > > > > > >>I think mm should just be N independent threads each > >>of > >> > which > >> > > > has > >> > > > > > its > >> > > > > > > > > >>own > >> > > > > > > > > >>consumer but shares a producer and each of which looks > >> like > >> > > > this: > >> > > > > > > > > >> > >> > > > > > > > > >>while(true) { > >> > > > > > > > > >> val recs = consumer.poll(Long.MaxValue); > >> > > > > > > > > >> for (rec <- recs) > >> > > > > > > > > >> producer.send(rec, logErrorCallback) > >> > > > > > > > > >> if(System.currentTimeMillis - lastCommit > > >> > commitInterval) > >> > > > { > >> > > > > > > > > >> producer.flush() > >> > > > > > > > > >> consumer.commit() > >> > > > > > > > > >> lastCommit = System.currentTimeMillis > >> > > > > > > > > >> } > >> > > > > > > > > >>} > >> > > > > > > > > >> > >> > > > > > > > > >>This will depend on setting the retry count in the > >> > producer to > >> > > > > > > > something > >> > > > > > > > > >>high with a largish backoff so that a failed send > >>attempt > >> > > > doesn't > >> > > > > > drop > >> > > > > > > > > >>data. > >> > > > > > > > > >> > >> > > > > > > > > >>We will need to use the callback to force a flush and > >> > offset > >> > > > > > commit on > >> > > > > > > > > >>rebalance.
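That last point, the flush-and-commit on rebalance, can be sketched as a listener hooked into the consumer. The names below are hypothetical (the new consumer's rebalance-listener interface was still being designed at this point); the toy stubs only demonstrate the required call order:

```python
class FlushCommitRebalanceListener:
    """On partition revocation: flush the shared producer so every in-flight
    record is acked, then commit the consumer's offsets. Doing this before
    the partitions move to another thread removes most rebalance-induced
    duplicates."""
    def __init__(self, consumer, producer):
        self.consumer = consumer
        self.producer = producer
        self.events = []             # recorded for illustration only

    def on_partitions_revoked(self, partitions):
        self.producer.flush()        # wait for acks for everything sent so far
        self.consumer.commit()       # committed offsets are now truly safe
        self.events.append(("revoked", tuple(partitions)))

    def on_partitions_assigned(self, partitions):
        self.events.append(("assigned", tuple(partitions)))

# Toy stand-ins that just record which methods were called:
class _Stub:
    def __init__(self): self.calls = []
    def flush(self): self.calls.append("flush")
    def commit(self): self.calls.append("commit")

consumer, producer = _Stub(), _Stub()
listener = FlushCommitRebalanceListener(consumer, producer)
listener.on_partitions_revoked(["topic-0", "topic-1"])
assert producer.calls == ["flush"] and consumer.calls == ["commit"]
```

The ordering matters for the same reason as in the main loop: committing before the flush completes could record an offset whose record was never acked, turning a rebalance into data loss instead of mere duplication.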
> >> > > > > > > > >> > >> > > > > > > > > >>This approach may have a few more TCP connections due > >>to > >> using > >> > > > > > multiple > >> > > > > > > > > >>consumers but I think it is a lot easier to reason > >>about > >> > and > >> > > > the > >> > > > > > total > >> > > > > > > > > >>number of mm instances is always going to be small. > >> > > > > > > > > >> > >> > > > > > > > > >>Let's talk about where this simple approach falls > >>short, I > >> > > > think > >> > > > > > that > >> > > > > > > > > >>will > >> > > > > > > > > >>help us understand your motivations for additional > >> > elements. > >> > > > > > > > > >> > >> > > > > > > > > >>Another advantage of this is that it is so simple I > >>don't > >> > > > think we > >> > > > > > > > really > >> > > > > > > > > >>even need to bother making mm extensible because writing > >> > your own > >> > > > > > code > >> > > > > > > > that > >> > > > > > > > > >>does custom processing or transformation is just ten > >> lines > >> > and > >> > > > no > >> > > > > > plug > >> > > > > > > > in > >> > > > > > > > > >>system is going to make it simpler. > >> > > > > > > > > >> > >> > > > > > > > > >>-Jay > >> > > > > > > > > >> > >> > > > > > > > > >> > >> > > > > > > > > >>On Sun, Feb 8, 2015 at 2:40 PM, Neha Narkhede < > >> > > > n...@confluent.io> > >> > > > > > > > wrote: > >> > > > > > > > > >> > >> > > > > > > > > >>> Few comments - > >> > > > > > > > > >>> > >> > > > > > > > > >>> 1. Why do we need the message handler? Do you have > >> > concrete > >> > > > use > >> > > > > > cases > >> > > > > > > > > >>>in > >> > > > > > > > > >>> mind? If not, we should consider adding it in the > >> future > >> > > > when/if > >> > > > > > we > >> > > > > > > > do > >> > > > > > > > > >>>have > >> > > > > > > > > >>> use cases for it. The purpose of the mirror maker > >>is to be a > >> > simple > >> > > > > > tool > >> > > > > > > > for > >> > > > > > > > > >>> setting up Kafka cluster replicas.
I don't see why > >>we > >> > need to > >> > > > > > > > include a > >> > > > > > > > > >>> message handler for doing stream transformations or > >> > > > filtering. > >> > > > > > You > >> > > > > > > > can > >> > > > > > > > > >>> always write a simple process for doing that once > >>the > >> > data is > >> > > > > > copied > >> > > > > > > > as > >> > > > > > > > > >>>is > >> > > > > > > > > >>> in the target cluster > >> > > > > > > > > >>> 2. Why keep both designs? We should prefer the > >>simpler > >> > design > >> > > > > > unless > >> > > > > > > > it > >> > > > > > > > > >>>is > >> > > > > > > > > >>> not feasible due to the performance issue that we > >> > previously > >> > > > > > had. Did > >> > > > > > > > > >>>you > >> > > > > > > > > >>> get a chance to run some tests to see if that is > >>really > >> > > > still a > >> > > > > > > > problem > >> > > > > > > > > >>>or > >> > > > > > > > > >>> not? It will be easier to think about the design and > >> also > >> > > > make > >> > > > > > the > >> > > > > > > > KIP > >> > > > > > > > > >>> complete if we make a call on the design first. > >> > > > > > > > > >>> 3. Can you explain the need for keeping a list of > >> unacked > >> > > > > > offsets per > >> > > > > > > > > >>> partition? Consider adding a section on retries and > >>how > >> > you > >> > > > plan > >> > > > > > to > >> > > > > > > > > >>>handle > >> > > > > > > > > >>> the case when the producer runs out of all retries. > >> > > > > > > > > >>> > >> > > > > > > > > >>> Thanks, > >> > > > > > > > > >>> Neha > >> > > > > > > > > >>> > >> > > > > > > > > >>> On Sun, Feb 8, 2015 at 2:06 PM, Jiangjie Qin > >> > > > > > > > > >>><j...@linkedin.com.invalid> > >> > > > > > > > > >>> wrote: > >> > > > > > > > > >>> > >> > > > > > > > > >>> > Hi Neha, > >> > > > > > > > > >>> > > >> > > > > > > > > >>> > Yes, I’ve updated the KIP so the entire KIP is > >>based > >> > on new > >> > > > > > > > consumer > >> > > > > > > > > >>>now. 
> >> > > > > > > > > >>> > I’ve put both designs with and without data > >>channel > >> in > >> > the > >> > > > KIP > >> > > > > > as I > >> > > > > > > > > >>>still > >> > > > > > > > > >>> > feel we might need the data channel to provide > >>more > >> > > > > > flexibility, > >> > > > > > > > > >>> > especially after message handler is introduced. > >>I’ve > >> > put my > >> > > > > > > > thinking > >> > > > > > > > > >>>of > >> > > > > > > > > >>> > the pros and cons of the two designs in the KIP as > >> > well. > >> > > > It’ll > >> > > > > > be > >> > > > > > > > > >>>great > >> > > > > > > > > >>> if > >> > > > > > > > > >>> > you can give a review and comment. > >> > > > > > > > > >>> > > >> > > > > > > > > >>> > Thanks. > >> > > > > > > > > >>> > > >> > > > > > > > > >>> > Jiangjie (Becket) Qin > >> > > > > > > > > >>> > > >> > > > > > > > > >>> > On 2/6/15, 7:30 PM, "Neha Narkhede" < > >> n...@confluent.io > >> > > > >> > > > wrote: > >> > > > > > > > > >>> > > >> > > > > > > > > >>> > >Hey Becket, > >> > > > > > > > > >>> > > > >> > > > > > > > > >>> > >What are the next steps on this KIP. As per your > >> > comment > >> > > > > > earlier > >> > > > > > > > on > >> > > > > > > > > >>>the > >> > > > > > > > > >>> > >thread - > >> > > > > > > > > >>> > > > >> > > > > > > > > >>> > >I do agree it makes more sense > >> > > > > > > > > >>> > >> to avoid duplicate effort and plan based on new > >> > > > consumer. > >> > > > > > I’ll > >> > > > > > > > > >>>modify > >> > > > > > > > > >>> > >>the > >> > > > > > > > > >>> > >> KIP. > >> > > > > > > > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > > > > > >>> > >Did you get a chance to think about the > >>simplified > >> > design > >> > > > > > that we > >> > > > > > > > > >>> proposed > >> > > > > > > > > >>> > >earlier? Do you plan to update the KIP with that > >> > proposal? 
> >> > > > >>> > >Thanks, > >> > > > >>> > >Neha > >> > > > >>> > > > >> > > > >>> > >On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin > >> > > > > > > > >>><j...@linkedin.com.invalid > >> > > > > > > > >>> > > >> > > > > > > > >>> > >wrote: > >> > > > > > > > >>> > > > >> > > > > > > > >>> > >> In mirror maker we do not do de-serialization > >>on > >> the > >> > > > > > messages. > >> > > > > > > > > >>>Mirror > >> > > > > > > > >>> > >> maker uses the source TopicPartition hash to choose a > >> > > > producer to > >> > > > > > send > >> > > > > > > > > >>> > >>messages > >> > > > > > > > >>> > >> from the same source partition. The partition > >> those > >> > > > > > messages end > >> > > > > > > > > >>>up > >> > > > > > > > >>> with > >> > > > > > > > > >>> > >> is decided by the Partitioner class in > >>KafkaProducer > >> > > > (assuming > >> > > > > > you > >> > > > > > > > > >>>are > >> > > > > > > > >>> > >>using > >> > > > > > > > > >>> > >> the new producer), which uses the hash code of > >> bytes[]. > >> > > > > > > > > >>> > >> > >> > > > > > > > > >>> > >> If deserialization is needed, it has to be > >>done in > >> > > > the message > >> > > > > > > > > >>>handler. > >> > > > > > > > > >>> > >> > >> > > > > > > > > >>> > >> Thanks. > >> > > > > > > > > >>> > >> > >> > > > > > > > > >>> > >> Jiangjie (Becket) Qin > >> > > > > > > > > >>> > >> > >> > > > > > > > > >>> > >> On 2/4/15, 11:33 AM, "Bhavesh Mistry" < > >> > > > > > > > mistry.p.bhav...@gmail.com> > >> > > > > > > > > >>> > >>wrote: > >> > > > > > > > > >>> > >> > >> > > > > > > > > >>> > >> >Hi Jiangjie, > >> > > > > > > > > >>> > >> > > >> > > > > > > > > >>> > >> >Thanks for entertaining my questions so far. > >>The last > >> > > > > > question I > >> > > > > > > > > >>>have is > >> > > > > > > > > >>> > >> >about > >> > > > > > > > > >>> > >> >serialization of the message key.
If the key > >> > > > de-serialization > >> > > > >>>(Class) is > >> > > > >>not > >> > > > >>> > >> >present at the MM instance, then does it use the raw > >> byte > >> > > > hashcode > >> to > >> > > > >>> > >> >determine > >> > > > >>> > >> >the partition? How are you going to address the > >> > > > situation > >> > > > where the > >>key > >> > > > >>> > >> >needs > >> > > > >>> > >> >to be de-serialized and the actual hashcode > >> needs > >> > > > to be > >> > > > >>>computed? > >> > > > >>> > >> > > >> > > > >>> > >> > > >> > > > >>> > >> >Thanks, > >> > > > >>> > >> > > >> > > > >>> > >> >Bhavesh > >> > > > >>> > >> > > >> > > > >>> > >> >On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin > >> > > > >>> > >><j...@linkedin.com.invalid> > >> > > > >>> > >> >wrote: > >> > > > >>> > >> > > >> > > > >>> > >> >> Hi Bhavesh, > >> > > > >>> > >> >> > >> > > > >>> > >> >> Please see inline comments. > >> > > > >>> > >> >> > >> > > > >>> > >> >> Jiangjie (Becket) Qin > >> > > > >>> > >> >> > >> > > > >>> > >> >> On 1/29/15, 7:00 PM, "Bhavesh Mistry" > >> > > > >>><mistry.p.bhav...@gmail.com> > >> > > > >>> > >> >>wrote: > >> > > > >>> > >> >> > >> > > > >>> > >> >> >Hi Jiangjie, > >> > > > >>> > >> >> > > >> > > > >>> > >> >> >Thanks for the input. > >> > > > >>> > >> >> > > >> > > > >>> > >> >> >a) Will the MM producer ack be attached > >>to > >> > > > the Producer > >> > > > >>>instance or > >> > > > >>> > >>be per > >> > > > >>> > >> >> >topic? The use case is that one instance of MM > >> > > > >>> > >> >> >needs to handle both strong acks and also acks=0 > >> for > >> > > > some > >> > > > topics. > >> > > > Or > >> > > > >>> > >>would it > >> > > > >>> > >> >> >be better to set up another instance of MM? > >> > > > >>> > >> >> The acks setting is a producer-level setting > >> > instead of a > >> > > > > > topic-level > >> > > > >>>setting. > >> > > > >>> > >> >> In this case you probably need to set up > >> another > >> > > > > > instance. > >> > > > >>> > >> >> > > >> > > > >>> > >> >> >b) Regarding TCP connections: why does each > >> producer > >> > > > > > instance > >> > > > >>>have its own > >> > > > >>>TCP > >> > > > >>> > >> >> >connections? Is it possible to use a broker > >> > > > connection TCP > >> > > > > > pool, where the > >> > > > >>> > >>producer > >> > > > >>> > >> >> >would just check out a TCP connection to a > >>broker? > >> > Then the > >> > > > number of > >> > > > >>>producer > >> > > > >>> > >> >>instances > >> > > > >>> > >> >> >would not correlate with the number of broker connections. > >> Is > >> > this > >> > > > > > possible > >> > > > >>>? > >> > > > >>> > >> >> In the new producer, each producer maintains a > >> > > > connection to > >> > > > each > >> > > > >>> broker > >> > > > >>> > >> >> within the producer instance.
Making > >>producer > >> > > > instances > >> > > > > > to > >> > > > > > > > > >>>share > >> > > > > > > > > >>> the > >> > > > > > > > > >>> > >>TCP > >> > > > > > > > > >>> > >> >> connections is a very big change to the > >>current > >> > > > design, > >> > > > > > so I > >> > > > > > > > > >>> suppose > >> > > > > > > > > >>> > >>we > >> > > > > > > > > >>> > >> >> won’t be able to do that. > >> > > > > > > > > >>> > >> >> > > >> > > > > > > > > >>> > >> >> > > >> > > > > > > > > >>> > >> >> >Thanks, > >> > > > > > > > > >>> > >> >> > > >> > > > > > > > > >>> > >> >> >Bhavesh > >> > > > > > > > > >>> > >> >> > > >> > > > > > > > > >>> > >> >> >On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie > >>Qin > >> > > > > > > > > >>> > >> >><j...@linkedin.com.invalid > >> > > > > > > > > >>> > >> >> > > >> > > > > > > > > >>> > >> >> >wrote: > >> > > > > > > > > >>> > >> >> > > >> > > > > > > > > >>> > >> >> >> Hi Bhavesh, > >> > > > > > > > > >>> > >> >> >> > >> > > > > > > > > >>> > >> >> >> I think it is the right discussion to > >>have > >> > when > >> > > > we are > >> > > > > > > > > >>>talking > >> > > > > > > > > >>> > >>about > >> > > > > > > > > >>> > >> >>the > >> > > > > > > > > >>> > >> >> >> new new design for MM. > >> > > > > > > > > >>> > >> >> >> Please see the inline comments. 
> >> > > > > > > > > >>> > >> >> >> > >> > > > > > > > > >>> > >> >> >> Jiangjie (Becket) Qin > >> > > > > > > > > >>> > >> >> >> > >> > > > > > > > > >>> > >> >> >> On 1/28/15, 10:48 PM, "Bhavesh Mistry" > >> > > > > > > > > >>> > >><mistry.p.bhav...@gmail.com> > >> > > > > > > > > >>> > >> >> >>wrote: > >> > > > > > > > > >>> > >> >> >> > >> > > > > > > > > >>> > >> >> >> >Hi Jiangjie, > >> > > > > > > > > >>> > >> >> >> > > >> > > > > > > > > >>> > >> >> >> >I just wanted to let you know about our > >>use > >> > case > >> > > > and > >> > > > > > > > stress > >> > > > > > > > > >>>the > >> > > > > > > > > >>> > >> >>point > >> > > > > > > > > >>> > >> >> >>that > >> > > > > > > > > >>> > >> >> >> >local data center broker cluster have > >>fewer > >> > > > > > partitions > >> > > > > > > > than > >> > > > > > > > > >>>the > >> > > > > > > > > >>> > >> >> >> >destination > >> > > > > > > > > >>> > >> >> >> >offline broker cluster. Just because we > >>do > >> > the > >> > > > batch > >> > > > > > pull > >> > > > > > > > > >>>from > >> > > > > > > > > >>> > >>CAMUS > >> > > > > > > > > >>> > >> >> >>and > >> > > > > > > > > >>> > >> >> >> >in > >> > > > > > > > > >>> > >> >> >> >order to drain data faster than the > >> injection > >> > > > rate > >> > > > > > (from > >> > > > > > > > > >>>four > >> > > > > > > > > >>> DCs > >> > > > > > > > > >>> > >> >>for > >> > > > > > > > > >>> > >> >> >>same > >> > > > > > > > > >>> > >> >> >> >topic). > >> > > > > > > > > >>> > >> >> >> Keeping the same partition number in > >>source > >> > and > >> > > > target > >> > > > > > > > > >>>cluster > >> > > > > > > > > >>> > >>will > >> > > > > > > > > >>> > >> >>be > >> > > > > > > > > >>> > >> >> >>an > >> > > > > > > > > >>> > >> >> >> option but will not be enforced by > >>default. 
> We are facing the following issues (probably due to configuration):
>
> 1) We occasionally lose data because the message batch size is too large
> (2MB) on the target cluster (we are using the old producer, but I think
> the new producer will solve this problem to some extent).

We do see this issue at LinkedIn as well. The new producer might also have this issue. There are some proposed solutions, but no real work has started yet. For now, as a workaround, setting a more aggressive batch size on the producer side should work.

> 2) Since only one instance is set to MM data, we are not able to set up
> acks per topic; instead, acks are attached to the producer instance.

I don't quite get the question here.
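The batch-size workaround mentioned above can be sketched as producer configuration. This is a minimal illustration, not the actual mirror maker code: the keys are the new (Java) producer's config names, and the ~1 MB broker-side limit is an assumption for the example — check the real broker's message.max.bytes.

```java
import java.util.Properties;

class ProducerBatchConfig {
    // Assumed broker-side limit (message.max.bytes); verify against the real broker config.
    static final int BROKER_MAX_MESSAGE_BYTES = 1_000_000;

    // New-producer settings that keep batches safely under the broker limit,
    // so an oversized batch is not rejected and eventually dropped.
    static Properties conservativeProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("batch.size", Integer.toString(BROKER_MAX_MESSAGE_BYTES / 4));
        props.put("max.request.size", Integer.toString(BROKER_MAX_MESSAGE_BYTES));
        return props;
    }
}
```

The exact fraction of the broker limit to use is a tuning choice; the point is only that the producer's batch ceiling must sit below the broker's message size ceiling.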
> 3) How are you going to address the two-phase commit problem if acks are
> set to strongest but auto commit is on for the consumer (meaning the
> producer does not get an ack, but the consumer has auto-committed the
> offset of that message)? Is there a transactional (Kafka transactions are
> in progress) ack and offset commit?

Auto offset commit should be turned off in this case. The offset will only be committed once, by the offset commit thread. So there is no two-phase commit.

> 4) How are you planning to avoid duplicated messages? (Is the broker
> going to keep a moving window of collected messages and de-dupe?)
> Possibly we get this from retries set to 5…?

We are not trying to completely avoid duplicates. The duplicates will still be there if:
1. The producer retries on failure.
2. The mirror maker is hard killed.
Currently, dedup is expected to be done by the user if necessary.

> 5) Last, is there any warning or insight the MM component can provide
> when the data injection rate into destination partitions is not evenly
> distributed, regardless of keyed or non-keyed messages? (There is a
> ripple effect: data arrives late or out of order in terms of timestamp,
> and Camus creates a huge number of files on HDFS due to the uneven
> injection rate. The Camus job is configured to run every 3 minutes.)
I think uneven data distribution is typically caused by server-side imbalance rather than something the mirror maker can control. In the new mirror maker, however, there is a customizable message handler that might help a little. In the message handler, you can explicitly set the partition you want to produce a message to. So if you know the data distribution in the target cluster is uneven, you may offset it there. But that probably only works for non-keyed messages.

> I am not sure if this is the right forum to bring these to your/Kafka
> dev team's attention. This might be off track.
>
> Thanks,
>
> Bhavesh

On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

I've updated the KIP page. Feedback is welcome.

Regarding the simple mirror maker design, I thought it over and have some worries. There are two things that might be worth thinking about:
1. One of the enhancements to mirror maker is adding a message handler to do things like reformatting. I think we might potentially want more threads processing the messages than the number of consumers. If we follow the simple mirror maker solution, we lose this flexibility.
2. This might not matter too much, but creating more consumers means a bigger footprint in TCP connections and memory.

Any thoughts on this?

Thanks.

Jiangjie (Becket) Qin

On 1/26/15, 10:35 AM, "Jiangjie Qin" <j...@linkedin.com> wrote:

Hi Jay and Neha,

Thanks a lot for the reply and explanation. I do agree it makes more sense to avoid duplicate effort and plan based on the new consumer. I'll modify the KIP.
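The customizable message handler discussed earlier in the thread (override the target partition for non-keyed messages to counter uneven injection) could look roughly like the sketch below. Every type here is a hypothetical stand-in — the real interface is defined in the KIP and the KAFKA-1650 patch, not here.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the mirror maker record a handler operates on.
final class MmRecord {
    final byte[] key;
    final byte[] value;
    Integer targetPartition; // null means "let the producer decide"

    MmRecord(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical handler interface: one input record may map to several outputs.
interface MmMessageHandler {
    List<MmRecord> handle(MmRecord record);
}

// Spreads non-keyed messages round-robin over the target partitions to offset
// uneven injection rates; keyed messages are left to producer partitioning so
// key-based ordering is preserved.
class SpreadingHandler implements MmMessageHandler {
    private final int targetPartitionCount;
    private long counter = 0;

    SpreadingHandler(int targetPartitionCount) {
        this.targetPartitionCount = targetPartitionCount;
    }

    @Override
    public List<MmRecord> handle(MmRecord r) {
        if (r.key == null) {
            r.targetPartition = (int) (counter++ % targetPartitionCount);
        }
        return Collections.singletonList(r);
    }
}
```

As noted in the thread, this trick only helps non-keyed traffic; keyed messages must keep their hash-determined partition.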
To Jay's question on message ordering: the data channel selection makes sure that messages from the same source partition will be sent by the same producer, so the order of the messages is guaranteed with proper producer settings (MaxInFlightRequests=1, retries=Integer.MaxValue, etc.).
For keyed messages, because they come from the same source partition and will end up in the same target partition, the order is guaranteed as long as they are sent by the same producer.
For non-keyed messages, messages coming from the same source partition might go to different target partitions. The order is only guaranteed within each partition.

Anyway, I'll modify the KIP, and the data channel will go away.

Thanks.

Jiangjie (Becket) Qin

On 1/25/15, 4:34 PM, "Neha Narkhede" <n...@confluent.io> wrote:

I think there is some value in investigating whether we can go back to the simple mirror maker design, as Jay points out. Here you have N threads, each of which has a consumer and a producer.
The reason we had to move away from that was a combination of the throughput difference between the consumer and the old producer, and the deficiency of consumer rebalancing that limits the total number of mirror maker threads. So the only option available was to increase the throughput of the limited number of mirror maker threads that could be deployed.

Now that queuing design may no longer make sense, given that the new producer's throughput is nearly on par with the consumer's and the new round-robin based consumer rebalancing allows a very high number of mirror maker instances to exist.

This is the end state the mirror maker should be in once the new consumer is complete, so it wouldn't hurt to see if we can just move to that right now.

On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

QQ: If we ever use a different technique for the data channel selection than for the producer partitioning, won't that break ordering? How can we ensure these things stay in sync?

With respect to the new consumer: I really do want to encourage people to think through how MM will work with the new consumer. I mean, this isn't very far off, maybe a few months if we hustle? I could imagine us getting this MM fix done sooner, maybe in a month? So I guess this buys us an extra month before we rip it out and throw it away? Maybe two? This bug has been there for a while, though, right? Is it worth it? Probably it is, but it still kind of sucks to have the duplicate effort.

So anyhow, let's definitely think about how things will work with the new consumer. I think we can probably just have N threads, where each thread has a producer and a consumer and is internally single-threaded. Any reason this wouldn't work?
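The channel-selection scheme discussed in this thread — hash the source topic-partition only to pick the data channel queue (and hence the producer thread), while the target partition remains the producer's choice — can be made concrete with a short sketch. The hash function here is illustrative, not the actual MirrorMaker code; the ordering settings are the ones Becket names (MaxInFlightRequests=1, retries=Integer.MaxValue), expressed as new-producer config keys.

```java
import java.util.Properties;

class DataChannelSelector {
    // Pick the data channel (and hence producer thread) from the source
    // topic-partition, so every message from one source partition flows
    // through the same producer and keeps its send order. The hash itself
    // is only an illustration of the idea.
    static int selectChannel(String sourceTopic, int sourcePartition, int numChannels) {
        int h = 31 * sourceTopic.hashCode() + sourcePartition;
        return Math.floorMod(h, numChannels);
    }

    // Producer settings mentioned in the thread for preserving send order.
    static Properties orderingProducerProps() {
        Properties props = new Properties();
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        return props;
    }
}
```

Because the selection depends only on the source topic-partition, it is independent of the producer's target partitioning — which is exactly why it does not constrain which target partition a message lands in.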
-Jay

On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hi Jay,

Thanks for the comments. Please see the inline responses.

Jiangjie (Becket) Qin

On 1/21/15, 1:33 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

> Hey guys,
>
> A couple of questions/comments:
>
> 1. The callback and user-controlled offset commit functionality is
> already in the new consumer, which we are working on in parallel. If we
> accelerated that work it might help concentrate efforts. I admit this
> might take slightly longer in calendar time but could still probably get
> done this quarter. Have you guys considered that approach?

Yes, I totally agree that ideally we should put our effort into the new consumer. The main reason for still working on the old consumer is that we expect it will still be used at LinkedIn for quite a while before the new consumer can be fully rolled out, and we have recently been suffering a lot from mirror maker data loss issues. So our current plan is to make the necessary changes to stabilize the current mirror maker in production. Then we can test and roll out the new consumer gradually without getting burnt.

> 2. I think partitioning on the hash of the topic partition is not a very
> good idea, because that will break the case of going from a cluster with
> fewer partitions to one with more partitions. I think an intuitive way
> to do this would be the following:
> a. Default behavior: just do what the producer does, i.e. if you specify
> a key, use it for partitioning; if not, partition in a round-robin
> fashion.
> b. Add a --preserve-partition option that will explicitly inherit the
> partition from the source, irrespective of whether there is a key or
> which partition that key would hash to.

Sorry that I did not explain this clearly enough. The hash of the topic partition is only used to decide which mirror maker data channel queue the consumer thread should put a message into. It only tries to make sure that messages from the same partition are sent by the same producer thread, to guarantee the sending order. This is not at all related to which partition in the target cluster the messages end up in. That is still decided by the producer.

> 3. You don't actually give the ConsumerRebalanceListener interface. What
> is that going to look like?

Good point! I should have put it in the wiki. I just added it.

> 4. What is MirrorMakerRecord?
I > >> > think > >> > > > > > ideally > >> > > > > > > > the > >> > > > > > > > > >>> > >> >> >> >> >>> > >MirrorMakerMessageHandler > >> > > > > > > > > >>> > >> >> >> >> >>> > >interface would take a > >> > ConsumerRecord as > >> > > > > > input > >> > > > > > > > and > >> > > > > > > > > >>> > >>return a > >> > > > > > > > > >>> > >> >> >> >> >>> > >ProducerRecord, > >> > > > > > > > > >>> > >> >> >> >> >>> > >right? That would allow you to > >> > > > transform the > >> > > > > > > > key, > >> > > > > > > > > >>> value, > >> > > > > > > > > >>> > >> >> >> >>partition, > >> > > > > > > > > >>> > >> >> >> >> >>>or > >> > > > > > > > > >>> > >> >> >> >> >>> > >destination topic... > >> > > > > > > > > >>> > >> >> >> >> >>> > MirrorMakerRecord is introduced > >>in > >> > > > > > KAFKA-1650, > >> > > > > > > > > >>>which is > >> > > > > > > > > >>> > >> >>exactly > >> > > > > > > > > >>> > >> >> >> >>the > >> > > > > > > > > >>> > >> >> >> >> >>>same > >> > > > > > > > > >>> > >> >> >> >> >>> > as ConsumerRecord in KAFKA-1760. 
> >> > > > > > > > > >>> > >> >> >> >> >>> > private[kafka] class > >> > MirrorMakerRecord > >> > > > (val > >> > > > > > > > > >>> sourceTopic: > >> > > > > > > > > >>> > >> >> >>String, > >> > > > > > > > > >>> > >> >> >> >> >>> > val sourcePartition: Int, > >> > > > > > > > > >>> > >> >> >> >> >>> > val sourceOffset: Long, > >> > > > > > > > > >>> > >> >> >> >> >>> > val key: Array[Byte], > >> > > > > > > > > >>> > >> >> >> >> >>> > val value: Array[Byte]) { > >> > > > > > > > > >>> > >> >> >> >> >>> > def size = value.length + {if > >> (key > >> > == > >> > > > > > null) 0 > >> > > > > > > > > >>>else > >> > > > > > > > > >>> > >> >> >>key.length} > >> > > > > > > > > >>> > >> >> >> >> >>> > } > >> > > > > > > > > >>> > >> >> >> >> >>> > > >> > > > > > > > > >>> > >> >> >> >> >>> > However, because source > >>partition > >> and > >> > > > offset > >> > > > > > is > >> > > > > > > > > >>>needed > >> > > > > > > > > >>> in > >> > > > > > > > > >>> > >> >> >>producer > >> > > > > > > > > >>> > >> >> >> >> >>>thread > >> > > > > > > > > >>> > >> >> >> >> >>> > for consumer offsets > >>bookkeeping, > >> the > >> > > > record > >> > > > > > > > > >>>returned > >> > > > > > > > > >>> by > >> > > > > > > > > >>> > >> >> >> >> >>> > MirrorMakerMessageHandler needs > >>to > >> > > > contain > >> > > > > > those > >> > > > > > > > > >>> > >> >>information. > >> > > > > > > > > >>> > >> >> >> >> >>>Therefore > >> > > > > > > > > >>> > >> >> >> >> >>> > ProducerRecord does not work > >>here. > >> We > >> > > > could > >> > > > > > > > > >>>probably > >> > > > > > > > > >>> let > >> > > > > > > > > >>> > >> >> >>message > >> > > > > > > > > >>> > >> >> >> >> >>>handler > >> > > > > > > > > >>> > >> >> >> >> >>> > take ConsumerRecord for both > >>input > >> > and > >> > > > > > output. > >> > > > > > > > > >>> > >> >> >> >> >>> > > > >> > > > > > > > > >>> > >> >> >> >> >>> > >5. 
> >5. Have you guys thought about what the implementation will look like in
> >terms of threading architecture etc. with the new consumer? That will be
> >here soon, so even if we aren't starting with that, let's make sure we can
> >get rid of a lot of the current mirror maker's accidental complexity in
> >terms of threads and queues when we move to it.
> I haven't thought about it thoroughly. The quick idea is that after
> migration to the new consumer, it is probably better to use a single
> consumer thread.
> If multithreading is needed, decoupling consumption and processing might be
> used. MirrorMaker definitely needs to be changed after the new consumer
> gets checked in. I'll document the changes and can submit follow-up patches
> after the new consumer is available.
>
> >-Jay
> >
> >On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >wrote:
> >
> >> Hi Kafka Devs,
> >>
> >> We are working on Kafka Mirror Maker enhancement. A KIP is posted to
> >> document and discuss the following:
> >> 1. KAFKA-1650: No data loss mirror maker change
> >> 2. KAFKA-1839: To allow partition aware mirror.
> >> 3. KAFKA-1840: To allow message filtering/format conversion.
> >>
> >> Feedback is welcome. Please let us know if you have any questions or
> >> concerns.
> >>
> >> Thanks.
> >>
> >> Jiangjie (Becket) Qin
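The "decouple consumption and processing" idea from the reply above can be sketched as a single consumer thread feeding a bounded queue that several processing threads drain. This is a generic pattern illustration, not the actual mirror-maker code; `DecoupledPipeline`, `run`, and the integer payloads are hypothetical placeholders for polling Kafka and handing records to the producer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class DecoupledPipeline {
    static final int POISON = -1; // shutdown sentinel

    // Runs `total` items through a bounded queue drained by `numWorkers`
    // processing threads; returns how many items were processed.
    static int run(int total, int numWorkers) throws InterruptedException {
        // Bounded queue gives back-pressure: the consumer thread blocks on
        // put() when the processors fall behind.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(16);
        AtomicInteger processed = new AtomicInteger();

        // Single consumer thread: in the real mirror maker this would poll
        // the new consumer and enqueue records instead of integers.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) queue.put(i);
                for (int w = 0; w < numWorkers; w++) queue.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Processing threads: in the real mirror maker these would run the
        // message handler and hand the result to the producer.
        Thread[] workers = new Thread[numWorkers];
        for (int w = 0; w < numWorkers; w++) {
            workers[w] = new Thread(() -> {
                try {
                    for (int r = queue.take(); r != POISON; r = queue.take())
                        processed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        consumer.start();
        for (Thread t : workers) t.start();
        consumer.join();
        for (Thread t : workers) t.join();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(100, 4)); // prints 100
    }
}
```

One poison pill per worker ensures every processing thread sees exactly one sentinel and exits cleanly after the real items are drained.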