I forgot to mention that we have tested the entire design in our test environment. The performance looks fine.
Jiangjie (Becket) Qin

On 2/24/15, 2:53 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:

>I updated the KIP page based on the discussion we had.
>
>Should I launch another vote or can we consider this mail thread to have already included a vote?
>
>Jiangjie (Becket) Qin
>
>On 2/11/15, 5:15 PM, "Neha Narkhede" <n...@confluent.io> wrote:
>
>>Thanks for the explanation, Joel! Would love to see the results of the throughput experiment and I'm a +1 on everything else, including the rebalance callback and record handler.
>>
>>-Neha
>>
>>On Wed, Feb 11, 2015 at 1:13 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>>> Cool, I agree with all that.
>>>
>>> I agree about the need for a rebalancing callback.
>>>
>>> Totally agree about record handler.
>>>
>>> It would be great to see if a prototype of this is workable.
>>>
>>> Thanks guys!
>>>
>>> -Jay
>>>
>>> On Wed, Feb 11, 2015 at 12:36 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>>>
>>> > Hey Jay,
>>> >
>>> > Guozhang, Becket and I got together to discuss this and we think:
>>> >
>>> > - It seems that your proposal based on the new consumer and flush call should work.
>>> > - We would likely need to call the poll with a timeout that matches the offset commit interval in order to deal with low volume mirroring pipelines.
>>> > - We will still need a rebalance callback to reduce duplicates - the rebalance callback would need to flush and commit offsets.
>>> > - The only remaining question is if the overall throughput is sufficient. I think someone at LinkedIn (I don't remember who) did some experiments with data channel size == 1 and ran into issues. That was not thoroughly investigated though.
>>> > - The addition of flush may actually make this solution viable for the current mirror-maker (with the old consumer). We can prototype that offline and if it works out well we can redo KAFKA-1650 (i.e., refactor the current mirror maker). The flush call and the new consumer didn't exist at the time we did KAFKA-1650 so this did not occur to us.
>>> > - We think the RecordHandler is still a useful small addition for the use-cases mentioned earlier in this thread.
>>> >
>>> > Thanks,
>>> >
>>> > Joel
>>> >
>>> > On Wed, Feb 11, 2015 at 09:05:39AM -0800, Jay Kreps wrote:
>>> > > Guozhang, I agree with 1-3, I do think what I was proposing was simpler but perhaps there are gaps in that?
>>> > >
>>> > > Hey Joel--Here was a sketch of what I was proposing. I do think this gets rid of manual offset tracking, especially doing so across threads with dedicated commit threads, which I think is pretty complex.
>>> > >
>>> > > while(true) {
>>> > >   val recs = consumer.poll(Long.MaxValue);
>>> > >   for (rec <- recs)
>>> > >     producer.send(rec, logErrorCallback)
>>> > >   if(System.currentTimeMillis - lastCommit > commitInterval) {
>>> > >     producer.flush()
>>> > >     consumer.commit()
>>> > >     lastCommit = System.currentTimeMillis
>>> > >   }
>>> > > }
>>> > >
>>> > > (See the previous email for details). I think the question is: is there any reason--performance, correctness, etc--that this won't work? Basically I think you guys have thought about this more so I may be missing something. If so let's flag it while we still have leeway on the consumer.
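For concreteness, here is a minimal sketch of the flush-and-commit rebalance callback Joel describes above, written against the new consumer's ConsumerRebalanceListener shape. This is illustrative only: the class name is made up, and the exact listener API was still being finalized at the time of this thread.

    import java.util
    import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.common.TopicPartition

    // Sketch only: flush all in-flight sends and then commit before giving up
    // partitions, so already-mirrored messages are not replayed after a rebalance.
    class FlushCommitListener(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
                              producer: KafkaProducer[Array[Byte], Array[Byte]])
      extends ConsumerRebalanceListener {

      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
        producer.flush()      // block until every outstanding send is acked
        consumer.commitSync() // then commit, so nothing acked is re-consumed
      }

      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = ()
    }

With a listener like this registered at subscribe time, the flush-then-commit pair in Jay's loop would also run on every rebalance, not just on the commit interval.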
>>> > >
>>> > > If we think that will work, well I do think it is conceptually a lot simpler than the current code, though I suppose one could disagree on that.
>>> > >
>>> > > -Jay
>>> > >
>>> > > On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
>>> > >
>>> > > > Hi Jay,
>>> > > >
>>> > > > > The data channels are actually a big part of the complexity of the zero data loss design, though, right? Because then you need some reverse channel to flow the acks back to the consumer based on where you are versus just acking what you have read and written (as in the code snippet I put up).
>>> > > >
>>> > > > I'm not sure if we are on the same page. Even if the data channel was not there the current handling for zero data loss would remain very similar - you would need to maintain lists of unacked source offsets. I'm wondering if the KIP needs more detail on how it is currently implemented; or are you suggesting a different approach (in which case I have not fully understood it). I'm not sure what you mean by flowing acks back to the consumer - the MM commits offsets after the producer ack has been received. There is some additional complexity introduced in reducing duplicates on a rebalance - this is actually optional (since duplicates are currently a given). The reason that was done anyway is that with the auto-commit turned off duplicates are almost guaranteed on a rebalance.
>>> > > >
>>> > > > > I think the point that Neha and I were trying to make was that the motivation to embed stuff into MM kind of is related to how complex a simple "consume and produce" with good throughput will be. If it is simple to write such a thing in a few lines, the pain of embedding a bunch of stuff won't be worth it; if it has to be as complex as the current mm, then of course we will need all kinds of plug-ins because no one will be able to write such a thing. I don't have a huge concern with a simple plug-in but I think if it turns into something more complex with filtering and aggregation or whatever we really need to stop and think a bit about the design.
>>> > > >
>>> > > > I agree - I don't think there is a use-case for any complex plug-in. It is pretty much what Becket has described currently for the message handler - i.e., take an incoming record and return a list of outgoing records (which could be empty if you filter).
>>> > > >
>>> > > > So here is my take on the MM:
>>> > > > - Bare bones: simple consumer - producer pairs (0.7 style). This is ideal, but does not handle the no-data-loss requirement
>>> > > > - Above plus support for no data loss. This actually adds quite a bit of complexity.
>>> > > > - Above plus the message handler. This is a trivial addition I think that makes the MM usable in a few other mirroring-like applications.
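As a rough illustration of the handler shape Joel describes (one incoming record, a list of outgoing records, with an empty list meaning the record was filtered), a hypothetical trait plus a filtering implementation might look like the sketch below. All names here are made up for illustration; this is not the KIP's actual interface.

    import java.util
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.clients.producer.ProducerRecord

    // Hypothetical handler shape: one record in, a list of records out.
    trait RecordHandler {
      def handle(record: ConsumerRecord[Array[Byte], Array[Byte]])
          : util.List[ProducerRecord[Array[Byte], Array[Byte]]]
    }

    // Example: drop records from topics that are private to the source cluster.
    class PrivateTopicFilter(privateTopics: Set[String]) extends RecordHandler {
      override def handle(record: ConsumerRecord[Array[Byte], Array[Byte]])
          : util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
        if (privateTopics.contains(record.topic))
          util.Collections.emptyList()    // filtered: nothing to mirror
        else
          util.Collections.singletonList(
            new ProducerRecord[Array[Byte], Array[Byte]](record.topic, record.key, record.value))
      }
    }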
>>> > > >
>>> > > > Joel
>>> > > >
>>> > > > > On Tue, Feb 10, 2015 at 12:31 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>>> > > > >
>>> > > > > > On Tue, Feb 10, 2015 at 12:13:46PM -0800, Neha Narkhede wrote:
>>> > > > > > > I think all of us agree that we want to design MirrorMaker for 0 data loss. With the absence of the data channel, 0 data loss will be much simpler to implement.
>>> > > > > >
>>> > > > > > The data channel is irrelevant to the implementation of zero data loss. The complexity in the implementation of no data loss that you are seeing in mirror-maker affects all consume-then-produce patterns whether or not there is a data channel. You still need to maintain a list of unacked offsets. What I meant earlier is that we can brainstorm completely different approaches to supporting no data loss, but the current implementation is the only solution we are aware of.
>>> > > > > >
>>> > > > > > >
>>> > > > > > > My arguments for adding a message handler are that:
>>> > > > > > > > 1. It is more efficient to do something in common for all the clients in the pipeline than letting each client do the same thing many times. And there are concrete use cases for the message handler already.
>>> > > > > > >
>>> > > > > > > What are the concrete use cases?
>>> > > > > >
>>> > > > > > I think Becket already described a couple of use cases earlier in the thread.
>>> > > > > >
>>> > > > > > <quote>
>>> > > > > >
>>> > > > > > 1. Format conversion. We have a use case where clients of the source cluster use an internal schema and clients of the target cluster use a different public schema.
>>> > > > > > 2. Message filtering: For the messages published to the source cluster, there are some messages private to source cluster clients that should not be exposed to target cluster clients. It would be difficult to publish those messages into different partitions because they need to be ordered.
>>> > > > > > I agree that we can always filter/convert messages after they are copied to the target cluster, but that costs network bandwidth unnecessarily, especially if that is a cross colo mirror. With the handler, we can co-locate the mirror maker with the source cluster and save that cost. Also, imagine there are many downstream consumers consuming from the target cluster; filtering/reformatting the messages before the messages reach the target cluster is much more efficient than having each of the consumers do this individually on their own.
>>> > > > > >
>>> > > > > > </quote>
>>> > > > > >
>>> > > > > > >
>>> > > > > > > Also the KIP still refers to the data channel in a few places (Motivation and "On consumer rebalance" sections). Can you update the wiki so it is easier to review the new design, especially the data loss part?
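To make that bookkeeping concrete, here is a minimal sketch (not from the KIP) of the per-partition unacked-offset tracking Joel refers to: offsets are added when a message is handed to the producer, removed in the producer callback on ack, and the smallest still-unacked offset bounds what is safe to commit.

    import java.util.concurrent.ConcurrentSkipListSet

    // One instance per source partition. onSend is called before producing,
    // onAck from the producer callback; the commit path asks what is safe.
    class UnackedOffsets {
      private val unacked = new ConcurrentSkipListSet[java.lang.Long]()

      def onSend(offset: Long): Unit = unacked.add(offset)
      def onAck(offset: Long): Unit = unacked.remove(offset)

      // Everything below the smallest unacked offset has been produced and
      // acked, so that is the committable position.
      def safeCommitOffset(lastConsumed: Long): Long =
        if (unacked.isEmpty) lastConsumed + 1 else unacked.first()
    }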
>>> > > > > > >
>>> > > > > > > On Tue, Feb 10, 2015 at 10:36 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
>>> > > > > > >
>>> > > > > > > > I think the message handler adds little to no complexity to the mirror maker. Jay/Neha, the MM became scary due to the rearchitecture we did for 0.8 due to performance issues compared with 0.7 - we should remove the data channel if it can match the current throughput. I agree it is worth prototyping and testing that so the MM architecture is simplified.
>>> > > > > > > >
>>> > > > > > > > The MM became a little scarier in KAFKA-1650 in order to support no data loss. I think the implementation for no data loss will remain about the same even in the new model (even without the data channel) - we can probably brainstorm more if there is a better/simpler way to do it (maybe there is in the absence of the data channel) but at the time it was the best we (i.e., Becket, myself, Jun and Guozhang who participated on the review) could come up with.
>>> > > > > > > >
>>> > > > > > > > So I'm definitely +1 on whatever it takes to support no data loss. I think most people would want that out of the box.
>>> > > > > > > >
>>> > > > > > > > As for the message handler, as Becket wrote and I agree with, it is really a trivial addition that would benefit (perhaps not most, but at least some). So I'm personally +1 on that as well. That said, I'm also okay with it not being there. I think the MM is fairly stand-alone and simple enough that it is entirely reasonable and absolutely feasible for companies to fork/re-implement the mirror maker for their own needs.
>>> > > > > > > >
>>> > > > > > > > So in summary, I'm +1 on the KIP.
>>> > > > > > > >
>>> > > > > > > > Thanks,
>>> > > > > > > >
>>> > > > > > > > Joel
>>> > > > > > > >
>>> > > > > > > > On Mon, Feb 09, 2015 at 09:19:57PM +0000, Jiangjie Qin wrote:
>>> > > > > > > > > I just updated the KIP page and incorporated Jay and Neha’s suggestions. As a brief summary of where we are:
>>> > > > > > > > >
>>> > > > > > > > > Consensus reached:
>>> > > > > > > > > Have N independent mirror maker threads, each of which has its own consumer but shares a producer. The mirror maker threads will be responsible for decompression, compression and offset commit. No data channel or separate offset commit thread is needed. Consumer rebalance callback will be used to avoid duplicates on rebalance.
>>> > > > > > > > >
>>> > > > > > > > > Still under discussion:
>>> > > > > > > > > Whether message handler is needed.
>>> > > > > > > > >
>>> > > > > > > > > My arguments for adding a message handler are that:
>>> > > > > > > > > 1. It is more efficient to do something in common for all the clients in the pipeline than letting each client do the same thing many times. And there are concrete use cases for the message handler already.
>>> > > > > > > > > 2. It is not a big complicated add-on to mirror maker.
>>> > > > > > > > > 3. Without a message handler, customers who need it have to re-implement all the logic of mirror maker by themselves just in order to add this handling to the pipeline.
>>> > > > > > > > >
>>> > > > > > > > > Any thoughts?
>>> > > > > > > > >
>>> > > > > > > > > Thanks.
>>> > > > > > > > >
>>> > > > > > > > > ―Jiangjie (Becket) Qin
>>> > > > > > > > >
>>> > > > > > > > > On 2/8/15, 6:35 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:
>>> > > > > > > > >
>>> > > > > > > > > >Hi Jay, thanks a lot for the comments.
>>> > > > > > > > > >I think this solution is better. We probably don’t need the data channel anymore. It can be replaced with a list of producers if we need more sender threads.
>>> > > > > > > > > >I’ll update the KIP page.
>>> > > > > > > > > >
>>> > > > > > > > > >The reasoning about the message handler is mainly for efficiency purposes. I’m thinking that if something can be done in the pipeline for all the clients such as filtering/reformatting, it is probably better to do it in the pipeline than asking 100 clients to do the same thing 100 times.
>>> > > > > > > > > >
>>> > > > > > > > > >―Jiangjie (Becket) Qin
>>> > > > > > > > > >
>>> > > > > > > > > >
>>> > > > > > > > > >On 2/8/15, 4:59 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>>> > > > > > > > > >
>>> > > > > > > > > >>Yeah, I second Neha's comments. The current mm code has taken something pretty simple and made it pretty scary with callbacks and wait/notify stuff. Do we believe this works? I can't tell by looking at it, which is kind of bad for something important like this. I don't mean this as criticism, I know the history: we added in memory queues to help with other performance problems without thinking about correctness, then we added stuff to keep the in-memory queues from losing data, and so on.
>>> > > > > > > > > >>
>>> > > > > > > > > >>Can we instead do the opposite exercise and start with the basics of what mm should do and think about what deficiencies prevent this approach from working? Then let's make sure the currently in-flight work will remove these deficiencies.
>>> > > > > > > > > >>After all mm is kind of the prototypical kafka use case so if we can't make our clients do this, probably no one else can.
>>> > > > > > > > > >>
>>> > > > > > > > > >>I think mm should just be N independent threads each of which has its own consumer but shares a producer, and each of which looks like this:
>>> > > > > > > > > >>
>>> > > > > > > > > >>while(true) {
>>> > > > > > > > > >>  val recs = consumer.poll(Long.MaxValue);
>>> > > > > > > > > >>  for (rec <- recs)
>>> > > > > > > > > >>    producer.send(rec, logErrorCallback)
>>> > > > > > > > > >>  if(System.currentTimeMillis - lastCommit > commitInterval) {
>>> > > > > > > > > >>    producer.flush()
>>> > > > > > > > > >>    consumer.commit()
>>> > > > > > > > > >>    lastCommit = System.currentTimeMillis
>>> > > > > > > > > >>  }
>>> > > > > > > > > >>}
>>> > > > > > > > > >>
>>> > > > > > > > > >>This will depend on setting the retry count in the producer to something high with a largish backoff so that a failed send attempt doesn't drop data.
>>> > > > > > > > > >>
>>> > > > > > > > > >>We will need to use the callback to force a flush and offset commit on rebalance.
>>> > > > > > > > > >>
>>> > > > > > > > > >>This approach may have a few more TCP connections due to using multiple consumers but I think it is a lot easier to reason about and the total number of mm instances is always going to be small.
>>> > > > > > > > > >>
>>> > > > > > > > > >>Let's talk about where this simple approach falls short; I think that will help us understand your motivations for additional elements.
>>> > > > > > > > > >>
>>> > > > > > > > > >>Another advantage of this is that it is so simple I don't think we really even need to bother making mm extensible because writing your own code that does custom processing or transformation is just ten lines and no plug-in system is going to make it simpler.
>>> > > > > > > > > >>
>>> > > > > > > > > >>-Jay
>>> > > > > > > > > >>
>>> > > > > > > > > >>
>>> > > > > > > > > >>On Sun, Feb 8, 2015 at 2:40 PM, Neha Narkhede <n...@confluent.io> wrote:
>>> > > > > > > > > >>
>>> > > > > > > > > >>> Few comments -
>>> > > > > > > > > >>>
>>> > > > > > > > > >>> 1. Why do we need the message handler? Do you have concrete use cases in mind? If not, we should consider adding it in the future when/if we do have use cases for it. The purpose of the mirror maker is to be a simple tool for setting up Kafka cluster replicas. I don't see why we need to include a message handler for doing stream transformations or filtering.
>>> > > > > > > > > >>> You can always write a simple process for doing that once the data is copied as is in the target cluster.
>>> > > > > > > > > >>> 2. Why keep both designs? We should prefer the simpler design unless it is not feasible due to the performance issue that we previously had. Did you get a chance to run some tests to see if that is really still a problem or not? It will be easier to think about the design and also make the KIP complete if we make a call on the design first.
>>> > > > > > > > > >>> 3. Can you explain the need for keeping a list of unacked offsets per partition? Consider adding a section on retries and how you plan to handle the case when the producer runs out of all retries.
>>> > > > > > > > > >>>
>>> > > > > > > > > >>> Thanks,
>>> > > > > > > > > >>> Neha
>>> > > > > > > > > >>>
>>> > > > > > > > > >>> On Sun, Feb 8, 2015 at 2:06 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>> > > > > > > > > >>>
>>> > > > > > > > > >>> > Hi Neha,
>>> > > > > > > > > >>> >
>>> > > > > > > > > >>> > Yes, I’ve updated the KIP so the entire KIP is based on the new consumer now. I’ve put both designs with and without the data channel in the KIP as I still feel we might need the data channel to provide more flexibility, especially after the message handler is introduced. I’ve put my thinking on the pros and cons of the two designs in the KIP as well. It’ll be great if you can give a review and comment.
>>> > > > > > > > > >>> >
>>> > > > > > > > > >>> > Thanks.
>>> > > > > > > > > >>> >
>>> > > > > > > > > >>> > Jiangjie (Becket) Qin
>>> > > > > > > > > >>> >
>>> > > > > > > > > >>> > On 2/6/15, 7:30 PM, "Neha Narkhede" <n...@confluent.io> wrote:
>>> > > > > > > > > >>> >
>>> > > > > > > > > >>> > >Hey Becket,
>>> > > > > > > > > >>> > >
>>> > > > > > > > > >>> > >What are the next steps on this KIP? As per your comment earlier on the thread -
>>> > > > > > > > > >>> > >
>>> > > > > > > > > >>> > >I do agree it makes more sense
>>> > > > > > > > > >>> > >> to avoid duplicate effort and plan based on new consumer. I’ll modify the KIP.
>>> > > > > > > > > >>> > >
>>> > > > > > > > > >>> > >Did you get a chance to think about the simplified design that we proposed earlier?
>>> > > > > > > > > >>> > >Do you plan to update the KIP with that proposal?
>>> > > > > > > > > >>> > >
>>> > > > > > > > > >>> > >Thanks,
>>> > > > > > > > > >>> > >Neha
>>> > > > > > > > > >>> > >
>>> > > > > > > > > >>> > >On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>> > > > > > > > > >>> > >
>>> > > > > > > > > >>> > >> In mirror maker we do not do de-serialization on the messages. Mirror maker uses the source TopicPartition hash to choose a producer to send messages from the same source partition. The partition those messages end up in is decided by the Partitioner class in KafkaProducer (assuming you are using the new producer), which uses the hash code of the bytes[].
>>> > > > > > > > > >>> > >>
>>> > > > > > > > > >>> > >> If deserialization is needed, it has to be done in the message handler.
>>> > > > > > > > > >>> > >>
>>> > > > > > > > > >>> > >> Thanks.
>>> > > > > > > > > >>> > >>
>>> > > > > > > > > >>> > >> Jiangjie (Becket) Qin
>>> > > > > > > > > >>> > >>
>>> > > > > > > > > >>> > >> On 2/4/15, 11:33 AM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
>>> > > > > > > > > >>> > >>
>>> > > > > > > > > >>> > >> >Hi Jiangjie,
>>> > > > > > > > > >>> > >> >
>>> > > > > > > > > >>> > >> >Thanks for entertaining my questions so far. The last question I have is about serialization of the message key. If the key de-serialization (Class) is not present at the MM instance, then does it use the raw byte hashcode to determine the partition? How are you going to address the situation where the key needs to be de-serialized and the actual hashcode computed?
>>> > > > > > > > > >>> > >> >
>>> > > > > > > > > >>> > >> >
>>> > > > > > > > > >>> > >> >Thanks,
>>> > > > > > > > > >>> > >> >
>>> > > > > > > > > >>> > >> >Bhavesh
>>> > > > > > > > > >>> > >> >
>>> > > > > > > > > >>> > >> >On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>> > > > > > > > > >>> > >> >
>>> > > > > > > > > >>> > >> >> Hi Bhavesh,
>>> > > > > > > > > >>> > >> >>
>>> > > > > > > > > >>> > >> >> Please see inline comments.
>>> > > > > > > > > >>> > >> >>
>>> > > > > > > > > >>> > >> >> Jiangjie (Becket) Qin
>>> > > > > > > > > >>> > >> >>
>>> > > > > > > > > >>> > >> >> On 1/29/15, 7:00 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
>>> > > > > > > > > >>> > >> >>
>>> > > > > > > > > >>> > >> >> >Hi Jiangjie,
>>> > > > > > > > > >>> > >> >> >
>>> > > > > > > > > >>> > >> >> >Thanks for the input.
>>> > > > > > > > > >>> > >> >> >
>>> > > > > > > > > >>> > >> >> >a) Will the MM producer ack be attached to the producer instance or per topic? The use case is that one instance of MM needs to handle both strong ack and also ack=0 for some topics. Or would it be better to set up another instance of MM?
>>> > > > > > > > > >>> > >> >> The acks setting is a producer-level setting instead of a topic-level setting. In this case you probably need to set up another instance.
>>> > > > > > > > > >>> > >> >> >
>>> > > > > > > > > >>> > >> >> >b) Regarding TCP connections, why does each producer instance attach to its own TCP connections? Is it possible to use a broker-connection TCP pool, where a producer just checks out a TCP connection to a broker, so that the # of producer instances does not correlate to broker connections? Is this possible?
>>> > > > > > > > > >>> > >> >> In the new producer, each producer maintains a connection to each broker within the producer instance. Making producer instances share the TCP connections is a very big change to the current design, so I suppose we won’t be able to do that.
>>> > > > > > > > > >>> > >> >> >
>>> > > > > > > > > >>> > >> >> >
>>> > > > > > > > > >>> > >> >> >Thanks,
>>> > > > > > > > > >>> > >> >> >
>>> > > > > > > > > >>> > >> >> >Bhavesh
>>> > > > > > > > > >>> > >> >> >
>>> > > > > > > > > >>> > >> >> >On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>> > > > > > > > > >>> > >> >> >
>>> > > > > > > > > >>> > >> >> >> Hi Bhavesh,
>>> > > > > > > > > >>> > >> >> >>
>>> > > > > > > > > >>> > >> >> >> I think it is the right discussion to have when we are talking about the new new design for MM. Please see the inline comments.
>>> > > > > > > > > >>> > >> >> >>
>>> > > > > > > > > >>> > >> >> >> Jiangjie (Becket) Qin
>>> > > > > > > > > >>> > >> >> >>
>>> > > > > > > > > >>> > >> >> >> On 1/28/15, 10:48 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
>>> > > > > > > > > >>> > >> >> >>
>>> > > > > > > > > >>> > >> >> >> >Hi Jiangjie,
>>> > > > > > > > > >>> > >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >I just wanted to let you know about our use case and stress the point that the local data center broker cluster has fewer partitions than the destination offline broker cluster. This is because we do the batch pull from CAMUS and need to drain data faster than the injection rate (from four DCs for the same topic).
>>> > > > > > > > > >>> > >> >> >> Keeping the same partition number in source and target cluster will be an option but will not be enforced by default.
>>> > > > > > > > > >>> > >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >We are facing the following issues (probably due to configuration):
>>> > > > > > > > > >>> > >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >1) We occasionally lose data because the message batch size is too large (2MB) on the target side (we are using the old producer but I think the new producer will solve this problem to some extent).
>>> > > > > > > > > >>> > >> >> >> We do see this issue in LinkedIn as well. The new producer might also have this issue. There are some proposals for solutions, but no real work has started yet. For now, as a workaround, setting a more aggressive batch size on the producer side should work.
>>> > > > > > > > > >>> > >> >> >> >2) Since only one instance is set up to MM the data, we are not able to set up acks per topic; instead the ack is attached to the producer instance.
>>> > > > > > > > > >>> > >> >> >> I don’t quite get the question here.
>>> > > > > > > > > >>> > >> >> >> >3) How are you going to address the two-phase commit problem if ack is set to strongest, but auto commit is on for the consumer (meaning the producer does not get the ack, but the consumer auto-committed the offset of that message)? Is there a transactional (Kafka transactions are in progress) based ack and offset commit?
>>> > > > > > > > > >>> > >> >> >> Auto offset commit should be turned off in this case. The offset will only be committed once by the offset commit thread. So there is no two-phase commit.
>>> > > > > > > > > >>> > >> >> >> >4) How are you planning to avoid duplicated messages? (Is the broker going to have a moving window of messages collected and de-dupe?) Possibly we get this from retry set to 5…?
>>> > > > > > > > > >>> > >> >> >> We are not trying to completely avoid duplicates. The duplicates will still be there if:
>>> > > > > > > > > >>> > >> >> >> 1. Producer retries on failure.
>>> > > > > > > > > >>> > >> >> >> 2. Mirror maker is hard killed.
>>> > > > > > > > > >>> > >> >> >> Currently, dedup is expected to be done by the user if necessary.
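For reference, a sketch of the consumer setting Becket refers to here; the property name below is the old consumer's, and the new consumer's equivalent is enable.auto.commit:

    import java.util.Properties

    val consumerProps = new Properties()
    // Old consumer: the MM offset commit thread commits only after the
    // producer ack, so the consumer must never auto-commit on its own.
    consumerProps.put("auto.commit.enable", "false")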
>>> > > > > > > > > >>> > >> >> >> >5) Last, is there any warning or insight the MM component can provide about the data injection rate into destination partitions NOT being evenly distributed, regardless of keyed or non-keyed messages? (Hence there is a ripple effect such as data arriving late, or data arriving out of order in terms of time stamp and early some time, and CAMUS creates a huge file count on HDFS due to the uneven injection rate. The Camus job is configured to run every 3 minutes.)
>>> > > > > > > > > >>> > >> >> >> I think uneven data distribution is typically caused by server-side imbalance, instead of something mirror maker could control. In the new mirror maker, however, there is a customizable message handler that might be able to help a little bit. In the message handler, you can explicitly set a partition that you want to produce the message to. So if you know the uneven data distribution in the target cluster, you may offset it here. But that probably only works for non-keyed messages.
>>> > > > > > > > > >>> > >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >I am not sure if this is the right discussion forum to bring these to your/Kafka dev team's attention. This might be off track,
>>> > > > > > > > > >>> > >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >Thanks,
>>> > > > > > > > > >>> > >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >Bhavesh
>>> > > > > > > > > >>> > >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>> > > > > > > > > >>> > >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >> I’ve updated the KIP page. Feedback is welcome.
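Picking up Becket's point above about explicitly setting a partition in the message handler: a hypothetical sketch, reusing the illustrative RecordHandler shape from earlier, that counteracts uneven target-cluster distribution by pinning records to explicit target partitions round-robin. All names are made up.

    import java.util
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.clients.producer.ProducerRecord

    // Hypothetical: spread records evenly by choosing the target partition
    // explicitly instead of leaving it to the producer's default partitioner.
    class RoundRobinHandler(numTargetPartitions: Int) extends RecordHandler {
      private var next = -1
      override def handle(record: ConsumerRecord[Array[Byte], Array[Byte]])
          : util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
        next = (next + 1) % numTargetPartitions
        util.Collections.singletonList(
          new ProducerRecord[Array[Byte], Array[Byte]](
            record.topic, Integer.valueOf(next), record.key, record.value))
      }
    }

As Becket notes, this only makes sense for non-keyed messages; pinning keyed messages this way would break key-based partitioning.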
>>> > > > > > > > > >>> > >> >> >> >>
>>> > > > > > > > > >>> > >> >> >> >> Regarding the simple mirror maker design, I thought it over and have some worries. There are two things that might be worth thinking about:
>>> > > > > > > > > >>> > >> >> >> >> 1. One of the enhancements to mirror maker is adding a message handler to do things like reformatting. I think we might potentially want to have more threads processing the messages than the number of consumers. If we follow the simple mirror maker solution, we lose this flexibility.
>>> > > > > > > > > >>> > >> >> >> >> 2. This might not matter too much, but creating more consumers means a bigger footprint of TCP connections / memory.
>>> > > > > > > > > >>> > >> >> >> >>
>>> > > > > > > > > >>> > >> >> >> >> Any thoughts on this?
>>> > > > > > > > > >>> > >> >> >> >>
>>> > > > > > > > > >>> > >> >> >> >> Thanks.
>>> > > > > > > > > >>> > >> >> >> >>
>>> > > > > > > > > >>> > >> >> >> >> Jiangjie (Becket) Qin
>>> > > > > > > > > >>> > >> >> >> >>
>>> > > > > > > > > >>> > >> >> >> >> On 1/26/15, 10:35 AM, "Jiangjie Qin" <j...@linkedin.com> wrote:
>>> > > > > > > > > >>> > >> >> >> >>
>>> > > > > > > > > >>> > >> >> >> >> >Hi Jay and Neha,
>>> > > > > > > > > >>> > >> >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >> >Thanks a lot for the reply and explanation. I do agree it makes more sense to avoid duplicate effort and plan based on the new consumer. I’ll modify the KIP.
>>> > > > > > > > > >>> > >> >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >> >To Jay’s question on message ordering - The data channel selection makes sure that the messages from the same source partition will be sent by the same producer. So the order of the messages is guaranteed with proper producer settings (MaxInFlightRequests=1, retries=Integer.MaxValue, etc.)
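Spelled out against the new producer's config constants, the settings Becket lists might look like the following sketch; the backoff value is illustrative, not from the thread:

    import java.util.Properties
    import org.apache.kafka.clients.producer.ProducerConfig

    val producerProps = new Properties()
    // Preserve per-partition ordering: only one in-flight request at a time.
    producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
    // Retry (effectively) forever instead of dropping a message on failure.
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE.toString)
    // A largish backoff between retries, per Jay's earlier note; 500 ms is a guess.
    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "500")
    // Strongest ack, for the no-data-loss goal.
    producerProps.put(ProducerConfig.ACKS_CONFIG, "all")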
>>> > > > > > > > > >>> > >> >> >> >> >For keyed messages, because they come from the same source partition and will end up in the same target partition, as long as they are sent by the same producer, the order is guaranteed.
>>> > > > > > > > > >>> > >> >> >> >> >For non-keyed messages, the messages coming from the same source partition might go to different target partitions. The order is only guaranteed within each partition.
>>> > > > > > > > > >>> > >> >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >> >Anyway, I’ll modify the KIP and the data channel will go away.
>>> > > > > > > > > >>> > >> >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >> >Thanks.
>>> > > > > > > > > >>> > >> >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >> >Jiangjie (Becket) Qin
>>> > > > > > > > > >>> > >> >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >> >On 1/25/15, 4:34 PM, "Neha Narkhede" <n...@confluent.io> wrote:
>>> > > > > > > > > >>> > >> >> >> >> >
>>> > > > > > > > > >>> > >> >> >> >> >>I think there is some value in investigating if we can go back to the simple mirror maker design, as Jay points out. Here you have N threads, each of which has a consumer and a producer.
>>> > > > > > > > > >>> > >> >> >> >> >>
>>> > > > > > > > > >>> > >> >> >> >> >>The reason why we had to move away from that was a combination of the difference in throughput between the consumer and the old producer and the deficiency of the consumer rebalancing that limits the total number of mirror maker threads.
>>> > > > > > > > > >>> > >> >> >> >> >>So the only option available was to increase the throughput of the limited # of mirror maker threads that could be deployed. Now that queuing design may not make sense, given that the new producer's throughput is almost similar to the consumer's AND the new round-robin based consumer rebalancing can allow a very high number of mirror maker instances to exist.
>>> > > > > > > > > >>> > >> >> >> >> >>
>>> > > > > > > > > >>> > >> >> >> >> >>This is the end state that the mirror maker should be in once the new consumer is complete, so it wouldn't hurt to see if we can just move to that right now.
>>> > > > > > > > > >>> > >> >> >> >> >>
>>> > > > > > > > > >>> > >> >> >> >> >>On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>> > > > > > > > > >>> > >> >> >> >> >>
>>> > > > > > > > > >>> > >> >> >> >> >>> QQ: If we ever use a different technique for the data channel selection than for the producer partitioning, won't that break ordering? How can we ensure these things stay in sync?
>>> > > > > > > > > >>> > >> >> >> >> >>>
>>> > > > > > > > > >>> > >> >> >> >> >>> With respect to the new consumer--I really do want to encourage people to think through how MM will work with the new consumer. I mean this isn't very far off, maybe a few months if we hustle?
>>> > > > > > > > > >>> > >> >> >> >> >>> I could imagine us getting this mm fix done maybe sooner, maybe in a month? So I guess this buys us an extra month before we rip it out and throw it away? Maybe two? This bug has been there for a while, though, right? Is it worth it? Probably it is, but it still kind of sucks to have the duplicate effort.
>>> > > > > > > > > >>> > >> >> >> >> >>>
>>> > > > > > > > > >>> > >> >> >> >> >>> So anyhow let's definitely think about how things will work with the new consumer. I think we can probably just have N threads; each thread has a producer and consumer and is internally single-threaded. Any reason this wouldn't work?
>>> > > > > > > > > >>> > >> >> >> >> >>>
>>> > > > > > > > > >>> > >> >> >> >> >>> -Jay
>>> > > > > > > > > >>> > >> >> >> >> >>>
>>> > > > > > > > > >>> > >> >> >> >> >>>
>>> > > > > > > > > >>> > >> >> >> >> >>> On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>>> > > > > > > > > >>> > >> >> >> >> >>>
>>> > > > > > > > > >>> > >> >> >> >> >>> > Hi Jay,
>>> > > > > > > > > >>> > >> >> >> >> >>> >
>>> > > > > > > > > >>> > >> >> >> >> >>> > Thanks for the comments. Please see inline responses.
>>> > > > > > > > > >>> > >> >> >> >> >>> >
>>> > > > > > > > > >>> > >> >> >> >> >>> > Jiangjie (Becket) Qin
>>> > > > > > > > > >>> > >> >> >> >> >>> >
>>> > > > > > > > > >>> > >> >> >> >> >>> > On 1/21/15, 1:33 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>>> > > > > > > > > >>> > >> >> >> >> >>> >
>>> > > > > > > > > >>> > >> >> >> >> >>> > >Hey guys,
>>> > > > > > > > > >>> > >> >> >> >> >>> > >
>>> > > > > > > > > >>> > >> >> >> >> >>> > >A couple questions/comments:
>>> > > > > > > > > >>> > >> >> >> >> >>> > >
>>> > > > > > > > > >>> > >> >> >> >> >>> > >1. The callback and user-controlled commit offset functionality is already in the new consumer, which we are working on in parallel. If we accelerated that work it might help concentrate efforts. I admit this might take slightly longer in calendar time but could still probably get done this quarter. Have you guys considered that approach?
>>> > > > > > > > > >>> > >> >> >> >> >>> > Yes, I totally agree that ideally we should put our efforts into the new consumer. The main reason for still working on the old consumer is that we expect it would still be used in LinkedIn for quite a while before the new consumer could be fully rolled out. And we have recently been suffering a lot from the mirror maker data loss issue. So our current plan is making necessary changes to make the current mirror maker stable in production. Then we can test and roll out the new consumer gradually without getting burnt.
>>> > > > > > > > > >>> > >> >> >> >> >>> > >
>>> > > > > > > > > >>> > >> >> >> >> >>> > >2. I think partitioning on the hash of the topic partition is not a very good idea because that will make the case of going from a cluster with fewer partitions to one with more partitions not work. I think an intuitive way to do this would be the following:
>>> > > > > > > > > >>> > >> >> >> >> >>> > >a. Default behavior: Just do what the producer does. I.e. if you specify a key use it for partitioning, if not just partition in a round-robin fashion.
>>> > > > > > > > > >>> > >> >> >> >> >>> > >b. Add a --preserve-partition option that will explicitly inherit the partition from the source irrespective of whether there is a key or which partition that key would hash to.
>>> > > > > > > > > >>> > >> >> >> >> >>> > Sorry that I did not explain this clearly enough. The hash of the topic partition is only used when deciding which mirror maker data channel queue the consumer thread should put the message into. It only tries to make sure the messages from the same partition are sent by the same producer thread to guarantee the sending order.
>3. You don't actually give the ConsumerRebalanceListener interface.
>What is that going to look like?
Good point! I should have put it in the wiki. I just added it; a rough
sketch is below.
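A hedged sketch of the shape such a callback could take; the trait name
follows the thread, but the method name and signature here are
assumptions rather than the committed interface:

    // Sketch only. On a rebalance the callback's job, per the
    // discussion in this thread, is to flush the producer and commit
    // offsets so as to bound duplicates.
    trait ConsumerRebalanceListener {
      // Called before the consumer releases its partitions; the map
      // is topic -> owned partitions.
      def beforeReleasingPartitions(
          partitionOwnership: Map[String, Set[Int]]): Unit
    }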
>4. What is MirrorMakerRecord? I think ideally the
>MirrorMakerMessageHandler interface would take a ConsumerRecord as
>input and return a ProducerRecord, right? That would allow you to
>transform the key, value, partition, or destination topic...
MirrorMakerRecord is introduced in KAFKA-1650 and is exactly the same
as the ConsumerRecord of KAFKA-1760:

    private[kafka] class MirrorMakerRecord(val sourceTopic: String,
                                           val sourcePartition: Int,
                                           val sourceOffset: Long,
                                           val key: Array[Byte],
                                           val value: Array[Byte]) {
      // Message size in bytes; the key may be null.
      def size = value.length + { if (key == null) 0 else key.length }
    }

However, because the source partition and offset are needed in the
producer thread for consumer offset bookkeeping, the record returned by
MirrorMakerMessageHandler needs to carry that information, so
ProducerRecord does not work here. We could probably let the message
handler take ConsumerRecord for both input and output.
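In that case the handler described earlier in the thread (one record
in, a possibly empty list of records out) might look roughly like the
following; the trait name follows the thread, the exact signature is an
assumption:

    // Sketch only: one incoming record maps to zero or more outgoing
    // records.
    trait MirrorMakerMessageHandler {
      // Return an empty Seq to filter the record out, or several
      // records to fan one input out into multiple outputs.
      def handle(record: MirrorMakerRecord): Seq[MirrorMakerRecord]
    }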
>5. Have you guys thought about what the implementation will look like
>in terms of threading architecture etc. with the new consumer? That
>will be soon, so even if we aren't starting with that, let's make sure
>we can get rid of a lot of the current mirror maker's accidental
>complexity in terms of threads and queues when we move to it.
I haven't thought about it thoroughly. The quick idea is that after
migrating to the new consumer it is probably better to use a single
consumer thread. If multiple threads are needed, decoupling consumption
and processing could be used (see the sketch below). MirrorMaker
definitely needs to be changed after the new consumer gets checked in.
I'll document the changes and can submit follow-up patches after the
new consumer is available.
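One illustrative way to decouple consumption from processing, assuming
a bounded hand-off queue between a single consumer thread and one or
more producer threads (all names here are mine, not mirror maker code):

    import java.util.concurrent.ArrayBlockingQueue

    // Sketch only.
    object DecoupledMirror {
      // Bounded queue: put() blocks when it is full, which applies
      // back-pressure to consumption.
      val handoff = new ArrayBlockingQueue[MirrorMakerRecord](10000)

      // Consumer thread body: only fetches and enqueues.
      def consumeLoop(fetchNext: () => MirrorMakerRecord): Unit =
        while (true) handoff.put(fetchNext())

      // Producer thread body: drains the queue and sends.
      def processLoop(send: MirrorMakerRecord => Unit): Unit =
        while (true) send(handoff.take())
    }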
>
>-Jay
>
>On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin
><j...@linkedin.com.invalid> wrote:
>
>> Hi Kafka Devs,
>>
>> We are working on Kafka Mirror Maker enhancement. A KIP is posted to
>> document and discuss the following:
>> 1. KAFKA-1650: No data loss mirror maker change
>> 2. KAFKA-1839: To allow partition aware mirror.
>> 3. KAFKA-1840: To allow message filtering/format conversion
>> Feedback is welcome. Please let us know if you have any questions or
>> concerns.
>>
>> Thanks.
>>
>> Jiangjie (Becket) Qin