Sure! Just created the voting thread :) On 2/24/15, 4:44 PM, "Jay Kreps" <j...@confluent.io> wrote:
>Hey Jiangjie, > >Let's do an official vote so that we know what we are voting on and we are >crisp on what the outcome was. This thread is very long :-) > >-Jay > >On Tue, Feb 24, 2015 at 2:53 PM, Jiangjie Qin <j...@linkedin.com.invalid> >wrote: > >> I updated the KIP page based on the discussion we had. >> >> Should I launch another vote or can we treat this mail thread as having >> already included a vote? >> >> Jiangjie (Becket) Qin >> >> On 2/11/15, 5:15 PM, "Neha Narkhede" <n...@confluent.io> wrote: >> >> >Thanks for the explanation, Joel! Would love to see the results of the >> >throughput experiment and I'm a +1 on everything else, including the >> >rebalance callback and record handler. >> > >> >-Neha >> > >> >On Wed, Feb 11, 2015 at 1:13 PM Jay Kreps <jay.kr...@gmail.com> wrote: >> > >> >> Cool, I agree with all that. >> >> >> >> I agree about the need for a rebalancing callback. >> >> >> >> Totally agree about record handler. >> >> >> >> It would be great to see if a prototype of this is workable. >> >> >> >> Thanks guys! >> >> >> >> -Jay >> >> >> >> On Wed, Feb 11, 2015 at 12:36 PM, Joel Koshy <jjkosh...@gmail.com> >> >>wrote: >> >> >> >> > Hey Jay, >> >> > >> >> > Guozhang, Becket and I got together to discuss this and we think: >> >> > >> >> > - It seems that your proposal based on the new consumer and flush >>call >> >> > should work. >> >> > - We would likely need to call the poll with a timeout that matches >> >> > the offset commit interval in order to deal with low volume >> >> > mirroring pipelines. >> >> > - We will still need a rebalance callback to reduce duplicates - >>the >> >> > rebalance callback would need to flush and commit offsets. >> >> > - The only remaining question is if the overall throughput is >> >> > sufficient. I think someone at LinkedIn (I don't remember who) >>did >> >> > some experiments with data channel size == 1 and ran into issues. >> >> > That was not thoroughly investigated though. >> >> > - The addition of flush may actually make this solution viable for >>the >> >> > current mirror-maker (with the old consumer). We can prototype >>that >> >> > offline and if it works out well we can redo KAFKA-1650 (i.e., >> >> > refactor the current mirror maker). The flush call and the new >> >> > consumer didn't exist at the time we did KAFKA-1650 so this did >>not >> >> > occur to us. >> >> > - We think the RecordHandler is still a useful small addition for >>the >> >> > use-cases mentioned earlier in this thread. >> >> > >> >> > Thanks, >> >> > >> >> > Joel >> >> > >> >> > On Wed, Feb 11, 2015 at 09:05:39AM -0800, Jay Kreps wrote: >> >> > > Guozhang, I agree with 1-3, I do think what I was proposing was >> >>simpler >> >> > but >> >> > > perhaps there are gaps in that? >> >> > > >> >> > > Hey Joel--Here was a sketch of what I was proposing. I do think >>this >> >> > gets >> >> > > rid of manual offset tracking, especially doing so across threads >> >>with >> >> > > dedicated commit threads, which I think is pretty complex. >> >> > > >> >> > > while(true) { >> >> > > val recs = consumer.poll(Long.MaxValue); >> >> > > for (rec <- recs) >> >> > > producer.send(rec, logErrorCallback) >> >> > > if(System.currentTimeMillis - lastCommit > commitInterval) { >> >> > > producer.flush() >> >> > > consumer.commit() >> >> > > lastCommit = System.currentTimeMillis >> >> > > } >> >> > > } >> >> > > >> >> > > (See the previous email for details). I think the question is: is >> >>there >> >> > any >> >> > > reason--performance, correctness, etc--that this won't work?
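For reference, here is the loop above rendered as a self-contained Java sketch against the new clients. It is only an illustration of the idea under discussion, not the eventual MirrorMaker code: the method names (commitSync() here versus commit() in the pseudocode, plus producer.flush()) and the configuration keys are assumptions about how the new consumer and the proposed flush call would land. Following Joel's note, the poll timeout is tied to the commit interval so that low-volume pipelines still commit regularly, and following Jay's note the producer uses a high retry count with a largish backoff so that a failed send attempt does not drop data.

    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SimpleMirrorThread implements Runnable {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final KafkaProducer<byte[], byte[]> producer;
        private final long commitIntervalMs;
        private long lastCommit = System.currentTimeMillis();

        public SimpleMirrorThread(Properties consumerProps, Properties producerProps,
                                  List<String> topics, long commitIntervalMs) {
            // A failed send must not drop data: retry "forever" with a large backoff,
            // and keep one in-flight request so per-partition order is preserved.
            producerProps.put("retries", Integer.toString(Integer.MAX_VALUE));
            producerProps.put("retry.backoff.ms", "1000");
            producerProps.put("max.in.flight.requests.per.connection", "1");
            producerProps.put("acks", "all");
            // Offsets are committed only after producer.flush() succeeds, never automatically.
            consumerProps.put("enable.auto.commit", "false");
            this.consumer = new KafkaConsumer<>(consumerProps);
            this.producer = new KafkaProducer<>(producerProps);
            this.commitIntervalMs = commitIntervalMs;
            this.consumer.subscribe(topics);
        }

        @Override
        public void run() {
            while (true) {
                // Bound the poll by the commit interval so a quiet pipeline still commits.
                ConsumerRecords<byte[], byte[]> recs = consumer.poll(commitIntervalMs);
                for (ConsumerRecord<byte[], byte[]> rec : recs) {
                    producer.send(new ProducerRecord<>(rec.topic(), rec.key(), rec.value()),
                            (metadata, exception) -> {
                                if (exception != null) {
                                    System.err.println("Mirroring send failed: " + exception);
                                }
                            });
                }
                if (System.currentTimeMillis() - lastCommit > commitIntervalMs) {
                    producer.flush();      // block until every outstanding send is acked
                    consumer.commitSync(); // only then record progress
                    lastCommit = System.currentTimeMillis();
                }
            }
        }
    }

The important property is the order of the two calls in the commit branch: flush() blocks until every outstanding send has been acknowledged, so the offsets committed immediately afterwards can never run ahead of what has actually reached the target cluster.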
>> >>Basically >> >> I >> >> > > think you guys have thought about this more so I may be missing > >> > something. >> >> > > If so let's flag it while we still have leeway on the consumer. >> >> > > >> >> > > If we think that will work, well I do think it is conceptually a >>lot >> >> > > simpler than the current code, though I suppose one could >>disagree >> >>on >> >> > that. >> >> > > >> >> > > -Jay >> >> > > >> >> > > On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy <jjkosh...@gmail.com> >> >> wrote: >> >> > > >> >> > > > Hi Jay, >> >> > > > >> >> > > > > The data channels are actually a big part of the complexity >>of >> >>the >> >> > zero >> >> > > > > data loss design, though, right? Because then you need some >> >>reverse >> >> > > > channel >> >> > > > > to flow the acks back to the consumer based on where you are >> >>versus >> >> > just >> >> > > > > acking what you have read and written (as in the code >>snippet I >> >>put >> >> > up). >> >> > > > >> >> > > > I'm not sure if we are on the same page. Even if the data >>channel >> >>was >> >> > > > not there the current handling for zero data loss would remain >> >>very >> >> > > > similar - you would need to maintain lists of unacked source >> >>offsets. >> >> > > > I'm wondering if the KIP needs more detail on how it is >>currently >> >> > > > implemented; or are you suggesting a different approach (in which >> >>case I >> >> > > > have not fully understood). I'm not sure what you mean by >>flowing >> >> acks >> >> > > > back to the consumer - the MM commits offsets after the >>producer >> >>ack >> >> > > > has been received. There is some additional complexity >>introduced >> >>in >> >> > > > reducing duplicates on a rebalance - this is actually optional >> >>(since >> >> > > > duplicates are currently a given). The reason that was done >> >>anyway is >> >> > > > that with the auto-commit turned off duplicates are almost >> >>guaranteed >> >> > > > on a rebalance. >> >> > > > >> >> > > > > I think the point that Neha and I were trying to make was >>that >> >>the >> >> > > > > motivation to embed stuff into MM kind of is related to how >> >> complex a >> >> > > > > simple "consume and produce" with good throughput will be. If >> >>it is >> >> > > > simple >> >> > > > > to write such a thing in a few lines, the pain of embedding a >> >>bunch >> >> > of >> >> > > > > stuff won't be worth it, if it has to be as complex as the >> >>current >> >> mm >> >> > > > then >> >> > > > > of course we will need all kinds of plug ins because no one >> >>will be >> >> > able >> >> > > > to >> >> > > > > write such a thing. I don't have a huge concern with a simple >> >> plug-in >> >> > > > but I >> >> > > > > think if it turns into something more complex with filtering >>and >> >> > > > > aggregation or whatever we really need to stop and think a >>bit >> >> about >> >> > the > >> > > > > design. >> >> > > > >> >> > > > I agree - I don't think there is a use-case for any complex >> >>plug-in. >> >> > > > It is pretty much what Becket has described currently for the >> >>message >> >> > > > handler - i.e., take an incoming record and return a list of >> >>outgoing >> >> > > > records (which could be empty if you filter). >> >> > > > >> >> > > > So here is my take on the MM: >> >> > > > - Bare bones: simple consumer - producer pairs (0.7 style). >>This >> >>is >> >> > > > ideal, but does not handle no data loss >> >> > > > - Above plus support no data loss. This actually adds quite a >>bit >> >>of >> >> > > > complexity.
>> >> > > > - Above plus the message handler. This is a trivial addition I >> >>think >> >> > > > that makes the MM usable in a few other mirroring-like >> >> applications. >> >> > > > >> >> > > > Joel >> >> > > > >> >> > > > > On Tue, Feb 10, 2015 at 12:31 PM, Joel Koshy >> >><jjkosh...@gmail.com> >> >> > > > wrote: >> >> > > > > >> >> > > > > > >> >> > > > > > >> >> > > > > > On Tue, Feb 10, 2015 at 12:13:46PM -0800, Neha Narkhede >>wrote: >> >> > > > > > > I think all of us agree that we want to design >>MirrorMaker >> >>for >> >> 0 >> >> > data >> >> > > > > > loss. >> >> > > > > > > With the absence of the data channel, 0 data loss will be >> >>much >> >> > > > simpler to >> >> > > > > > > implement. >> >> > > > > > >> >> > > > > > The data channel is irrelevant to the implementation of >>zero >> >>data >> >> > > > > > loss. The complexity in the implementation of no data loss >> >>that >> >> you >> >> > > > > > are seeing in mirror-maker affects all consume-then-produce >> >> > patterns >> >> > > > > > whether or not there is a data channel. You still need to >> >> > maintain a >> >> > > > > > list of unacked offsets. What I meant earlier is that we >>can >> >> > > > > > brainstorm completely different approaches to supporting no >> >>data >> >> > loss, >> >> > > > > > but the current implementation is the only solution we are >> >>aware >> >> > of. >> >> > > > > > >> >> > > > > > > >> >> > > > > > > My arguments for adding a message handler are that: >> >> > > > > > > > 1. It is more efficient to do something in common for >>all >> >>the >> >> > > > clients >> >> > > > > > in >> >> > > > > > > > pipeline than letting each client do the same thing for >> >>many >> >> > > > times. And >> >> > > > > > > > there are concrete use cases for the message handler >> >>already. >> >> > > > > > > > >> >> > > > > > > >> >> > > > > > > What are the concrete use cases? >> >> > > > > > >> >> > > > > > I think Becket already described a couple of use cases >> >>earlier in >> >> > the >> >> > > > > > thread. >> >> > > > > > >> >> > > > > > <quote> >> >> > > > > > >> >> > > > > > 1. Format conversion. We have a use case where clients of >> >>source >> >> > > > > > cluster >> >> > > > > > use an internal schema and clients of target cluster use a >> >> > different >> >> > > > > > public schema. >> >> > > > > > 2. Message filtering: For the messages published to source >> >> cluster, >> >> > > > > > there >> >> > > > > > are some messages private to source cluster clients and >>should >> >> not >> >> > > > > > exposed >> >> > > > > > to target cluster clients. It would be difficult to publish >> >>those >> >> > > > > > messages >> >> > > > > > into different partitions because they need to be ordered. >> >> > > > > > I agree that we can always filter/convert messages after >>they >> >>are >> >> > > > > > copied >> >> > > > > > to the target cluster, but that costs network bandwidth >> >> > unnecessarily, >> >> > > > > > especially if that is a cross colo mirror. With the >>handler, >> >>we >> >> can >> >> > > > > > co-locate the mirror maker with source cluster and save >>that >> >> cost. >> >> > > > > > Also, >> >> > > > > > imagine there are many downstream consumers consuming from >>the >> >> > target >> >> > > > > > cluster, filtering/reformatting the messages before the >> >>messages >> >> > reach >> >> > > > > > the >> >> > > > > > target cluster is much more efficient than having each of >>the >> >> > > > > > consumers do >> >> > > > > > this individually on their own. 
>> >> > > > > > >> >> > > > > > </quote> >> >> > > > > > >> >> > > > > > > >> >> > > > > > > Also the KIP still refers to the data channel in a few >> >>places >> >> > > > (Motivation >> >> > > > > > > and "On consumer rebalance" sections). Can you update the >> >>wiki >> >> > so it >> >> > > > is >> >> > > > > > > easier to review the new design, especially the data loss >> >>part. >> >> > > > > > > >> >> > > > > > > >> >> > > > > > > On Tue, Feb 10, 2015 at 10:36 AM, Joel Koshy < >> >> > jjkosh...@gmail.com> >> >> > > > > > wrote: >> >> > > > > > > >> >> > > > > > > > I think the message handler adds little to no >>complexity >> >>to >> >> the >> >> > > > mirror >> >> > > > > > > > maker. Jay/Neha, the MM became scary due to the >> >> rearchitecture >> >> > we >> >> > > > did >> >> > > > > > > > for 0.8 due to performance issues compared with 0.7 - >>we >> >> should >> >> > > > remove >> >> > > > > > > > the data channel if it can match the current >>throughput. I >> >> > agree >> >> > > > it is >> >> > > > > > > > worth prototyping and testing that so the MM >>architecture >> >>is >> >> > > > > > > > simplified. >> >> > > > > > > > >> >> > > > > > > > The MM became a little scarier in KAFKA-1650 in order >>to >> >> > support no >> >> > > > > > > > data loss. I think the implementation for no data loss >> >>will >> >> > remain >> >> > > > > > > > about the same even in the new model (even without the >> >>data >> >> > > > channel) - >> >> > > > > > > > we can probably brainstorm more if there is a >> >>better/simpler >> >> > way >> >> > > > to do >> >> > > > > > > > it (maybe there is in the absence of the data channel) >> >>but at >> >> > the >> >> > > > time >> >> > > > > > > > it was the best we (i.e., Becket, myself, Jun and >>Guozhang >> >> who >> >> > > > > > > > participated on the review) could come up with. >> >> > > > > > > > >> >> > > > > > > > So I'm definitely +1 on whatever it takes to support no >> >>data >> >> > loss. >> >> > > > I >> >> > > > > > > > think most people would want that out of the box. >> >> > > > > > > > >> >> > > > > > > > As for the message handler, as Becket wrote and I agree >> >>with, >> >> > it is >> >> > > > > > > > really a trivial addition that would benefit (perhaps >>not >> >> most, >> >> > > > but at >> >> > > > > > > > least some). So I'm personally +1 on that as well. That >> >>said, >> >> > I'm >> >> > > > also >> >> > > > > > > > okay with it not being there. I think the MM is fairly >> >> > stand-alone >> >> > > > and >> >> > > > > > > > simple enough that it is entirely reasonable and >> >>absolutely >> >> > > > feasible >> >> > > > > > > > for companies to fork/re-implement the mirror maker for >> >>their >> >> > own >> >> > > > > > > > needs. >> >> > > > > > > > >> >> > > > > > > > So in summary, I'm +1 on the KIP. >> >> > > > > > > > >> >> > > > > > > > Thanks, >> >> > > > > > > > >> >> > > > > > > > Joel >> >> > > > > > > > >> >> > > > > > > > On Mon, Feb 09, 2015 at 09:19:57PM +0000, Jiangjie Qin >> >>wrote: >> >> > > > > > > > > I just updated the KIP page and incorporated Jay and >> >>Neha’s >> >> > > > > > suggestion. >> >> > > > > > > > As >> >> > > > > > > > > a brief summary of where we are: >> >> > > > > > > > > >> >> > > > > > > > > Consensus reached: >> >> > > > > > > > > Have N independent mirror maker threads each has >>their >> >>own >> >> > > > consumers >> >> > > > > > but >> >> > > > > > > > > share a producer. 
The mirror maker threads will be >> >> > responsible >> >> > > > for >> >> > > > > > > > > decompression, compression and offset commit. No data >> >> > channel and >> >> > > > > > > > separate >> >> > > > > > > > > offset commit thread is needed. Consumer rebalance >> >>callback >> >> > will >> >> > > > be >> >> > > > > > used >> >> > > > > > > > > to avoid duplicates on rebalance. >> >> > > > > > > > > >> >> > > > > > > > > Still under discussion: >> >> > > > > > > > > Whether message handler is needed. >> >> > > > > > > > > >> >> > > > > > > > > My arguments for adding a message handler are that: >> >> > > > > > > > > 1. It is more efficient to do something in common for >> >>all >> >> the >> >> > > > > > clients in >> >> > > > > > > > > pipeline than letting each client do the same thing >>for >> >> many >> >> > > > times. >> >> > > > > > And >> >> > > > > > > > > there are concrete use cases for the message handler >> >> already. >> >> > > > > > > > > 2. It is not a big complicated add-on to mirror >>maker. >> >> > > > > > > > > 3. Without a message handler, for customers needs it, >> >>they >> >> > have >> >> > > > to >> >> > > > > > > > > re-implement all the logics of mirror maker by >> >>themselves >> >> > just in >> >> > > > > > order >> >> > > > > > > > to >> >> > > > > > > > > add this handling in pipeline. >> >> > > > > > > > > >> >> > > > > > > > > Any thoughts? >> >> > > > > > > > > >> >> > > > > > > > > Thanks. >> >> > > > > > > > > >> >> > > > > > > > > ―Jiangjie (Becket) Qin >> >> > > > > > > > > >> >> > > > > > > > > On 2/8/15, 6:35 PM, "Jiangjie Qin" >><j...@linkedin.com> >> >> > wrote: >> >> > > > > > > > > >> >> > > > > > > > > >Hi Jay, thanks a lot for the comments. >> >> > > > > > > > > >I think this solution is better. We probably don’t >>need >> >> data >> >> > > > channel >> >> > > > > > > > > >anymore. It can be replaced with a list of producer >>if >> >>we >> >> > need >> >> > > > more >> >> > > > > > > > sender >> >> > > > > > > > > >thread. >> >> > > > > > > > > >I’ll update the KIP page. >> >> > > > > > > > > > >> >> > > > > > > > > >The reasoning about message handler is mainly for >> >> efficiency >> >> > > > > > purpose. >> >> > > > > > > > I’m >> >> > > > > > > > > >thinking that if something can be done in pipeline >>for >> >>all >> >> > the >> >> > > > > > clients >> >> > > > > > > > > >such as filtering/reformatting, it is probably >>better >> >>to >> >> do >> >> > it >> >> > > > in >> >> > > > > > the >> >> > > > > > > > > >pipeline than asking 100 clients do the same thing >>for >> >>100 >> >> > > > times. >> >> > > > > > > > > > >> >> > > > > > > > > >―Jiangjie (Becket) Qin >> >> > > > > > > > > > >> >> > > > > > > > > > >> >> > > > > > > > > >On 2/8/15, 4:59 PM, "Jay Kreps" >><jay.kr...@gmail.com> >> >> > wrote: >> >> > > > > > > > > > >> >> > > > > > > > > >>Yeah, I second Neha's comments. The current mm code >> >>has >> >> > taken >> >> > > > > > something >> >> > > > > > > > > >>pretty simple and made it pretty scary with >>callbacks >> >>and >> >> > > > > > wait/notify >> >> > > > > > > > > >>stuff. Do we believe this works? I can't tell by >> >>looking >> >> > at it >> >> > > > > > which is >> >> > > > > > > > > >>kind of bad for something important like this. 
I >>don't >> >> mean >> >> > > > this as >> >> > > > > > > > > >>criticism, I know the history: we added in memory >> >>queues >> >> to >> >> > > > help >> >> > > > > > with >> >> > > > > > > > > >>other >> >> > > > > > > > > >>performance problems without thinking about >> >>correctness, >> >> > then >> >> > > > we >> >> > > > > > added >> >> > > > > > > > > >>stuff to work around the in-memory queues not lose >> >>data, >> >> > and >> >> > > > so on. >> >> > > > > > > > > >> >> >> > > > > > > > > >>Can we instead do the opposite exercise and start >>with >> >> the >> >> > > > basics >> >> > > > > > of >> >> > > > > > > > what >> >> > > > > > > > > >>mm should do and think about what deficiencies >> >>prevents >> >> > this >> >> > > > > > approach >> >> > > > > > > > > >>from >> >> > > > > > > > > >>working? Then let's make sure the currently >>in-flight >> >> work >> >> > will >> >> > > > > > remove >> >> > > > > > > > > >>these deficiencies. After all mm is kind of the >> >> > prototypical >> >> > > > kafka >> >> > > > > > use >> >> > > > > > > > > >>case >> >> > > > > > > > > >>so if we can't make our clients to this probably no >> >>one >> >> > else >> >> > > > can. >> >> > > > > > > > > >> >> >> > > > > > > > > >>I think mm should just be N independent threads >>each >> >>of >> >> > which >> >> > > > has >> >> > > > > > their >> >> > > > > > > > > >>own >> >> > > > > > > > > >>consumer but share a producer and each of which >>looks >> >> like >> >> > > > this: >> >> > > > > > > > > >> >> >> > > > > > > > > >>while(true) { >> >> > > > > > > > > >> val recs = consumer.poll(Long.MaxValue); >> >> > > > > > > > > >> for (rec <- recs) >> >> > > > > > > > > >> producer.send(rec, logErrorCallback) >> >> > > > > > > > > >> if(System.currentTimeMillis - lastCommit > >> >> > commitInterval) >> >> > > > { >> >> > > > > > > > > >> producer.flush() >> >> > > > > > > > > >> consumer.commit() >> >> > > > > > > > > >> lastCommit = System.currentTimeMillis >> >> > > > > > > > > >> } >> >> > > > > > > > > >>} >> >> > > > > > > > > >> >> >> > > > > > > > > >>This will depend on setting the retry count in the >> >> > producer to >> >> > > > > > > > something >> >> > > > > > > > > >>high with a largish backoff so that a failed send >> >>attempt >> >> > > > doesn't >> >> > > > > > drop >> >> > > > > > > > > >>data. >> >> > > > > > > > > >> >> >> > > > > > > > > >>We will need to use the callback to force a flush >>and >> >> > offset >> >> > > > > > commit on >> >> > > > > > > > > >>rebalance. >> >> > > > > > > > > >> >> >> > > > > > > > > >>This approach may have a few more TCP connections >>due >> >>to >> >> > using >> >> > > > > > multiple >> >> > > > > > > > > >>consumers but I think it is a lot easier to reason >> >>about >> >> > and >> >> > > > the >> >> > > > > > total >> >> > > > > > > > > >>number of mm instances is always going to be small. >> >> > > > > > > > > >> >> >> > > > > > > > > >>Let's talk about where this simple approach falls >> >>short, >> >> I >> >> > > > think >> >> > > > > > that >> >> > > > > > > > > >>will >> >> > > > > > > > > >>help us understand your motivations for additional >> >> > elements. 
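To make the rebalance hook concrete, here is a minimal sketch of such a callback, assuming the new consumer ends up exposing a rebalance listener roughly like the eventual ConsumerRebalanceListener interface (the exact interface was still being designed when this thread was written). Flushing the producer and then committing offsets before partitions are revoked limits duplicates to the window since the last commit.

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.TopicPartition;

    // Sketch of the rebalance callback discussed above: flush outstanding sends and
    // commit offsets before the consumer gives up its partitions, so whichever
    // thread takes them over does not re-mirror records that were already produced.
    public class FlushingRebalanceListener implements ConsumerRebalanceListener {
        private final KafkaProducer<byte[], byte[]> producer;
        private final KafkaConsumer<byte[], byte[]> consumer;

        public FlushingRebalanceListener(KafkaProducer<byte[], byte[]> producer,
                                         KafkaConsumer<byte[], byte[]> consumer) {
            this.producer = producer;
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            producer.flush();      // wait until every in-flight record is acked
            consumer.commitSync(); // then record how far mirroring has safely progressed
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Nothing to do: mirroring simply resumes from the committed offsets.
        }
    }

In the eventual consumer API a listener like this is attached at subscription time, e.g. consumer.subscribe(topics, new FlushingRebalanceListener(producer, consumer)).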
>> >> > > > > > > > > >> >> >> > > > > > > > > >>Another advantage of this is that it is so simple I >> >>don't >> >> > > > think we >> >> > > > > > > > really >> >> > > > > > > > > >>even need to both making mm extensible because >>writing >> >> > your own >> >> > > > > > code >> >> > > > > > > > that >> >> > > > > > > > > >>does custom processing or transformation is just >>ten >> >> lines >> >> > and >> >> > > > no >> >> > > > > > plug >> >> > > > > > > > in >> >> > > > > > > > > >>system is going to make it simpler. >> >> > > > > > > > > >> >> >> > > > > > > > > >>-Jay >> >> > > > > > > > > >> >> >> > > > > > > > > >> >> >> > > > > > > > > >>On Sun, Feb 8, 2015 at 2:40 PM, Neha Narkhede < >> >> > > > n...@confluent.io> >> >> > > > > > > > wrote: >> >> > > > > > > > > >> >> >> > > > > > > > > >>> Few comments - >> >> > > > > > > > > >>> >> >> > > > > > > > > >>> 1. Why do we need the message handler? Do you >>have >> >> > concrete >> >> > > > use >> >> > > > > > cases >> >> > > > > > > > > >>>in >> >> > > > > > > > > >>> mind? If not, we should consider adding it in the >> >> future >> >> > > > when/if >> >> > > > > > we >> >> > > > > > > > do >> >> > > > > > > > > >>>have >> >> > > > > > > > > >>> use cases for it. The purpose of the mirror maker >> >>is a >> >> > simple >> >> > > > > > tool >> >> > > > > > > > for >> >> > > > > > > > > >>> setting up Kafka cluster replicas. I don't see >>why >> >>we >> >> > need to >> >> > > > > > > > include a >> >> > > > > > > > > >>> message handler for doing stream transformations >>or >> >> > > > filtering. >> >> > > > > > You >> >> > > > > > > > can >> >> > > > > > > > > >>> always write a simple process for doing that once >> >>the >> >> > data is >> >> > > > > > copied >> >> > > > > > > > as >> >> > > > > > > > > >>>is >> >> > > > > > > > > >>> in the target cluster >> >> > > > > > > > > >>> 2. Why keep both designs? We should prefer the >> >>simpler >> >> > design >> >> > > > > > unless >> >> > > > > > > > it >> >> > > > > > > > > >>>is >> >> > > > > > > > > >>> not feasible due to the performance issue that we >> >> > previously >> >> > > > > > had. Did >> >> > > > > > > > > >>>you >> >> > > > > > > > > >>> get a chance to run some tests to see if that is >> >>really >> >> > > > still a >> >> > > > > > > > problem >> >> > > > > > > > > >>>or >> >> > > > > > > > > >>> not? It will be easier to think about the design >>and >> >> also >> >> > > > make >> >> > > > > > the >> >> > > > > > > > KIP >> >> > > > > > > > > >>> complete if we make a call on the design first. >> >> > > > > > > > > >>> 3. Can you explain the need for keeping a list of >> >> unacked >> >> > > > > > offsets per >> >> > > > > > > > > >>> partition? Consider adding a section on retries >>and >> >>how >> >> > you >> >> > > > plan >> >> > > > > > to >> >> > > > > > > > > >>>handle >> >> > > > > > > > > >>> the case when the producer runs out of all >>retries. >> >> > > > > > > > > >>> >> >> > > > > > > > > >>> Thanks, >> >> > > > > > > > > >>> Neha >> >> > > > > > > > > >>> >> >> > > > > > > > > >>> On Sun, Feb 8, 2015 at 2:06 PM, Jiangjie Qin >> >> > > > > > > > > >>><j...@linkedin.com.invalid> >> >> > > > > > > > > >>> wrote: >> >> > > > > > > > > >>> >> >> > > > > > > > > >>> > Hi Neha, >> >> > > > > > > > > >>> > >> >> > > > > > > > > >>> > Yes, I’ve updated the KIP so the entire KIP is >> >>based >> >> > on new >> >> > > > > > > > consumer >> >> > > > > > > > > >>>now. 
>> >> > > > > > > > > >>> > I’ve put both designs with and without data >> >>channel >> >> in >> >> > the >> >> > > > KIP >> >> > > > > > as I >> >> > > > > > > > > >>>still >> >> > > > > > > > > >>> > feel we might need the data channel to provide >> >>more >> >> > > > > > flexibility, >> >> > > > > > > > > >>> > especially after message handler is introduced. >> >>I’ve >> >> > put my >> >> > > > > > > > thinking >> >> > > > > > > > > >>>of >> >> > > > > > > > > >>> > the pros and cons of the two designs in the >>KIP as >> >> > well. >> >> > > > It’ll >> >> > > > > > be >> >> > > > > > > > > >>>great >> >> > > > > > > > > >>> if >> >> > > > > > > > > >>> > you can give a review and comment. >> >> > > > > > > > > >>> > >> >> > > > > > > > > >>> > Thanks. >> >> > > > > > > > > >>> > >> >> > > > > > > > > >>> > Jiangjie (Becket) Qin >> >> > > > > > > > > >>> > >> >> > > > > > > > > >>> > On 2/6/15, 7:30 PM, "Neha Narkhede" < >> >> n...@confluent.io >> >> > > >> >> > > > wrote: >> >> > > > > > > > > >>> > >> >> > > > > > > > > >>> > >Hey Becket, >> >> > > > > > > > > >>> > > >> >> > > > > > > > > >>> > >What are the next steps on this KIP. As per >>your >> >> > comment >> >> > > > > > earlier >> >> > > > > > > > on >> >> > > > > > > > > >>>the >> >> > > > > > > > > >>> > >thread - >> >> > > > > > > > > >>> > > >> >> > > > > > > > > >>> > >I do agree it makes more sense >> >> > > > > > > > > >>> > >> to avoid duplicate effort and plan based on >>new >> >> > > > consumer. >> >> > > > > > I’ll >> >> > > > > > > > > >>>modify >> >> > > > > > > > > >>> > >>the >> >> > > > > > > > > >>> > >> KIP. >> >> > > > > > > > > >>> > > >> >> > > > > > > > > >>> > > >> >> > > > > > > > > >>> > >Did you get a chance to think about the >> >>simplified >> >> > design >> >> > > > > > that we >> >> > > > > > > > > >>> proposed >> >> > > > > > > > > >>> > >earlier? Do you plan to update the KIP with >>that >> >> > proposal? >> >> > > > > > > > > >>> > > >> >> > > > > > > > > >>> > >Thanks, >> >> > > > > > > > > >>> > >Neha >> >> > > > > > > > > >>> > > >> >> > > > > > > > > >>> > >On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin >> >> > > > > > > > > >>><j...@linkedin.com.invalid >> >> > > > > > > > > >>> > >> >> > > > > > > > > >>> > >wrote: >> >> > > > > > > > > >>> > > >> >> > > > > > > > > >>> > >> In mirror maker we do not do >>de-serialization >> >>on >> >> the >> >> > > > > > messages. >> >> > > > > > > > > >>>Mirror >> >> > > > > > > > > >>> > >> maker use source TopicPartition hash to >>chose a >> >> > > > producer to >> >> > > > > > send >> >> > > > > > > > > >>> > >>messages >> >> > > > > > > > > >>> > >> from the same source partition. The >>partition >> >> those >> >> > > > > > messages end >> >> > > > > > > > > >>>up >> >> > > > > > > > > >>> with >> >> > > > > > > > > >>> > >> are decided by Partitioner class in >> >>KafkaProducer >> >> > > > (assuming >> >> > > > > > you >> >> > > > > > > > > >>>are >> >> > > > > > > > > >>> > >>using >> >> > > > > > > > > >>> > >> the new producer), which uses hash code of >> >> bytes[]. >> >> > > > > > > > > >>> > >> >> >> > > > > > > > > >>> > >> If deserialization is needed, it has to be >> >>done in >> >> > > > message >> >> > > > > > > > > >>>handler. >> >> > > > > > > > > >>> > >> >> >> > > > > > > > > >>> > >> Thanks. 
>> >> > > > > > > > > >>> > >> >> >> > > > > > > > > >>> > >> Jiangjie (Becket) Qin >> >> > > > > > > > > >>> > >> >> >> > > > > > > > > >>> > >> On 2/4/15, 11:33 AM, "Bhavesh Mistry" < >> >> > > > > > > > mistry.p.bhav...@gmail.com> >> >> > > > > > > > > >>> > >>wrote: >> >> > > > > > > > > >>> > >> >> >> > > > > > > > > >>> > >> >Hi Jiangjie, >> >> > > > > > > > > >>> > >> > >> >> > > > > > > > > >>> > >> >Thanks for entertaining my question so far. >> >>Last >> >> > > > > > question, I >> >> > > > > > > > > >>>have is >> >> > > > > > > > > >>> > >> >about >> >> > > > > > > > > >>> > >> >serialization of message key. If the key >> >> > > > de-serialization >> >> > > > > > > > > >>>(Class) is >> >> > > > > > > > > >>> > >>not >> >> > > > > > > > > >>> > >> >present at the MM instance, then does it >>use >> >>raw >> >> > byte >> >> > > > > > hashcode >> >> > > > > > > > to >> >> > > > > > > > > >>> > >> >determine >> >> > > > > > > > > >>> > >> >the partition ? How are you going to >>address >> >>the >> >> > > > situation >> >> > > > > > > > where >> >> > > > > > > > > >>>key >> >> > > > > > > > > >>> > >> >needs >> >> > > > > > > > > >>> > >> >to be de-serialization and get actual >>hashcode >> >> > needs >> >> > > > to be >> >> > > > > > > > > >>>computed >> >> > > > > > > > > >>> ?. >> >> > > > > > > > > >>> > >> > >> >> > > > > > > > > >>> > >> > >> >> > > > > > > > > >>> > >> >Thanks, >> >> > > > > > > > > >>> > >> > >> >> > > > > > > > > >>> > >> >Bhavesh >> >> > > > > > > > > >>> > >> > >> >> > > > > > > > > >>> > >> >On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie >>Qin >> >> > > > > > > > > >>> > >><j...@linkedin.com.invalid> >> >> > > > > > > > > >>> > >> >wrote: >> >> > > > > > > > > >>> > >> > >> >> > > > > > > > > >>> > >> >> Hi Bhavesh, >> >> > > > > > > > > >>> > >> >> >> >> > > > > > > > > >>> > >> >> Please see inline comments. >> >> > > > > > > > > >>> > >> >> >> >> > > > > > > > > >>> > >> >> Jiangjie (Becket) Qin >> >> > > > > > > > > >>> > >> >> >> >> > > > > > > > > >>> > >> >> On 1/29/15, 7:00 PM, "Bhavesh Mistry" >> >> > > > > > > > > >>><mistry.p.bhav...@gmail.com> >> >> > > > > > > > > >>> > >> >>wrote: >> >> > > > > > > > > >>> > >> >> >> >> > > > > > > > > >>> > >> >> >Hi Jiangjie, >> >> > > > > > > > > >>> > >> >> > >> >> > > > > > > > > >>> > >> >> >Thanks for the input. >> >> > > > > > > > > >>> > >> >> > >> >> > > > > > > > > >>> > >> >> >a) Is MM will producer ack will be >>attach >> >>to >> >> > > > Producer >> >> > > > > > > > > >>>Instance or >> >> > > > > > > > > >>> > >>per >> >> > > > > > > > > >>> > >> >> >topic. Use case is that one instance >>of MM >> >> > > > > > > > > >>> > >> >> >needs to handle both strong ack and also >> >>ack=0 >> >> > for >> >> > > > some >> >> > > > > > > > topic. >> >> > > > > > > > > >>> Or >> >> > > > > > > > > >>> > >>it >> >> > > > > > > > > >>> > >> >> >would >> >> > > > > > > > > >>> > >> >> >be better to set-up another instance of >>MM. >> >> > > > > > > > > >>> > >> >> The acks setting is producer level >>setting >> >> > instead of >> >> > > > > > topic >> >> > > > > > > > > >>>level >> >> > > > > > > > > >>> > >> >>setting. >> >> > > > > > > > > >>> > >> >> In this case you probably need to set up >> >> another >> >> > > > > > instance. 
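To make the message handler concrete, here is a minimal sketch of the shape described in this thread: one consumed record in, a list of records out, with an empty list meaning the record is filtered, and any deserialization happening inside the handler itself. The interface name, its exact signature, and the isPrivate key convention are illustrative assumptions rather than the KIP's actual API.

    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical handler interface matching the description in this thread:
    // one incoming record, zero or more outgoing records.
    interface MirrorMakerMessageHandler {
        List<ProducerRecord<byte[], byte[]>> handle(ConsumerRecord<byte[], byte[]> record);
    }

    // Example handler: drop records whose key marks them as private to the source
    // cluster and pass everything else through unchanged. The key convention is a
    // placeholder; a real deployment would plug in its own schema or header check.
    class PrivateMessageFilter implements MirrorMakerMessageHandler {
        @Override
        public List<ProducerRecord<byte[], byte[]>> handle(ConsumerRecord<byte[], byte[]> record) {
            if (isPrivate(record.key())) {
                return Collections.emptyList(); // filtered: never leaves the source colo
            }
            return Collections.singletonList(
                    new ProducerRecord<>(record.topic(), record.key(), record.value()));
        }

        private boolean isPrivate(byte[] key) {
            return key != null && key.length > 0 && key[0] == 0x01; // placeholder convention
        }
    }

A handler of this shape could also pin an explicit target partition by using the ProducerRecord constructor that takes a partition number, which is how the point about uneven distribution further down in the quoted history could be addressed for non-keyed messages.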
>> >> > > > > > > > > >>> > >> >> > >> >> > > > > > > > > >>> > >> >> >b) Regarding TCP connections, Why does >> >> #producer >> >> > > > > > instance >> >> > > > > > > > > >>>attach >> >> > > > > > > > > >>> to >> >> > > > > > > > > >>> > >>TCP >> >> > > > > > > > > >>> > >> >> >connection. Is it possible to use >>Broker >> >> > > > Connection TCP >> >> > > > > > > > Pool, >> >> > > > > > > > > >>> > >>producer >> >> > > > > > > > > >>> > >> >> >will just checkout TCP connection to >> >>Broker. >> >> > So, >> >> > > > # of >> >> > > > > > > > > >>>Producer >> >> > > > > > > > > >>> > >> >>Instance >> >> > > > > > > > > >>> > >> >> >does not correlation to Brokers >>Connection. >> >> Is >> >> > this >> >> > > > > > > > possible >> >> > > > > > > > > >>>? >> >> > > > > > > > > >>> > >> >> In new producer, each producer maintains >>a >> >> > > > connection to >> >> > > > > > each >> >> > > > > > > > > >>> broker >> >> > > > > > > > > >>> > >> >> within the producer instance. Making >> >>producer >> >> > > > instances >> >> > > > > > to >> >> > > > > > > > > >>>share >> >> > > > > > > > > >>> the >> >> > > > > > > > > >>> > >>TCP >> >> > > > > > > > > >>> > >> >> connections is a very big change to the >> >>current >> >> > > > design, >> >> > > > > > so I >> >> > > > > > > > > >>> suppose >> >> > > > > > > > > >>> > >>we >> >> > > > > > > > > >>> > >> >> won’t be able to do that. >> >> > > > > > > > > >>> > >> >> > >> >> > > > > > > > > >>> > >> >> > >> >> > > > > > > > > >>> > >> >> >Thanks, >> >> > > > > > > > > >>> > >> >> > >> >> > > > > > > > > >>> > >> >> >Bhavesh >> >> > > > > > > > > >>> > >> >> > >> >> > > > > > > > > >>> > >> >> >On Thu, Jan 29, 2015 at 11:50 AM, >>Jiangjie >> >>Qin >> >> > > > > > > > > >>> > >> >><j...@linkedin.com.invalid >> >> > > > > > > > > >>> > >> >> > >> >> > > > > > > > > >>> > >> >> >wrote: >> >> > > > > > > > > >>> > >> >> > >> >> > > > > > > > > >>> > >> >> >> Hi Bhavesh, >> >> > > > > > > > > >>> > >> >> >> >> >> > > > > > > > > >>> > >> >> >> I think it is the right discussion to >> >>have >> >> > when >> >> > > > we are >> >> > > > > > > > > >>>talking >> >> > > > > > > > > >>> > >>about >> >> > > > > > > > > >>> > >> >>the >> >> > > > > > > > > >>> > >> >> >> new new design for MM. >> >> > > > > > > > > >>> > >> >> >> Please see the inline comments. >> >> > > > > > > > > >>> > >> >> >> >> >> > > > > > > > > >>> > >> >> >> Jiangjie (Becket) Qin >> >> > > > > > > > > >>> > >> >> >> >> >> > > > > > > > > >>> > >> >> >> On 1/28/15, 10:48 PM, "Bhavesh Mistry" >> >> > > > > > > > > >>> > >><mistry.p.bhav...@gmail.com> >> >> > > > > > > > > >>> > >> >> >>wrote: >> >> > > > > > > > > >>> > >> >> >> >> >> > > > > > > > > >>> > >> >> >> >Hi Jiangjie, >> >> > > > > > > > > >>> > >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >I just wanted to let you know about >>our >> >>use >> >> > case >> >> > > > and >> >> > > > > > > > stress >> >> > > > > > > > > >>>the >> >> > > > > > > > > >>> > >> >>point >> >> > > > > > > > > >>> > >> >> >>that >> >> > > > > > > > > >>> > >> >> >> >local data center broker cluster have >> >>fewer >> >> > > > > > partitions >> >> > > > > > > > than >> >> > > > > > > > > >>>the >> >> > > > > > > > > >>> > >> >> >> >destination >> >> > > > > > > > > >>> > >> >> >> >offline broker cluster. 
Just because >>we >> >>do >> >> > the >> >> > > > batch >> >> > > > > > pull >> >> > > > > > > > > >>>from >> >> > > > > > > > > >>> > >>CAMUS >> >> > > > > > > > > >>> > >> >> >>and >> >> > > > > > > > > >>> > >> >> >> >in >> >> > > > > > > > > >>> > >> >> >> >order to drain data faster than the >> >> injection >> >> > > > rate >> >> > > > > > (from >> >> > > > > > > > > >>>four >> >> > > > > > > > > >>> DCs >> >> > > > > > > > > >>> > >> >>for >> >> > > > > > > > > >>> > >> >> >>same >> >> > > > > > > > > >>> > >> >> >> >topic). >> >> > > > > > > > > >>> > >> >> >> Keeping the same partition number in >> >>source >> >> > and >> >> > > > target >> >> > > > > > > > > >>>cluster >> >> > > > > > > > > >>> > >>will >> >> > > > > > > > > >>> > >> >>be >> >> > > > > > > > > >>> > >> >> >>an >> >> > > > > > > > > >>> > >> >> >> option but will not be enforced by >> >>default. >> >> > > > > > > > > >>> > >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >We are facing following issues >>(probably >> >> due >> >> > to >> >> > > > > > > > > >>>configuration): >> >> > > > > > > > > >>> > >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >1) We occasionally loose data >>due >> >>to >> >> > message >> >> > > > > > batch >> >> > > > > > > > > >>>size is >> >> > > > > > > > > >>> > >>too >> >> > > > > > > > > >>> > >> >> >>large >> >> > > > > > > > > >>> > >> >> >> >(2MB) on target data (we are using >>old >> >> > producer >> >> > > > but I >> >> > > > > > > > think >> >> > > > > > > > > >>>new >> >> > > > > > > > > >>> > >> >> >>producer >> >> > > > > > > > > >>> > >> >> >> >will solve this problem to some >>extend). >> >> > > > > > > > > >>> > >> >> >> We do see this issue in LinkedIn as >>well. >> >> New >> >> > > > producer >> >> > > > > > > > also >> >> > > > > > > > > >>> might >> >> > > > > > > > > >>> > >> >>have >> >> > > > > > > > > >>> > >> >> >> this issue. There are some proposal of >> >> > solutions, >> >> > > > but >> >> > > > > > no >> >> > > > > > > > > >>>real >> >> > > > > > > > > >>> work >> >> > > > > > > > > >>> > >> >> >>started >> >> > > > > > > > > >>> > >> >> >> yet. For now, as a workaround, >>setting a >> >> more >> >> > > > > > aggressive >> >> > > > > > > > > >>>batch >> >> > > > > > > > > >>> > >>size >> >> > > > > > > > > >>> > >> >>on >> >> > > > > > > > > >>> > >> >> >> producer side should work. >> >> > > > > > > > > >>> > >> >> >> >2) Since only one instance is >>set >> >>to >> >> MM >> >> > > > data, >> >> > > > > > we >> >> > > > > > > > are >> >> > > > > > > > > >>>not >> >> > > > > > > > > >>> > >>able >> >> > > > > > > > > >>> > >> >>to >> >> > > > > > > > > >>> > >> >> >> >set-up ack per topic instead ack is >> >> attached >> >> > to >> >> > > > > > producer >> >> > > > > > > > > >>> > >>instance. >> >> > > > > > > > > >>> > >> >> >> I don’t quite get the question here. >> >> > > > > > > > > >>> > >> >> >> >3) How are you going to address >>two >> >> > phase >> >> > > > commit >> >> > > > > > > > > >>>problem >> >> > > > > > > > > >>> if >> >> > > > > > > > > >>> > >> >>ack is >> >> > > > > > > > > >>> > >> >> >> >set >> >> > > > > > > > > >>> > >> >> >> >to strongest, but auto commit is on >>for >> >> > consumer >> >> > > > > > (meaning >> >> > > > > > > > > >>> > >>producer >> >> > > > > > > > > >>> > >> >>does >> >> > > > > > > > > >>> > >> >> >> >not >> >> > > > > > > > > >>> > >> >> >> >get ack, but consumer auto committed >> >> offset >> >> > that >> >> > > > > > > > message). 
>> >> > > > > > > > > >>> Is >> >> > > > > > > > > >>> > >> >>there >> >> > > > > > > > > >>> > >> >> >> >transactional (Kafka transaction is >>in >> >> > process) >> >> > > > > > based ack >> >> > > > > > > > > >>>and >> >> > > > > > > > > >>> > >>commit >> >> > > > > > > > > >>> > >> >> >> >offset >> >> > > > > > > > > >>> > >> >> >> >? >> >> > > > > > > > > >>> > >> >> >> Auto offset commit should be turned >>off >> >>in >> >> > this >> >> > > > case. >> >> > > > > > The >> >> > > > > > > > > >>>offset >> >> > > > > > > > > >>> > >>will >> >> > > > > > > > > >>> > >> >> >>only >> >> > > > > > > > > >>> > >> >> >> be committed once by the offset commit >> >> > thread. So >> >> > > > > > there is >> >> > > > > > > > > >>>no >> >> > > > > > > > > >>> two >> >> > > > > > > > > >>> > >> >>phase >> >> > > > > > > > > >>> > >> >> >> commit. >> >> > > > > > > > > >>> > >> >> >> >4) How are you planning to avoid >> >> > duplicated >> >> > > > > > message? >> >> > > > > > > > > >>>( Is >> >> > > > > > > > > >>> > >> >> >> >brokergoing >> >> > > > > > > > > >>> > >> >> >> >have moving window of message >>collected >> >>and >> >> > > > de-dupe >> >> > > > > > ?) >> >> > > > > > > > > >>> > >>Possibly, we >> >> > > > > > > > > >>> > >> >> >>get >> >> > > > > > > > > >>> > >> >> >> >this from retry set to 5…? >> >> > > > > > > > > >>> > >> >> >> We are not trying to completely avoid >> >> > duplicates. >> >> > > > The >> >> > > > > > > > > >>>duplicates >> >> > > > > > > > > >>> > >>will >> >> > > > > > > > > >>> > >> >> >> still be there if: >> >> > > > > > > > > >>> > >> >> >> 1. Producer retries on failure. >> >> > > > > > > > > >>> > >> >> >> 2. Mirror maker is hard killed. >> >> > > > > > > > > >>> > >> >> >> Currently, dedup is expected to be >>done >> >>by >> >> > user if >> >> > > > > > > > > >>>necessary. >> >> > > > > > > > > >>> > >> >> >> >5) Last, is there any warning or >> >>any >> >> > thing >> >> > > > you >> >> > > > > > can >> >> > > > > > > > > >>>provide >> >> > > > > > > > > >>> > >> >>insight >> >> > > > > > > > > >>> > >> >> >> >from MM component about data >>injection >> >>rate >> >> > into >> >> > > > > > > > > >>>destination >> >> > > > > > > > > >>> > >> >> >>partitions is >> >> > > > > > > > > >>> > >> >> >> >NOT evenly distributed regardless of >> >> keyed >> >> > or >> >> > > > > > non-keyed >> >> > > > > > > > > >>> message >> >> > > > > > > > > >>> > >> >> >>(Hence >> >> > > > > > > > > >>> > >> >> >> >there is ripple effect such as data >>not >> >> > arriving >> >> > > > > > late, or >> >> > > > > > > > > >>>data >> >> > > > > > > > > >>> is >> >> > > > > > > > > >>> > >> >> >>arriving >> >> > > > > > > > > >>> > >> >> >> >out of order in intern of time stamp >> >>and >> >> > early >> >> > > > some >> >> > > > > > > > time, >> >> > > > > > > > > >>>and >> >> > > > > > > > > >>> > >> >>CAMUS >> >> > > > > > > > > >>> > >> >> >> >creates huge number of file count on >> >>HDFS >> >> > due to >> >> > > > > > uneven >> >> > > > > > > > > >>> injection >> >> > > > > > > > > >>> > >> >>rate >> >> > > > > > > > > >>> > >> >> >>. >> >> > > > > > > > > >>> > >> >> >> >Camus Job is configured to run >>every 3 >> >> > minutes.) >> >> > > > > > > > > >>> > >> >> >> I think uneven data distribution is >> >> typically >> >> > > > caused >> >> > > > > > by >> >> > > > > > > > > >>>server >> >> > > > > > > > > >>> > >>side >> >> > > > > > > > > >>> > >> >> >> unbalance, instead of something mirror >> >>maker >> >> > could >> >> > > > > > > > control. 
>> >> > > > > > > > > >>>In >> >> > > > > > > > > >>> new >> >> > > > > > > > > >>> > >> >> >>mirror >> >> > > > > > > > > >>> > >> >> >> maker, however, there is a >>customizable >> >> > message >> >> > > > > > handler, >> >> > > > > > > > > >>>that >> >> > > > > > > > > >>> > >>might >> >> > > > > > > > > >>> > >> >>be >> >> > > > > > > > > >>> > >> >> >> able to help a little bit. In message >> >> handler, >> >> > > > you can >> >> > > > > > > > > >>> explicitly >> >> > > > > > > > > >>> > >> >>set a >> >> > > > > > > > > >>> > >> >> >> partition that you want to produce the >> >> message >> >> > > > to. So >> >> > > > > > if >> >> > > > > > > > you >> >> > > > > > > > > >>> know >> >> > > > > > > > > >>> > >>the >> >> > > > > > > > > >>> > >> >> >> uneven data distribution in target >> >>cluster, >> >> > you >> >> > > > may >> >> > > > > > offset >> >> > > > > > > > > >>>it >> >> > > > > > > > > >>> > >>here. >> >> > > > > > > > > >>> > >> >>But >> >> > > > > > > > > >>> > >> >> >> that probably only works for non-keyed >> >> > messages. >> >> > > > > > > > > >>> > >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >I am not sure if this is right >> >>discussion >> >> > form to >> >> > > > > > bring >> >> > > > > > > > > >>>these >> >> > > > > > > > > >>> to >> >> > > > > > > > > >>> > >> >> >> >your/kafka >> >> > > > > > > > > >>> > >> >> >> >Dev team attention. This might be >>off >> >> track, >> >> > > > > > > > > >>> > >> >> >> > >> >> > > > > > > > > >>> > >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >Thanks, >> >> > > > > > > > > >>> > >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >Bhavesh >> >> > > > > > > > > >>> > >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >On Wed, Jan 28, 2015 at 11:07 AM, >> >>Jiangjie >> >> > Qin >> >> > > > > > > > > >>> > >> >> >><j...@linkedin.com.invalid >> >> > > > > > > > > >>> > >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >wrote: >> >> > > > > > > > > >>> > >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >> I’ve updated the KIP page. >>Feedbacks >> >>are >> >> > > > welcome. >> >> > > > > > > > > >>> > >> >> >> >> >> >> > > > > > > > > >>> > >> >> >> >> Regarding the simple mirror maker >> >> design. I >> >> > > > thought >> >> > > > > > > > over >> >> > > > > > > > > >>>it >> >> > > > > > > > > >>> and >> >> > > > > > > > > >>> > >> >>have >> >> > > > > > > > > >>> > >> >> >> >>some >> >> > > > > > > > > >>> > >> >> >> >> worries: >> >> > > > > > > > > >>> > >> >> >> >> There are two things that might >>worth >> >> > thinking: >> >> > > > > > > > > >>> > >> >> >> >> 1. One of the enhancement to mirror >> >>maker >> >> > is >> >> > > > > > adding a >> >> > > > > > > > > >>>message >> >> > > > > > > > > >>> > >> >> >>handler to >> >> > > > > > > > > >>> > >> >> >> >> do things like reformatting. I >>think >> >>we >> >> > might >> >> > > > > > > > potentially >> >> > > > > > > > > >>> want >> >> > > > > > > > > >>> > >>to >> >> > > > > > > > > >>> > >> >> >>have >> >> > > > > > > > > >>> > >> >> >> >> more threads processing the >>messages >> >>than >> >> > the >> >> > > > > > number of >> >> > > > > > > > > >>> > >>consumers. >> >> > > > > > > > > >>> > >> >> >>If we >> >> > > > > > > > > >>> > >> >> >> >> follow the simple mirror maker >> >>solution, >> >> we >> >> > > > lose >> >> > > > > > this >> >> > > > > > > > > >>> > >>flexibility. >> >> > > > > > > > > >>> > >> >> >> >> 2. 
This might not matter too much, >>but >> >> > creating >> >> > > > > > more >> >> > > > > > > > > >>> consumers >> >> > > > > > > > > >>> > >> >>means >> >> > > > > > > > > >>> > >> >> >> >>more >> >> > > > > > > > > >>> > >> >> >> >> footprint of TCP connection / >>memory. >> >> > > > > > > > > >>> > >> >> >> >> >> >> > > > > > > > > >>> > >> >> >> >> Any thoughts on this? >> >> > > > > > > > > >>> > >> >> >> >> >> >> > > > > > > > > >>> > >> >> >> >> Thanks. >> >> > > > > > > > > >>> > >> >> >> >> >> >> > > > > > > > > >>> > >> >> >> >> Jiangjie (Becket) Qin >> >> > > > > > > > > >>> > >> >> >> >> >> >> > > > > > > > > >>> > >> >> >> >> On 1/26/15, 10:35 AM, "Jiangjie >>Qin" < >> >> > > > > > > > j...@linkedin.com> >> >> > > > > > > > > >>> > wrote: >> >> > > > > > > > > >>> > >> >> >> >> >> >> > > > > > > > > >>> > >> >> >> >> >Hi Jay and Neha, >> >> > > > > > > > > >>> > >> >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >> >Thanks a lot for the reply and >> >> > explanation. I >> >> > > > do >> >> > > > > > agree >> >> > > > > > > > > >>>it >> >> > > > > > > > > >>> > >>makes >> >> > > > > > > > > >>> > >> >>more >> >> > > > > > > > > >>> > >> >> >> >>sense >> >> > > > > > > > > >>> > >> >> >> >> >to avoid duplicate effort and plan >> >>based >> >> > on >> >> > > > new >> >> > > > > > > > > >>>consumer. >> >> > > > > > > > > >>> I’ll >> >> > > > > > > > > >>> > >> >> >>modify >> >> > > > > > > > > >>> > >> >> >> >>the >> >> > > > > > > > > >>> > >> >> >> >> >KIP. >> >> > > > > > > > > >>> > >> >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >> >To Jay’s question on message >> >>ordering - >> >> > The >> >> > > > data >> >> > > > > > > > channel >> >> > > > > > > > > >>> > >> >>selection >> >> > > > > > > > > >>> > >> >> >> >>makes >> >> > > > > > > > > >>> > >> >> >> >> >sure that the messages from the >>same >> >> > source >> >> > > > > > partition >> >> > > > > > > > > >>>will >> >> > > > > > > > > >>> > >>sent >> >> > > > > > > > > >>> > >> >>by >> >> > > > > > > > > >>> > >> >> >>the >> >> > > > > > > > > >>> > >> >> >> >> >same producer. So the order of the >> >> > messages is >> >> > > > > > > > > >>>guaranteed >> >> > > > > > > > > >>> with >> >> > > > > > > > > >>> > >> >> >>proper >> >> > > > > > > > > >>> > >> >> >> >> >producer settings >> >> > > > > > > > > >>> > >> >> >>>>(MaxInFlightRequests=1,retries=Integer.MaxValue, >> >> > > > > > > > > >>> > >> >> >> >>etc.) >> >> > > > > > > > > >>> > >> >> >> >> >For keyed messages, because they >>come >> >> > from the >> >> > > > > > same >> >> > > > > > > > > >>>source >> >> > > > > > > > > >>> > >> >>partition >> >> > > > > > > > > >>> > >> >> >> >>and >> >> > > > > > > > > >>> > >> >> >> >> >will end up in the same target >> >> partition, >> >> > as >> >> > > > long >> >> > > > > > as >> >> > > > > > > > > >>>they >> >> > > > > > > > > >>> are >> >> > > > > > > > > >>> > >> >>sent >> >> > > > > > > > > >>> > >> >> >>by >> >> > > > > > > > > >>> > >> >> >> >>the >> >> > > > > > > > > >>> > >> >> >> >> >same producer, the order is >> >>guaranteed. >> >> > > > > > > > > >>> > >> >> >> >> >For non-keyed messages, the >>messages >> >> > coming >> >> > > > from >> >> > > > > > the >> >> > > > > > > > > >>>same >> >> > > > > > > > > >>> > >>source >> >> > > > > > > > > >>> > >> >> >> >>partition >> >> > > > > > > > > >>> > >> >> >> >> >might go to different target >> >>partitions. 
>> >> > The >> >> > > > > > order is >> >> > > > > > > > > >>>only >> >> > > > > > > > > >>> > >> >> >>guaranteed >> >> > > > > > > > > >>> > >> >> >> >> >within each partition. >> >> > > > > > > > > >>> > >> >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >> >Anyway, I’ll modify the KIP and >>data >> >> > channel >> >> > > > will >> >> > > > > > be >> >> > > > > > > > > >>>away. >> >> > > > > > > > > >>> > >> >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >> >Thanks. >> >> > > > > > > > > >>> > >> >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >> >Jiangjie (Becket) Qin >> >> > > > > > > > > >>> > >> >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >> >On 1/25/15, 4:34 PM, "Neha >>Narkhede" >> >>< >> >> > > > > > > > n...@confluent.io> >> >> > > > > > > > > >>> > >>wrote: >> >> > > > > > > > > >>> > >> >> >> >> > >> >> > > > > > > > > >>> > >> >> >> >> >>I think there is some value in >> >> > investigating >> >> > > > if >> >> > > > > > we >> >> > > > > > > > can >> >> > > > > > > > > >>>go >> >> > > > > > > > > >>> > >>back >> >> > > > > > > > > >>> > >> >>to >> >> > > > > > > > > >>> > >> >> >>the >> >> > > > > > > > > >>> > >> >> >> >> >>simple mirror maker design, as >>Jay >> >> points >> >> > > > out. >> >> > > > > > Here >> >> > > > > > > > you >> >> > > > > > > > > >>> have >> >> > > > > > > > > >>> > >>N >> >> > > > > > > > > >>> > >> >> >> >>threads, >> >> > > > > > > > > >>> > >> >> >> >> >>each has a consumer and a >>producer. >> >> > > > > > > > > >>> > >> >> >> >> >> >> >> > > > > > > > > >>> > >> >> >> >> >>The reason why we had to move >>away >> >>from >> >> > that >> >> > > > was >> >> > > > > > a >> >> > > > > > > > > >>> > >>combination >> >> > > > > > > > > >>> > >> >>of >> >> > > > > > > > > >>> > >> >> >>the >> >> > > > > > > > > >>> > >> >> >> >> >>difference in throughput between >>the >> >> > consumer >> >> > > > > > and the >> >> > > > > > > > > >>>old >> >> > > > > > > > > >>> > >> >>producer >> >> > > > > > > > > >>> > >> >> >>and >> >> > > > > > > > > >>> > >> >> >> >> >>the >> >> > > > > > > > > >>> > >> >> >> >> >>deficiency of the consumer >> >>rebalancing >> >> > that >> >> > > > > > limits >> >> > > > > > > > the >> >> > > > > > > > > >>> total >> >> > > > > > > > > >>> > >> >> >>number of >> >> > > > > > > > > >>> > >> >> >> >> >>mirror maker threads. So the only >> >> option >> >> > > > > > available >> >> > > > > > > > was >> >> > > > > > > > > >>>to >> >> > > > > > > > > >>> > >> >>increase >> >> > > > > > > > > >>> > >> >> >>the >> >> > > > > > > > > >>> > >> >> >> >> >>throughput of the limited # of >> >>mirror >> >> > maker >> >> > > > > > threads >> >> > > > > > > > > >>>that >> >> > > > > > > > > >>> > >>could >> >> > > > > > > > > >>> > >> >>be >> >> > > > > > > > > >>> > >> >> >> >> >>deployed. 
>> >> > > > > > > > > >>> > >> >> >> >> >>Now that queuing design may not >>make >> >> > sense, >> >> > > > if >> >> > > > > > the >> >> > > > > > > > new >> >> > > > > > > > > >>> > >> >>producer's >> >> > > > > > > > > >>> > >> >> >> >> >>throughput is almost similar to >>the >> >> > consumer >> >> > > > AND >> >> > > > > > the >> >> > > > > > > > > >>>fact >> >> > > > > > > > > >>> > >>that >> >> > > > > > > > > >>> > >> >>the >> >> > > > > > > > > >>> > >> >> >>new >> >> > > > > > > > > >>> > >> >> >> >> >>round-robin based consumer >> >>rebalancing >> >> > can >> >> > > > allow >> >> > > > > > a >> >> > > > > > > > very >> >> > > > > > > > > >>> high >> >> > > > > > > > > >>> > >> >> >>number of >> >> > > > > > > > > >>> > >> >> >> >> >>mirror maker instances to exist. >> >> > > > > > > > > >>> > >> >> >> >> >> >> >> > > > > > > > > >>> > >> >> >> >> >>This is the end state that the >> >>mirror >> >> > maker >> >> > > > > > should be >> >> > > > > > > > > >>>in >> >> > > > > > > > > >>> once >> >> > > > > > > > > >>> > >> >>the >> >> > > > > > > > > >>> > >> >> >>new >> >> > > > > > > > > >>> > >> >> >> >> >>consumer is complete, so it >>wouldn't >> >> > hurt to >> >> > > > see >> >> > > > > > if >> >> > > > > > > > we >> >> > > > > > > > > >>>can >> >> > > > > > > > > >>> > >>just >> >> > > > > > > > > >>> > >> >> >>move >> >> > > > > > > > > >>> > >> >> >> >>to >> >> > > > > > > > > >>> > >> >> >> >> >>that right now. >> >> > > > > > > > > >>> > >> >> >> >> >> >> >> > > > > > > > > >>> > >> >> >> >> >>On Fri, Jan 23, 2015 at 8:40 PM, >>Jay >> >> > Kreps >> >> > > > > > > > > >>> > >><jay.kr...@gmail.com >> >> > > > > > > > > >>> > >> > >> >> > > > > > > > > >>> > >> >> >> >>wrote: >> >> > > > > > > > > >>> > >> >> >> >> >> >> >> > > > > > > > > >>> > >> >> >> >> >>> QQ: If we ever use a different >> >> > technique >> >> > > > for >> >> > > > > > the >> >> > > > > > > > data >> >> > > > > > > > > >>> > >>channel >> >> > > > > > > > > >>> > >> >> >> >>selection >> >> > > > > > > > > >>> > >> >> >> >> >>> than for the producer >>partitioning >> >> > won't >> >> > > > that >> >> > > > > > break >> >> > > > > > > > > >>> > >>ordering? >> >> > > > > > > > > >>> > >> >>How >> >> > > > > > > > > >>> > >> >> >> >>can >> >> > > > > > > > > >>> > >> >> >> >> >>>we >> >> > > > > > > > > >>> > >> >> >> >> >>> ensure these things stay in >>sync? >> >> > > > > > > > > >>> > >> >> >> >> >>> >> >> > > > > > > > > >>> > >> >> >> >> >>> With respect to the new >> >>consumer--I >> >> > really >> >> > > > do >> >> > > > > > want >> >> > > > > > > > to >> >> > > > > > > > > >>> > >> >>encourage >> >> > > > > > > > > >>> > >> >> >> >>people >> >> > > > > > > > > >>> > >> >> >> >> >>>to >> >> > > > > > > > > >>> > >> >> >> >> >>> think through how MM will work >> >>with >> >> > the new >> >> > > > > > > > consumer. >> >> > > > > > > > > >>>I >> >> > > > > > > > > >>> > >>mean >> >> > > > > > > > > >>> > >> >>this >> >> > > > > > > > > >>> > >> >> >> >>isn't >> >> > > > > > > > > >>> > >> >> >> >> >>> very far off, maybe a few >>months >> >>if >> >> we >> >> > > > hustle? >> >> > > > > > I >> >> > > > > > > > > >>>could >> >> > > > > > > > > >>> > >> >>imagine us >> >> > > > > > > > > >>> > >> >> >> >> >>>getting >> >> > > > > > > > > >>> > >> >> >> >> >>> this mm fix done maybe sooner, >> >>maybe >> >> > in a >> >> > > > > > month? 
>> >> > > > > > > > So I >> >> > > > > > > > > >>> guess >> >> > > > > > > > > >>> > >> >>this >> >> > > > > > > > > >>> > >> >> >> >>buys >> >> > > > > > > > > >>> > >> >> >> >> >>>us an >> >> > > > > > > > > >>> > >> >> >> >> >>> extra month before we rip it >>out >> >>and >> >> > throw >> >> > > > it >> >> > > > > > away? >> >> > > > > > > > > >>>Maybe >> >> > > > > > > > > >>> > >>two? >> >> > > > > > > > > >>> > >> >> >>This >> >> > > > > > > > > >>> > >> >> >> >>bug >> >> > > > > > > > > >>> > >> >> >> >> >>>has >> >> > > > > > > > > >>> > >> >> >> >> >>> been there for a while, though, >> >> right? >> >> > Is >> >> > > > it >> >> > > > > > worth >> >> > > > > > > > > >>>it? >> >> > > > > > > > > >>> > >> >>Probably >> >> > > > > > > > > >>> > >> >> >>it >> >> > > > > > > > > >>> > >> >> >> >>is, >> >> > > > > > > > > >>> > >> >> >> >> >>>but >> >> > > > > > > > > >>> > >> >> >> >> >>> it still kind of sucks to have >>the >> >> > > > duplicate >> >> > > > > > > > effort. >> >> > > > > > > > > >>> > >> >> >> >> >>> >> >> > > > > > > > > >>> > >> >> >> >> >>> So anyhow let's definitely >>think >> >> about >> >> > how >> >> > > > > > things >> >> > > > > > > > > >>>will >> >> > > > > > > > > >>> work >> >> > > > > > > > > >>> > >> >>with >> >> > > > > > > > > >>> > >> >> >>the >> >> > > > > > > > > >>> > >> >> >> >> >>>new >> >> > > > > > > > > >>> > >> >> >> >> >>> consumer. I think we can >>probably >> >> just >> >> > > > have N >> >> > > > > > > > > >>>threads, >> >> > > > > > > > > >>> each >> >> > > > > > > > > >>> > >> >> >>thread >> >> > > > > > > > > >>> > >> >> >> >>has >> >> > > > > > > > > >>> > >> >> >> >> >>>a >> >> > > > > > > > > >>> > >> >> >> >> >>> producer and consumer and is >> >> internally >> >> > > > single >> >> > > > > > > > > >>>threaded. >> >> > > > > > > > > >>> > >>Any >> >> > > > > > > > > >>> > >> >> >>reason >> >> > > > > > > > > >>> > >> >> >> >> >>>this >> >> > > > > > > > > >>> > >> >> >> >> >>> wouldn't work? >> >> > > > > > > > > >>> > >> >> >> >> >>> >> >> > > > > > > > > >>> > >> >> >> >> >>> -Jay >> >> > > > > > > > > >>> > >> >> >> >> >>> >> >> > > > > > > > > >>> > >> >> >> >> >>> >> >> > > > > > > > > >>> > >> >> >> >> >>> On Wed, Jan 21, 2015 at 5:29 >>PM, >> >> > Jiangjie >> >> > > > Qin >> >> > > > > > > > > >>> > >> >> >> >> >>><j...@linkedin.com.invalid> >> >> > > > > > > > > >>> > >> >> >> >> >>> wrote: >> >> > > > > > > > > >>> > >> >> >> >> >>> >> >> > > > > > > > > >>> > >> >> >> >> >>> > Hi Jay, >> >> > > > > > > > > >>> > >> >> >> >> >>> > >> >> > > > > > > > > >>> > >> >> >> >> >>> > Thanks for comments. Please >>see >> >> > inline >> >> > > > > > responses. >> >> > > > > > > > > >>> > >> >> >> >> >>> > >> >> > > > > > > > > >>> > >> >> >> >> >>> > Jiangjie (Becket) Qin >> >> > > > > > > > > >>> > >> >> >> >> >>> > >> >> > > > > > > > > >>> > >> >> >> >> >>> > On 1/21/15, 1:33 PM, "Jay >>Kreps" >> >> > > > > > > > > >>><jay.kr...@gmail.com> >> >> > > > > > > > > >>> > >> >>wrote: >> >> > > > > > > > > >>> > >> >> >> >> >>> > >> >> > > > > > > > > >>> > >> >> >> >> >>> > >Hey guys, >> >> > > > > > > > > >>> > >> >> >> >> >>> > > >> >> > > > > > > > > >>> > >> >> >> >> >>> > >A couple questions/comments: >> >> > > > > > > > > >>> > >> >> >> >> >>> > > >> >> > > > > > > > > >>> > >> >> >> >> >>> > >1. 
On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hi Jay,

Thanks for the comments. Please see inline responses.

Jiangjie (Becket) Qin

On 1/21/15, 1:33 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

> Hey guys,
>
> A couple of questions/comments:
>
> 1. The callback and user-controlled offset commit functionality is already in the new consumer, which we are working on in parallel. If we accelerated that work it might help concentrate efforts. I admit this might take slightly longer in calendar time but could still probably get done this quarter. Have you guys considered that approach?

Yes, I totally agree that ideally we should put our effort into the new consumer. The main reason for still working on the old consumer is that we expect it will still be used at LinkedIn for quite a while before the new consumer can be fully rolled out, and we have recently been suffering a lot from the mirror maker data loss issue. So our current plan is to make the necessary changes to stabilize the current mirror maker in production; then we can test and roll out the new consumer gradually without getting burnt.
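(For reference, the user-controlled offset commit Jay mentions would look roughly like this against the new consumer; this is only a sketch assuming the commitSync/OffsetAndMetadata shape of the API, not a final interface.)

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}

    // Commit an explicit, caller-chosen offset per partition instead of
    // "whatever was last polled".
    def commitUpTo(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
                   safeOffsets: Map[TopicPartition, Long]): Unit = {
      val toCommit = safeOffsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }
      consumer.commitSync(toCommit.asJava)
    }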
> 2. I think partitioning on the hash of the topic partition is not a very good idea, because that will make the case of going from a cluster with fewer partitions to one with more partitions not work. I think an intuitive way to do this would be the following:
> a. Default behavior: just do what the producer does, i.e. if you specify a key, use it for partitioning; if not, just partition in a round-robin fashion.
> b. Add a --preserve-partition option that will explicitly inherit the partition from the source, irrespective of whether there is a key or which partition that key would hash to.

Sorry that I did not explain this clearly enough. The hash of the topic partition is only used to decide which mirror maker data channel queue the consumer thread should put the message into. It only tries to make sure that messages from the same source partition are sent by the same producer thread, to guarantee the sending order. It is not at all related to which partition in the target cluster the messages end up in; that is still decided by the producer.
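A sketch of that selection, just to make the distinction concrete; selectDataChannelQueue and numProducerThreads are illustrative names, not the actual mirror maker fields:

    // Pick an internal data-channel queue from the *source* (topic, partition) so
    // that one source partition always flows through the same producer thread.
    // This preserves per-partition send order; the target partition is still
    // chosen by the producer (key hash, round-robin, or --preserve-partition).
    def selectDataChannelQueue(sourceTopic: String, sourcePartition: Int, numProducerThreads: Int): Int = {
      val h = (sourceTopic, sourcePartition).hashCode
      java.lang.Math.floorMod(h, numProducerThreads)  // non-negative index even for negative hashes
    }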
> 3. You don't actually give the ConsumerRebalanceListener interface. What is that going to look like?

Good point! I should have put it in the wiki. I just added it.
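For readers following along, the idea is a hook that runs before the consumer gives up its partitions, so the mirror maker can flush in-flight sends and commit clean offsets. The exact interface is on the KIP page; the trait and method names below are illustrative only.

    import org.apache.kafka.clients.producer.KafkaProducer

    // Illustrative only -- the real interface is defined on the KIP wiki page.
    trait MirrorMakerRebalanceListener {
      // Invoked before the consumer releases its current partition ownership.
      def beforeReleasingPartitions(): Unit
    }

    class FlushingRebalanceListener(producer: KafkaProducer[Array[Byte], Array[Byte]],
                                    commitOffsets: () => Unit) extends MirrorMakerRebalanceListener {
      override def beforeReleasingPartitions(): Unit = {
        producer.flush()  // drain everything already handed to the producer
        commitOffsets()   // then persist the safe offsets, limiting duplicates after the rebalance
      }
    }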
> 4. What is MirrorMakerRecord? I think ideally the MirrorMakerMessageHandler interface would take a ConsumerRecord as input and return a ProducerRecord, right? That would allow you to transform the key, value, partition, or destination topic...

MirrorMakerRecord is introduced in KAFKA-1650 and is exactly the same as ConsumerRecord in KAFKA-1760:

    private[kafka] class MirrorMakerRecord(val sourceTopic: String,
                                           val sourcePartition: Int,
                                           val sourceOffset: Long,
                                           val key: Array[Byte],
                                           val value: Array[Byte]) {
      def size = value.length + {if (key == null) 0 else key.length}
    }

However, because the source partition and offset are needed in the producer thread for consumer offset bookkeeping, the record returned by MirrorMakerMessageHandler needs to carry that information. Therefore ProducerRecord does not work here. We could probably let the message handler take ConsumerRecord for both input and output.
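To illustrate the handler idea using the MirrorMakerRecord shown above: the handler name comes from the thread, but the signature and the example filter below are assumptions, not the KIP's definition.

    // Sketch: the handler keeps the source partition/offset (via MirrorMakerRecord)
    // so the producer thread can still do offset bookkeeping, and may rewrite the
    // key, value, or topic, or drop the message entirely.
    trait MirrorMakerMessageHandler {
      def handle(record: MirrorMakerRecord): Seq[MirrorMakerRecord]
    }

    // Example use case (KAFKA-1840, message filtering): drop records whose key
    // starts with a given byte prefix.
    class PrefixFilterHandler(prefix: Array[Byte]) extends MirrorMakerMessageHandler {
      override def handle(record: MirrorMakerRecord): Seq[MirrorMakerRecord] =
        if (record.key != null && record.key.startsWith(prefix)) Seq.empty
        else Seq(record)
    }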
> 5. Have you guys thought about what the implementation will look like in terms of threading architecture etc. with the new consumer? That will be soon, so even if we aren't starting with that, let's make sure we can get rid of a lot of the current mirror maker's accidental complexity in terms of threads and queues when we move to that.

I haven't thought about it thoroughly. The quick idea is that after migration to the new consumer it is probably better to use a single consumer thread. If multiple threads are needed, decoupling consumption and processing might be used. MirrorMaker definitely needs to be changed after the new consumer gets checked in. I'll document the changes and can submit follow-up patches after the new consumer is available.
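If consumption and processing do end up decoupled, the usual shape is a single consumer thread feeding a bounded queue that several processing/producing threads drain. A rough sketch under that assumption (queue size and names are illustrative):

    import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
    import org.apache.kafka.clients.consumer.ConsumerRecord

    // One consumer thread enqueues; N worker threads dequeue, run the message
    // handler, and hand the results to the producer.
    val dataChannel: BlockingQueue[ConsumerRecord[Array[Byte], Array[Byte]]] =
      new ArrayBlockingQueue[ConsumerRecord[Array[Byte], Array[Byte]]](10000)

    def consumerLoopStep(records: Iterable[ConsumerRecord[Array[Byte], Array[Byte]]]): Unit =
      records.foreach(r => dataChannel.put(r))  // blocks when the queue is full (back-pressure)

    def workerLoopStep(): ConsumerRecord[Array[Byte], Array[Byte]] =
      dataChannel.take()                        // blocks until a record is available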
> -Jay

On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hi Kafka Devs,

We are working on a Kafka Mirror Maker enhancement. A KIP is posted to document and discuss the following:
1. KAFKA-1650: No data loss mirror maker change
2. KAFKA-1839: To allow partition aware mirroring
3. KAFKA-1840: To allow message filtering/format conversion
Feedback is welcome. Please let us know if you have any questions or concerns.

Thanks,

Jiangjie (Becket) Qin

--
Thanks,
Neha