Hi Jay,

> The data channels are actually a big part of the complexity of the zero
> data loss design, though, right? Because then you need some reverse channel
> to flow the acks back to the consumer based on where you are versus just
> acking what you have read and written (as in the code snippet I put up).

I'm not sure we are on the same page. Even if the data channel were
not there, the current handling for zero data loss would remain very
similar - you would still need to maintain lists of unacked source
offsets. I'm wondering whether the KIP needs more detail on how this
is currently implemented; or are you suggesting a different approach
(in which case I have not fully understood it)? I'm also not sure
what you mean by flowing acks back to the consumer - the MM commits
offsets only after the producer ack has been received. There is some
additional complexity introduced to reduce duplicates on a
rebalance - this is actually optional (since duplicates are currently
a given). The reason it was done anyway is that with auto-commit
turned off, duplicates are almost guaranteed on a rebalance.
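
To make sure we mean the same thing, here is a rough sketch of the
per-source-partition bookkeeping I am referring to (simplified, not
the actual mirror maker code; the class and method names are made up
for illustration):

    import scala.collection.mutable

    // Track offsets that were sent to the producer but not yet acked,
    // plus the highest offset acked so far.
    class UnackedOffsets {
      private val unacked = mutable.TreeSet[Long]()
      private var maxAcked: Long = -1L

      // Called when a message is handed to the producer.
      def onSend(sourceOffset: Long): Unit =
        synchronized { unacked += sourceOffset }

      // Called from the producer callback once the ack is received.
      def onAck(sourceOffset: Long): Unit = synchronized {
        unacked -= sourceOffset
        if (sourceOffset > maxAcked) maxAcked = sourceOffset
      }

      // Everything below this offset has been produced successfully,
      // so this is the offset that is safe to commit.
      def safeToCommit: Long = synchronized {
        if (unacked.isEmpty) maxAcked + 1 else unacked.head
      }
    }

The offset commit (whether from a commit thread or inline) only ever
commits safeToCommit, so a hard failure can cause duplicates but not
data loss.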

> I think the point that Neha and I were trying to make was that the
> motivation to embed stuff into MM kind of is related to how complex a
> simple "consume and produce" with good throughput will be. If it is simple
> to write such a thing in a few lines, the pain of embedding a bunch of
> stuff won't be worth it, if it has to be as complex as the current mm then
> of course we will need all kinds of plug ins because no one will be able to
> write such a thing. I don't have a huge concern with a simple plug-in but I
> think if it turns into something more complex with filtering and
> aggregation or whatever we really need to stop and think a bit about the
> design.

I agree - I don't think there is a use case for any complex plug-in.
It is pretty much what Becket has described so far for the message
handler - i.e., take an incoming record and return a list of outgoing
records (which could be empty if the record is filtered out).
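
In other words, conceptually something like the sketch below. The
trait and example names are made up, and the record type is just a
simplified stand-in for the MirrorMakerRecord from KAFKA-1650 that is
quoted further down the thread:

    // Simplified stand-in for the KAFKA-1650 MirrorMakerRecord.
    case class MirrorMakerRecord(sourceTopic: String,
                                 sourcePartition: Int,
                                 sourceOffset: Long,
                                 key: Array[Byte],
                                 value: Array[Byte])

    // The handler contract as I understand the proposal: one record
    // in, a (possibly empty) list of records out.
    trait MirrorMakerMessageHandler {
      def handle(record: MirrorMakerRecord): java.util.List[MirrorMakerRecord]
    }

    // Example: drop anything from topics with a "private." prefix and
    // pass everything else through unchanged.
    class DropPrivateTopics extends MirrorMakerMessageHandler {
      override def handle(record: MirrorMakerRecord): java.util.List[MirrorMakerRecord] =
        if (record.sourceTopic.startsWith("private."))
          java.util.Collections.emptyList()
        else
          java.util.Collections.singletonList(record)
    }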

So here is my take on the MM:
- Bare bones: simple consumer-producer pairs (0.7 style). This is
  ideal, but does not provide zero data loss.
- The above, plus support for zero data loss. This actually adds
  quite a bit of complexity.
- The above, plus the message handler. I think this is a trivial
  addition that makes the MM usable in a few other mirroring-like
  applications (a rough sketch of how it plugs into the per-thread
  loop is below).
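
Here is the rough sketch mentioned above of how the pieces could fit
together per mirror maker thread, along the lines of Jay's loop
quoted below (and, like that snippet, omitting the consumer/producer
setup). The handler, the toMirrorMakerRecord/toProducerRecord
conversions and logErrorCallback are hypothetical names, and flush()
and commit() stand for whatever the new producer/consumer end up
exposing for flushing and offset commit:

    import scala.collection.JavaConverters._

    while (true) {
      val records = consumer.poll(Long.MaxValue)
      for (rec <- records.asScala;
           // the handler may return an empty list, i.e. filter
           out <- handler.handle(toMirrorMakerRecord(rec)).asScala)
        producer.send(toProducerRecord(out), logErrorCallback)

      if (System.currentTimeMillis - lastCommit > commitIntervalMs) {
        producer.flush()   // wait until outstanding sends are acked
        consumer.commit()  // only now is it safe to commit offsets
        lastCommit = System.currentTimeMillis
      }
    }

The rebalance callback would do the same flush() + commit() before
giving up partitions so that duplicates are minimized.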

Joel

> On Tue, Feb 10, 2015 at 12:31 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> 
> >
> >
> > On Tue, Feb 10, 2015 at 12:13:46PM -0800, Neha Narkhede wrote:
> > > I think all of us agree that we want to design MirrorMaker for 0 data
> > loss.
> > > With the absence of the data channel, 0 data loss will be much simpler to
> > > implement.
> >
> > The data channel is irrelevant to the implementation of zero data
> > loss. The complexity in the implementation of no data loss that you
> > are seeing in mirror-maker affects all consume-then-produce patterns
> > whether or not there is a data channel.  You still need to maintain a
> > list of unacked offsets. What I meant earlier is that we can
> > brainstorm completely different approaches to supporting no data loss,
> > but the current implementation is the only solution we are aware of.
> >
> > >
> > > My arguments for adding a message handler are that:
> > > > 1. It is more efficient to do something in common for all the clients
> > in
> > > > pipeline than letting each client do the same thing for many times. And
> > > > there are concrete use cases for the message handler already.
> > > >
> > >
> > > What are the concrete use cases?
> >
> > I think Becket already described a couple of use cases earlier in the
> > thread.
> >
> > <quote>
> >
> > 1. Format conversion. We have a use case where clients of source
> > cluster
> > use an internal schema and clients of target cluster use a different
> > public schema.
> > 2. Message filtering: For the messages published to source cluster,
> > there
> > are some messages private to source cluster clients and should not
> > exposed
> > to target cluster clients. It would be difficult to publish those
> > messages
> > into different partitions because they need to be ordered.
> > I agree that we can always filter/convert messages after they are
> > copied
> > to the target cluster, but that costs network bandwidth unnecessarily,
> > especially if that is a cross colo mirror. With the handler, we can
> > co-locate the mirror maker with source cluster and save that cost.
> > Also,
> > imagine there are many downstream consumers consuming from the target
> > cluster, filtering/reformatting the messages before the messages reach
> > the
> > target cluster is much more efficient than having each of the
> > consumers do
> > this individually on their own.
> >
> > </quote>
> >
> > >
> > > Also the KIP still refers to the data channel in a few places (Motivation
> > > and "On consumer rebalance" sections). Can you update the wiki so it is
> > > easier to review the new design, especially the data loss part.
> > >
> > >
> > > On Tue, Feb 10, 2015 at 10:36 AM, Joel Koshy <jjkosh...@gmail.com>
> > wrote:
> > >
> > > > I think the message handler adds little to no complexity to the mirror
> > > > maker. Jay/Neha, the MM became scary due to the rearchitecture we did
> > > > for 0.8 due to performance issues compared with 0.7 - we should remove
> > > > the data channel if it can match the current throughput. I agree it is
> > > > worth prototyping and testing that so the MM architecture is
> > > > simplified.
> > > >
> > > > The MM became a little scarier in KAFKA-1650 in order to support no
> > > > data loss. I think the implementation for no data loss will remain
> > > > about the same even in the new model (even without the data channel) -
> > > > we can probably brainstorm more if there is a better/simpler way to do
> > > > it (maybe there is in the absence of the data channel) but at the time
> > > > it was the best we (i.e., Becket, myself, Jun and Guozhang who
> > > > participated on the review) could come up with.
> > > >
> > > > So I'm definitely +1 on whatever it takes to support no data loss. I
> > > > think most people would want that out of the box.
> > > >
> > > > As for the message handler, as Becket wrote and I agree with, it is
> > > > really a trivial addition that would benefit (perhaps not most, but at
> > > > least some). So I'm personally +1 on that as well. That said, I'm also
> > > > okay with it not being there. I think the MM is fairly stand-alone and
> > > > simple enough that it is entirely reasonable and absolutely feasible
> > > > for companies to fork/re-implement the mirror maker for their own
> > > > needs.
> > > >
> > > > So in summary, I'm +1 on the KIP.
> > > >
> > > > Thanks,
> > > >
> > > > Joel
> > > >
> > > > On Mon, Feb 09, 2015 at 09:19:57PM +0000, Jiangjie Qin wrote:
> > > > > I just updated the KIP page and incorporated Jay and Neha’s
> > suggestion.
> > > > As
> > > > > a brief summary of where we are:
> > > > >
> > > > > Consensus reached:
> > > > > Have N independent mirror maker threads each has their own consumers
> > but
> > > > > share a producer. The mirror maker threads will be responsible for
> > > > > decompression, compression and offset commit. No data channel and
> > > > separate
> > > > > offset commit thread is needed. Consumer rebalance callback will be
> > used
> > > > > to avoid duplicates on rebalance.
> > > > >
> > > > > Still under discussion:
> > > > > Whether message handler is needed.
> > > > >
> > > > > My arguments for adding a message handler are that:
> > > > > 1. It is more efficient to do something in common for all the
> > clients in
> > > > > pipeline than letting each client do the same thing for many times.
> > And
> > > > > there are concrete use cases for the message handler already.
> > > > > 2. It is not a big complicated add-on to mirror maker.
> > > > > 3. Without a message handler, for customers needs it, they have to
> > > > > re-implement all the logics of mirror maker by themselves just in
> > order
> > > > to
> > > > > add this handling in pipeline.
> > > > >
> > > > > Any thoughts?
> > > > >
> > > > > Thanks.
> > > > >
> > > > > ―Jiangjie (Becket) Qin
> > > > >
> > > > > On 2/8/15, 6:35 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:
> > > > >
> > > > > >Hi Jay, thanks a lot for the comments.
> > > > > >I think this solution is better. We probably don’t need data channel
> > > > > >anymore. It can be replaced with a list of producer if we need more
> > > > sender
> > > > > >thread.
> > > > > >I’ll update the KIP page.
> > > > > >
> > > > > >The reasoning about message handler is mainly for efficiency
> > purpose.
> > > > I’m
> > > > > >thinking that if something can be done in pipeline for all the
> > clients
> > > > > >such as filtering/reformatting, it is probably better to do it in
> > the
> > > > > >pipeline than asking 100 clients do the same thing for 100 times.
> > > > > >
> > > > > >―Jiangjie (Becket) Qin
> > > > > >
> > > > > >
> > > > > >On 2/8/15, 4:59 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> > > > > >
> > > > > >>Yeah, I second Neha's comments. The current mm code has taken
> > something
> > > > > >>pretty simple and made it pretty scary with callbacks and
> > wait/notify
> > > > > >>stuff. Do we believe this works? I can't tell by looking at it
> > which is
> > > > > >>kind of bad for something important like this. I don't mean this as
> > > > > >>criticism, I know the history: we added in memory queues to help
> > with
> > > > > >>other
> > > > > >>performance problems without thinking about correctness, then we
> > added
> > > > > >>stuff to work around the in-memory queues not lose data, and so on.
> > > > > >>
> > > > > >>Can we instead do the opposite exercise and start with the basics
> > of
> > > > what
> > > > > >>mm should do and think about what deficiencies prevents this
> > approach
> > > > > >>from
> > > > > >>working? Then let's make sure the currently in-flight work will
> > remove
> > > > > >>these deficiencies. After all mm is kind of the prototypical kafka
> > use
> > > > > >>case
> > > > > >>so if we can't make our clients to this probably no one else can.
> > > > > >>
> > > > > >>I think mm should just be N independent threads each of which has
> > their
> > > > > >>own
> > > > > >>consumer but share a producer and each of which looks like this:
> > > > > >>
> > > > > >>while(true) {
> > > > > >>    val recs = consumer.poll(Long.MaxValue);
> > > > > >>    for (rec <- recs)
> > > > > >>        producer.send(rec, logErrorCallback)
> > > > > >>    if(System.currentTimeMillis - lastCommit > commitInterval) {
> > > > > >>        producer.flush()
> > > > > >>        consumer.commit()
> > > > > >>        lastCommit = System.currentTimeMillis
> > > > > >>    }
> > > > > >>}
> > > > > >>
> > > > > >>This will depend on setting the retry count in the producer to
> > > > something
> > > > > >>high with a largish backoff so that a failed send attempt doesn't
> > drop
> > > > > >>data.
> > > > > >>
> > > > > >>We will need to use the callback to force a flush and offset
> > commit on
> > > > > >>rebalance.
> > > > > >>
> > > > > >>This approach may have a few more TCP connections due to using
> > multiple
> > > > > >>consumers but I think it is a lot easier to reason about and the
> > total
> > > > > >>number of mm instances is always going to be small.
> > > > > >>
> > > > > >>Let's talk about where this simple approach falls short, I think
> > that
> > > > > >>will
> > > > > >>help us understand your motivations for additional elements.
> > > > > >>
> > > > > >>Another advantage of this is that it is so simple I don't think we
> > > > really
> > > > > >>even need to both making mm extensible because writing your own
> > code
> > > > that
> > > > > >>does custom processing or transformation is just ten lines and no
> > plug
> > > > in
> > > > > >>system is going to make it simpler.
> > > > > >>
> > > > > >>-Jay
> > > > > >>
> > > > > >>
> > > > > >>On Sun, Feb 8, 2015 at 2:40 PM, Neha Narkhede <n...@confluent.io>
> > > > wrote:
> > > > > >>
> > > > > >>> Few comments -
> > > > > >>>
> > > > > >>> 1. Why do we need the message handler? Do you have concrete use
> > cases
> > > > > >>>in
> > > > > >>> mind? If not, we should consider adding it in the future when/if
> > we
> > > > do
> > > > > >>>have
> > > > > >>> use cases for it. The purpose of the mirror maker is a simple
> > tool
> > > > for
> > > > > >>> setting up Kafka cluster replicas. I don't see why we need to
> > > > include a
> > > > > >>> message handler for doing stream transformations or filtering.
> > You
> > > > can
> > > > > >>> always write a simple process for doing that once the data is
> > copied
> > > > as
> > > > > >>>is
> > > > > >>> in the target cluster
> > > > > >>> 2. Why keep both designs? We should prefer the simpler design
> > unless
> > > > it
> > > > > >>>is
> > > > > >>> not feasible due to the performance issue that we previously
> > had. Did
> > > > > >>>you
> > > > > >>> get a chance to run some tests to see if that is really still a
> > > > problem
> > > > > >>>or
> > > > > >>> not? It will be easier to think about the design and also make
> > the
> > > > KIP
> > > > > >>> complete if we make a call on the design first.
> > > > > >>> 3. Can you explain the need for keeping a list of unacked
> > offsets per
> > > > > >>> partition? Consider adding a section on retries and how you plan
> > to
> > > > > >>>handle
> > > > > >>> the case when the producer runs out of all retries.
> > > > > >>>
> > > > > >>> Thanks,
> > > > > >>> Neha
> > > > > >>>
> > > > > >>> On Sun, Feb 8, 2015 at 2:06 PM, Jiangjie Qin
> > > > > >>><j...@linkedin.com.invalid>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>> > Hi Neha,
> > > > > >>> >
> > > > > >>> > Yes, I’ve updated the KIP so the entire KIP is based on new
> > > > consumer
> > > > > >>>now.
> > > > > >>> > I’ve put both designs with and without data channel in the KIP
> > as I
> > > > > >>>still
> > > > > >>> > feel we might need the data channel to provide more
> > flexibility,
> > > > > >>> > especially after message handler is introduced. I’ve put my
> > > > thinking
> > > > > >>>of
> > > > > >>> > the pros and cons of the two designs in the KIP as well. It’ll
> > be
> > > > > >>>great
> > > > > >>> if
> > > > > >>> > you can give a review and comment.
> > > > > >>> >
> > > > > >>> > Thanks.
> > > > > >>> >
> > > > > >>> > Jiangjie (Becket) Qin
> > > > > >>> >
> > > > > >>> > On 2/6/15, 7:30 PM, "Neha Narkhede" <n...@confluent.io> wrote:
> > > > > >>> >
> > > > > >>> > >Hey Becket,
> > > > > >>> > >
> > > > > >>> > >What are the next steps on this KIP. As per your comment
> > earlier
> > > > on
> > > > > >>>the
> > > > > >>> > >thread -
> > > > > >>> > >
> > > > > >>> > >I do agree it makes more sense
> > > > > >>> > >> to avoid duplicate effort and plan based on new consumer.
> > I’ll
> > > > > >>>modify
> > > > > >>> > >>the
> > > > > >>> > >> KIP.
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > >Did you get a chance to think about the simplified design
> > that we
> > > > > >>> proposed
> > > > > >>> > >earlier? Do you plan to update the KIP with that proposal?
> > > > > >>> > >
> > > > > >>> > >Thanks,
> > > > > >>> > >Neha
> > > > > >>> > >
> > > > > >>> > >On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin
> > > > > >>><j...@linkedin.com.invalid
> > > > > >>> >
> > > > > >>> > >wrote:
> > > > > >>> > >
> > > > > >>> > >> In mirror maker we do not do de-serialization on the
> > messages.
> > > > > >>>Mirror
> > > > > >>> > >> maker use source TopicPartition hash to chose a producer to
> > send
> > > > > >>> > >>messages
> > > > > >>> > >> from the same source partition. The partition those
> > messages end
> > > > > >>>up
> > > > > >>> with
> > > > > >>> > >> are decided by Partitioner class in KafkaProducer (assuming
> > you
> > > > > >>>are
> > > > > >>> > >>using
> > > > > >>> > >> the new producer), which uses hash code of bytes[].
> > > > > >>> > >>
> > > > > >>> > >> If deserialization is needed, it has to be done in message
> > > > > >>>handler.
> > > > > >>> > >>
> > > > > >>> > >> Thanks.
> > > > > >>> > >>
> > > > > >>> > >> Jiangjie (Becket) Qin
> > > > > >>> > >>
> > > > > >>> > >> On 2/4/15, 11:33 AM, "Bhavesh Mistry" <
> > > > mistry.p.bhav...@gmail.com>
> > > > > >>> > >>wrote:
> > > > > >>> > >>
> > > > > >>> > >> >Hi Jiangjie,
> > > > > >>> > >> >
> > > > > >>> > >> >Thanks for entertaining my question so far.  Last
> > question, I
> > > > > >>>have is
> > > > > >>> > >> >about
> > > > > >>> > >> >serialization of message key.  If the key de-serialization
> > > > > >>>(Class) is
> > > > > >>> > >>not
> > > > > >>> > >> >present at the MM instance, then does it use raw byte
> > hashcode
> > > > to
> > > > > >>> > >> >determine
> > > > > >>> > >> >the partition ?  How are you going to address the situation
> > > > where
> > > > > >>>key
> > > > > >>> > >> >needs
> > > > > >>> > >> >to be de-serialization and get actual hashcode needs to be
> > > > > >>>computed
> > > > > >>> ?.
> > > > > >>> > >> >
> > > > > >>> > >> >
> > > > > >>> > >> >Thanks,
> > > > > >>> > >> >
> > > > > >>> > >> >Bhavesh
> > > > > >>> > >> >
> > > > > >>> > >> >On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin
> > > > > >>> > >><j...@linkedin.com.invalid>
> > > > > >>> > >> >wrote:
> > > > > >>> > >> >
> > > > > >>> > >> >> Hi Bhavesh,
> > > > > >>> > >> >>
> > > > > >>> > >> >> Please see inline comments.
> > > > > >>> > >> >>
> > > > > >>> > >> >> Jiangjie (Becket) Qin
> > > > > >>> > >> >>
> > > > > >>> > >> >> On 1/29/15, 7:00 PM, "Bhavesh Mistry"
> > > > > >>><mistry.p.bhav...@gmail.com>
> > > > > >>> > >> >>wrote:
> > > > > >>> > >> >>
> > > > > >>> > >> >> >Hi Jiangjie,
> > > > > >>> > >> >> >
> > > > > >>> > >> >> >Thanks for the input.
> > > > > >>> > >> >> >
> > > > > >>> > >> >> >a) Is MM will  producer ack will be attach to Producer
> > > > > >>>Instance or
> > > > > >>> > >>per
> > > > > >>> > >> >> >topic.  Use case is that one instance of MM
> > > > > >>> > >> >> >needs to handle both strong ack and also ack=0 for some
> > > > topic.
> > > > > >>> Or
> > > > > >>> > >>it
> > > > > >>> > >> >> >would
> > > > > >>> > >> >> >be better to set-up another instance of MM.
> > > > > >>> > >> >> The acks setting is producer level setting instead of
> > topic
> > > > > >>>level
> > > > > >>> > >> >>setting.
> > > > > >>> > >> >> In this case you probably need to set up another
> > instance.
> > > > > >>> > >> >> >
> > > > > >>> > >> >> >b) Regarding TCP connections, Why does #producer
> > instance
> > > > > >>>attach
> > > > > >>> to
> > > > > >>> > >>TCP
> > > > > >>> > >> >> >connection.  Is it possible to use Broker Connection TCP
> > > > Pool,
> > > > > >>> > >>producer
> > > > > >>> > >> >> >will just checkout TCP connection  to Broker.  So, # of
> > > > > >>>Producer
> > > > > >>> > >> >>Instance
> > > > > >>> > >> >> >does not correlation to Brokers Connection.  Is this
> > > > possible
> > > > > >>>?
> > > > > >>> > >> >> In new producer, each producer maintains a connection to
> > each
> > > > > >>> broker
> > > > > >>> > >> >> within the producer instance. Making producer instances
> > to
> > > > > >>>share
> > > > > >>> the
> > > > > >>> > >>TCP
> > > > > >>> > >> >> connections is a very big change to the current design,
> > so I
> > > > > >>> suppose
> > > > > >>> > >>we
> > > > > >>> > >> >> won’t be able to do that.
> > > > > >>> > >> >> >
> > > > > >>> > >> >> >
> > > > > >>> > >> >> >Thanks,
> > > > > >>> > >> >> >
> > > > > >>> > >> >> >Bhavesh
> > > > > >>> > >> >> >
> > > > > >>> > >> >> >On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin
> > > > > >>> > >> >><j...@linkedin.com.invalid
> > > > > >>> > >> >> >
> > > > > >>> > >> >> >wrote:
> > > > > >>> > >> >> >
> > > > > >>> > >> >> >> Hi Bhavesh,
> > > > > >>> > >> >> >>
> > > > > >>> > >> >> >> I think it is the right discussion to have when we are
> > > > > >>>talking
> > > > > >>> > >>about
> > > > > >>> > >> >>the
> > > > > >>> > >> >> >> new new design for MM.
> > > > > >>> > >> >> >> Please see the inline comments.
> > > > > >>> > >> >> >>
> > > > > >>> > >> >> >> Jiangjie (Becket) Qin
> > > > > >>> > >> >> >>
> > > > > >>> > >> >> >> On 1/28/15, 10:48 PM, "Bhavesh Mistry"
> > > > > >>> > >><mistry.p.bhav...@gmail.com>
> > > > > >>> > >> >> >>wrote:
> > > > > >>> > >> >> >>
> > > > > >>> > >> >> >> >Hi Jiangjie,
> > > > > >>> > >> >> >> >
> > > > > >>> > >> >> >> >I just wanted to let you know about our use case and
> > > > stress
> > > > > >>>the
> > > > > >>> > >> >>point
> > > > > >>> > >> >> >>that
> > > > > >>> > >> >> >> >local data center broker cluster have fewer
> > partitions
> > > > than
> > > > > >>>the
> > > > > >>> > >> >> >> >destination
> > > > > >>> > >> >> >> >offline broker cluster. Just because we do the batch
> > pull
> > > > > >>>from
> > > > > >>> > >>CAMUS
> > > > > >>> > >> >> >>and
> > > > > >>> > >> >> >> >in
> > > > > >>> > >> >> >> >order to drain data faster than the injection rate
> > (from
> > > > > >>>four
> > > > > >>> DCs
> > > > > >>> > >> >>for
> > > > > >>> > >> >> >>same
> > > > > >>> > >> >> >> >topic).
> > > > > >>> > >> >> >> Keeping the same partition number in source and target
> > > > > >>>cluster
> > > > > >>> > >>will
> > > > > >>> > >> >>be
> > > > > >>> > >> >> >>an
> > > > > >>> > >> >> >> option but will not be enforced by default.
> > > > > >>> > >> >> >> >
> > > > > >>> > >> >> >> >We are facing following issues (probably due to
> > > > > >>>configuration):
> > > > > >>> > >> >> >> >
> > > > > >>> > >> >> >> >1)      We occasionally loose data due to message
> > batch
> > > > > >>>size is
> > > > > >>> > >>too
> > > > > >>> > >> >> >>large
> > > > > >>> > >> >> >> >(2MB) on target data (we are using old producer but I
> > > > think
> > > > > >>>new
> > > > > >>> > >> >> >>producer
> > > > > >>> > >> >> >> >will solve this problem to some extend).
> > > > > >>> > >> >> >> We do see this issue in LinkedIn as well. New producer
> > > > also
> > > > > >>> might
> > > > > >>> > >> >>have
> > > > > >>> > >> >> >> this issue. There are some proposal of solutions, but
> > no
> > > > > >>>real
> > > > > >>> work
> > > > > >>> > >> >> >>started
> > > > > >>> > >> >> >> yet. For now, as a workaround, setting a more
> > aggressive
> > > > > >>>batch
> > > > > >>> > >>size
> > > > > >>> > >> >>on
> > > > > >>> > >> >> >> producer side should work.
> > > > > >>> > >> >> >> >2)      Since only one instance is set to MM data,
> > we
> > > > are
> > > > > >>>not
> > > > > >>> > >>able
> > > > > >>> > >> >>to
> > > > > >>> > >> >> >> >set-up ack per topic instead ack is attached to
> > producer
> > > > > >>> > >>instance.
> > > > > >>> > >> >> >> I don’t quite get the question here.
> > > > > >>> > >> >> >> >3)      How are you going to address two phase commit
> > > > > >>>problem
> > > > > >>> if
> > > > > >>> > >> >>ack is
> > > > > >>> > >> >> >> >set
> > > > > >>> > >> >> >> >to strongest, but auto commit is on for consumer
> > (meaning
> > > > > >>> > >>producer
> > > > > >>> > >> >>does
> > > > > >>> > >> >> >> >not
> > > > > >>> > >> >> >> >get ack,  but consumer auto committed offset that
> > > > message).
> > > > > >>> Is
> > > > > >>> > >> >>there
> > > > > >>> > >> >> >> >transactional (Kafka transaction is in process)
> > based ack
> > > > > >>>and
> > > > > >>> > >>commit
> > > > > >>> > >> >> >> >offset
> > > > > >>> > >> >> >> >?
> > > > > >>> > >> >> >> Auto offset commit should be turned off in this case.
> > The
> > > > > >>>offset
> > > > > >>> > >>will
> > > > > >>> > >> >> >>only
> > > > > >>> > >> >> >> be committed once by the offset commit thread. So
> > there is
> > > > > >>>no
> > > > > >>> two
> > > > > >>> > >> >>phase
> > > > > >>> > >> >> >> commit.
> > > > > >>> > >> >> >> >4)      How are you planning to avoid duplicated
> > message?
> > > > > >>>( Is
> > > > > >>> > >> >> >> >brokergoing
> > > > > >>> > >> >> >> >have moving window of message collected and de-dupe
> > ?)
> > > > > >>> > >>Possibly, we
> > > > > >>> > >> >> >>get
> > > > > >>> > >> >> >> >this from retry set to 5…?
> > > > > >>> > >> >> >> We are not trying to completely avoid duplicates. The
> > > > > >>>duplicates
> > > > > >>> > >>will
> > > > > >>> > >> >> >> still be there if:
> > > > > >>> > >> >> >> 1. Producer retries on failure.
> > > > > >>> > >> >> >> 2. Mirror maker is hard killed.
> > > > > >>> > >> >> >> Currently, dedup is expected to be done by user if
> > > > > >>>necessary.
> > > > > >>> > >> >> >> >5)      Last, is there any warning or any thing you
> > can
> > > > > >>>provide
> > > > > >>> > >> >>insight
> > > > > >>> > >> >> >> >from MM component about data injection rate into
> > > > > >>>destination
> > > > > >>> > >> >> >>partitions is
> > > > > >>> > >> >> >> >NOT evenly distributed regardless  of  keyed or
> > non-keyed
> > > > > >>> message
> > > > > >>> > >> >> >>(Hence
> > > > > >>> > >> >> >> >there is ripple effect such as data not arriving
> > late, or
> > > > > >>>data
> > > > > >>> is
> > > > > >>> > >> >> >>arriving
> > > > > >>> > >> >> >> >out of order in  intern of time stamp  and early some
> > > > time,
> > > > > >>>and
> > > > > >>> > >> >>CAMUS
> > > > > >>> > >> >> >> >creates huge number of file count on HDFS due to
> > uneven
> > > > > >>> injection
> > > > > >>> > >> >>rate
> > > > > >>> > >> >> >>.
> > > > > >>> > >> >> >> >Camus Job is  configured to run every 3 minutes.)
> > > > > >>> > >> >> >> I think uneven data distribution is typically caused
> > by
> > > > > >>>server
> > > > > >>> > >>side
> > > > > >>> > >> >> >> unbalance, instead of something mirror maker could
> > > > control.
> > > > > >>>In
> > > > > >>> new
> > > > > >>> > >> >> >>mirror
> > > > > >>> > >> >> >> maker, however, there is a customizable message
> > handler,
> > > > > >>>that
> > > > > >>> > >>might
> > > > > >>> > >> >>be
> > > > > >>> > >> >> >> able to help a little bit. In message handler, you can
> > > > > >>> explicitly
> > > > > >>> > >> >>set a
> > > > > >>> > >> >> >> partition that you want to produce the message to. So
> > if
> > > > you
> > > > > >>> know
> > > > > >>> > >>the
> > > > > >>> > >> >> >> uneven data distribution in target cluster, you may
> > offset
> > > > > >>>it
> > > > > >>> > >>here.
> > > > > >>> > >> >>But
> > > > > >>> > >> >> >> that probably only works for non-keyed messages.
> > > > > >>> > >> >> >> >
> > > > > >>> > >> >> >> >I am not sure if this is right discussion form to
> > bring
> > > > > >>>these
> > > > > >>> to
> > > > > >>> > >> >> >> >your/kafka
> > > > > >>> > >> >> >> >Dev team attention.  This might be off track,
> > > > > >>> > >> >> >> >
> > > > > >>> > >> >> >> >
> > > > > >>> > >> >> >> >Thanks,
> > > > > >>> > >> >> >> >
> > > > > >>> > >> >> >> >Bhavesh
> > > > > >>> > >> >> >> >
> > > > > >>> > >> >> >> >On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin
> > > > > >>> > >> >> >><j...@linkedin.com.invalid
> > > > > >>> > >> >> >> >
> > > > > >>> > >> >> >> >wrote:
> > > > > >>> > >> >> >> >
> > > > > >>> > >> >> >> >> I’ve updated the KIP page. Feedbacks are welcome.
> > > > > >>> > >> >> >> >>
> > > > > >>> > >> >> >> >> Regarding the simple mirror maker design. I thought
> > > > over
> > > > > >>>it
> > > > > >>> and
> > > > > >>> > >> >>have
> > > > > >>> > >> >> >> >>some
> > > > > >>> > >> >> >> >> worries:
> > > > > >>> > >> >> >> >> There are two things that might worth thinking:
> > > > > >>> > >> >> >> >> 1. One of the enhancement to mirror maker is
> > adding a
> > > > > >>>message
> > > > > >>> > >> >> >>handler to
> > > > > >>> > >> >> >> >> do things like reformatting. I think we might
> > > > potentially
> > > > > >>> want
> > > > > >>> > >>to
> > > > > >>> > >> >> >>have
> > > > > >>> > >> >> >> >> more threads processing the messages than the
> > number of
> > > > > >>> > >>consumers.
> > > > > >>> > >> >> >>If we
> > > > > >>> > >> >> >> >> follow the simple mirror maker solution, we lose
> > this
> > > > > >>> > >>flexibility.
> > > > > >>> > >> >> >> >> 2. This might not matter too much, but creating
> > more
> > > > > >>> consumers
> > > > > >>> > >> >>means
> > > > > >>> > >> >> >> >>more
> > > > > >>> > >> >> >> >> footprint of TCP connection / memory.
> > > > > >>> > >> >> >> >>
> > > > > >>> > >> >> >> >> Any thoughts on this?
> > > > > >>> > >> >> >> >>
> > > > > >>> > >> >> >> >> Thanks.
> > > > > >>> > >> >> >> >>
> > > > > >>> > >> >> >> >> Jiangjie (Becket) Qin
> > > > > >>> > >> >> >> >>
> > > > > >>> > >> >> >> >> On 1/26/15, 10:35 AM, "Jiangjie Qin" <
> > > > j...@linkedin.com>
> > > > > >>> > wrote:
> > > > > >>> > >> >> >> >>
> > > > > >>> > >> >> >> >> >Hi Jay and Neha,
> > > > > >>> > >> >> >> >> >
> > > > > >>> > >> >> >> >> >Thanks a lot for the reply and explanation. I do
> > agree
> > > > > >>>it
> > > > > >>> > >>makes
> > > > > >>> > >> >>more
> > > > > >>> > >> >> >> >>sense
> > > > > >>> > >> >> >> >> >to avoid duplicate effort and plan based on new
> > > > > >>>consumer.
> > > > > >>> I’ll
> > > > > >>> > >> >> >>modify
> > > > > >>> > >> >> >> >>the
> > > > > >>> > >> >> >> >> >KIP.
> > > > > >>> > >> >> >> >> >
> > > > > >>> > >> >> >> >> >To Jay’s question on message ordering - The data
> > > > channel
> > > > > >>> > >> >>selection
> > > > > >>> > >> >> >> >>makes
> > > > > >>> > >> >> >> >> >sure that the messages from the same source
> > partition
> > > > > >>>will
> > > > > >>> > >>sent
> > > > > >>> > >> >>by
> > > > > >>> > >> >> >>the
> > > > > >>> > >> >> >> >> >same producer. So the order of the messages is
> > > > > >>>guaranteed
> > > > > >>> with
> > > > > >>> > >> >> >>proper
> > > > > >>> > >> >> >> >> >producer settings
> > > > > >>> > >> >>(MaxInFlightRequests=1,retries=Integer.MaxValue,
> > > > > >>> > >> >> >> >>etc.)
> > > > > >>> > >> >> >> >> >For keyed messages, because they come from the
> > same
> > > > > >>>source
> > > > > >>> > >> >>partition
> > > > > >>> > >> >> >> >>and
> > > > > >>> > >> >> >> >> >will end up in the same target partition, as long
> > as
> > > > > >>>they
> > > > > >>> are
> > > > > >>> > >> >>sent
> > > > > >>> > >> >> >>by
> > > > > >>> > >> >> >> >>the
> > > > > >>> > >> >> >> >> >same producer, the order is guaranteed.
> > > > > >>> > >> >> >> >> >For non-keyed messages, the messages coming from
> > the
> > > > > >>>same
> > > > > >>> > >>source
> > > > > >>> > >> >> >> >>partition
> > > > > >>> > >> >> >> >> >might go to different target partitions. The
> > order is
> > > > > >>>only
> > > > > >>> > >> >> >>guaranteed
> > > > > >>> > >> >> >> >> >within each partition.
> > > > > >>> > >> >> >> >> >
> > > > > >>> > >> >> >> >> >Anyway, I’ll modify the KIP and data channel will
> > be
> > > > > >>>away.
> > > > > >>> > >> >> >> >> >
> > > > > >>> > >> >> >> >> >Thanks.
> > > > > >>> > >> >> >> >> >
> > > > > >>> > >> >> >> >> >Jiangjie (Becket) Qin
> > > > > >>> > >> >> >> >> >
> > > > > >>> > >> >> >> >> >
> > > > > >>> > >> >> >> >> >On 1/25/15, 4:34 PM, "Neha Narkhede" <
> > > > n...@confluent.io>
> > > > > >>> > >>wrote:
> > > > > >>> > >> >> >> >> >
> > > > > >>> > >> >> >> >> >>I think there is some value in investigating if
> > we
> > > > can
> > > > > >>>go
> > > > > >>> > >>back
> > > > > >>> > >> >>to
> > > > > >>> > >> >> >>the
> > > > > >>> > >> >> >> >> >>simple mirror maker design, as Jay points out.
> > Here
> > > > you
> > > > > >>> have
> > > > > >>> > >>N
> > > > > >>> > >> >> >> >>threads,
> > > > > >>> > >> >> >> >> >>each has a consumer and a producer.
> > > > > >>> > >> >> >> >> >>
> > > > > >>> > >> >> >> >> >>The reason why we had to move away from that was
> > a
> > > > > >>> > >>combination
> > > > > >>> > >> >>of
> > > > > >>> > >> >> >>the
> > > > > >>> > >> >> >> >> >>difference in throughput between the consumer
> > and the
> > > > > >>>old
> > > > > >>> > >> >>producer
> > > > > >>> > >> >> >>and
> > > > > >>> > >> >> >> >> >>the
> > > > > >>> > >> >> >> >> >>deficiency of the consumer rebalancing that
> > limits
> > > > the
> > > > > >>> total
> > > > > >>> > >> >> >>number of
> > > > > >>> > >> >> >> >> >>mirror maker threads. So the only option
> > available
> > > > was
> > > > > >>>to
> > > > > >>> > >> >>increase
> > > > > >>> > >> >> >>the
> > > > > >>> > >> >> >> >> >>throughput of the limited # of mirror maker
> > threads
> > > > > >>>that
> > > > > >>> > >>could
> > > > > >>> > >> >>be
> > > > > >>> > >> >> >> >> >>deployed.
> > > > > >>> > >> >> >> >> >>Now that queuing design may not make sense, if
> > the
> > > > new
> > > > > >>> > >> >>producer's
> > > > > >>> > >> >> >> >> >>throughput is almost similar to the consumer AND
> > the
> > > > > >>>fact
> > > > > >>> > >>that
> > > > > >>> > >> >>the
> > > > > >>> > >> >> >>new
> > > > > >>> > >> >> >> >> >>round-robin based consumer rebalancing can allow
> > a
> > > > very
> > > > > >>> high
> > > > > >>> > >> >> >>number of
> > > > > >>> > >> >> >> >> >>mirror maker instances to exist.
> > > > > >>> > >> >> >> >> >>
> > > > > >>> > >> >> >> >> >>This is the end state that the mirror maker
> > should be
> > > > > >>>in
> > > > > >>> once
> > > > > >>> > >> >>the
> > > > > >>> > >> >> >>new
> > > > > >>> > >> >> >> >> >>consumer is complete, so it wouldn't hurt to see
> > if
> > > > we
> > > > > >>>can
> > > > > >>> > >>just
> > > > > >>> > >> >> >>move
> > > > > >>> > >> >> >> >>to
> > > > > >>> > >> >> >> >> >>that right now.
> > > > > >>> > >> >> >> >> >>
> > > > > >>> > >> >> >> >> >>On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps
> > > > > >>> > >><jay.kr...@gmail.com
> > > > > >>> > >> >
> > > > > >>> > >> >> >> >>wrote:
> > > > > >>> > >> >> >> >> >>
> > > > > >>> > >> >> >> >> >>> QQ: If we ever use a different technique for
> > the
> > > > data
> > > > > >>> > >>channel
> > > > > >>> > >> >> >> >>selection
> > > > > >>> > >> >> >> >> >>> than for the producer partitioning won't that
> > break
> > > > > >>> > >>ordering?
> > > > > >>> > >> >>How
> > > > > >>> > >> >> >> >>can
> > > > > >>> > >> >> >> >> >>>we
> > > > > >>> > >> >> >> >> >>> ensure these things stay in sync?
> > > > > >>> > >> >> >> >> >>>
> > > > > >>> > >> >> >> >> >>> With respect to the new consumer--I really do
> > want
> > > > to
> > > > > >>> > >> >>encourage
> > > > > >>> > >> >> >> >>people
> > > > > >>> > >> >> >> >> >>>to
> > > > > >>> > >> >> >> >> >>> think through how MM will work with the new
> > > > consumer.
> > > > > >>>I
> > > > > >>> > >>mean
> > > > > >>> > >> >>this
> > > > > >>> > >> >> >> >>isn't
> > > > > >>> > >> >> >> >> >>> very far off, maybe a few months if we hustle?
> > I
> > > > > >>>could
> > > > > >>> > >> >>imagine us
> > > > > >>> > >> >> >> >> >>>getting
> > > > > >>> > >> >> >> >> >>> this mm fix done maybe sooner, maybe in a
> > month?
> > > > So I
> > > > > >>> guess
> > > > > >>> > >> >>this
> > > > > >>> > >> >> >> >>buys
> > > > > >>> > >> >> >> >> >>>us an
> > > > > >>> > >> >> >> >> >>> extra month before we rip it out and throw it
> > away?
> > > > > >>>Maybe
> > > > > >>> > >>two?
> > > > > >>> > >> >> >>This
> > > > > >>> > >> >> >> >>bug
> > > > > >>> > >> >> >> >> >>>has
> > > > > >>> > >> >> >> >> >>> been there for a while, though, right? Is it
> > worth
> > > > > >>>it?
> > > > > >>> > >> >>Probably
> > > > > >>> > >> >> >>it
> > > > > >>> > >> >> >> >>is,
> > > > > >>> > >> >> >> >> >>>but
> > > > > >>> > >> >> >> >> >>> it still kind of sucks to have the duplicate
> > > > effort.
> > > > > >>> > >> >> >> >> >>>
> > > > > >>> > >> >> >> >> >>> So anyhow let's definitely think about how
> > things
> > > > > >>>will
> > > > > >>> work
> > > > > >>> > >> >>with
> > > > > >>> > >> >> >>the
> > > > > >>> > >> >> >> >> >>>new
> > > > > >>> > >> >> >> >> >>> consumer. I think we can probably just have N
> > > > > >>>threads,
> > > > > >>> each
> > > > > >>> > >> >> >>thread
> > > > > >>> > >> >> >> >>has
> > > > > >>> > >> >> >> >> >>>a
> > > > > >>> > >> >> >> >> >>> producer and consumer and is internally single
> > > > > >>>threaded.
> > > > > >>> > >>Any
> > > > > >>> > >> >> >>reason
> > > > > >>> > >> >> >> >> >>>this
> > > > > >>> > >> >> >> >> >>> wouldn't work?
> > > > > >>> > >> >> >> >> >>>
> > > > > >>> > >> >> >> >> >>> -Jay
> > > > > >>> > >> >> >> >> >>>
> > > > > >>> > >> >> >> >> >>>
> > > > > >>> > >> >> >> >> >>> On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin
> > > > > >>> > >> >> >> >> >>><j...@linkedin.com.invalid>
> > > > > >>> > >> >> >> >> >>> wrote:
> > > > > >>> > >> >> >> >> >>>
> > > > > >>> > >> >> >> >> >>> > Hi Jay,
> > > > > >>> > >> >> >> >> >>> >
> > > > > >>> > >> >> >> >> >>> > Thanks for comments. Please see inline
> > responses.
> > > > > >>> > >> >> >> >> >>> >
> > > > > >>> > >> >> >> >> >>> > Jiangjie (Becket) Qin
> > > > > >>> > >> >> >> >> >>> >
> > > > > >>> > >> >> >> >> >>> > On 1/21/15, 1:33 PM, "Jay Kreps"
> > > > > >>><jay.kr...@gmail.com>
> > > > > >>> > >> >>wrote:
> > > > > >>> > >> >> >> >> >>> >
> > > > > >>> > >> >> >> >> >>> > >Hey guys,
> > > > > >>> > >> >> >> >> >>> > >
> > > > > >>> > >> >> >> >> >>> > >A couple questions/comments:
> > > > > >>> > >> >> >> >> >>> > >
> > > > > >>> > >> >> >> >> >>> > >1. The callback and user-controlled commit
> > > > offset
> > > > > >>> > >> >> >>functionality
> > > > > >>> > >> >> >> >>is
> > > > > >>> > >> >> >> >> >>> already
> > > > > >>> > >> >> >> >> >>> > >in the new consumer which we are working on
> > in
> > > > > >>> parallel.
> > > > > >>> > >> >>If we
> > > > > >>> > >> >> >> >> >>> accelerated
> > > > > >>> > >> >> >> >> >>> > >that work it might help concentrate
> > efforts. I
> > > > > >>>admit
> > > > > >>> > >>this
> > > > > >>> > >> >> >>might
> > > > > >>> > >> >> >> >>take
> > > > > >>> > >> >> >> >> >>> > >slightly longer in calendar time but could
> > still
> > > > > >>> > >>probably
> > > > > >>> > >> >>get
> > > > > >>> > >> >> >> >>done
> > > > > >>> > >> >> >> >> >>>this
> > > > > >>> > >> >> >> >> >>> > >quarter. Have you guys considered that
> > approach?
> > > > > >>> > >> >> >> >> >>> > Yes, I totally agree that ideally we should
> > put
> > > > > >>>efforts
> > > > > >>> > >>on
> > > > > >>> > >> >>new
> > > > > >>> > >> >> >> >> >>>consumer.
> > > > > >>> > >> >> >> >> >>> > The main reason for still working on the old
> > > > > >>>consumer
> > > > > >>> is
> > > > > >>> > >> >>that
> > > > > >>> > >> >> >>we
> > > > > >>> > >> >> >> >> >>>expect
> > > > > >>> > >> >> >> >> >>> it
> > > > > >>> > >> >> >> >> >>> > would still be used in LinkedIn for quite a
> > while
> > > > > >>> before
> > > > > >>> > >>the
> > > > > >>> > >> >> >>new
> > > > > >>> > >> >> >> >> >>>consumer
> > > > > >>> > >> >> >> >> >>> > could be fully rolled out. And we recently
> > > > > >>>suffering a
> > > > > >>> > >>lot
> > > > > >>> > >> >>from
> > > > > >>> > >> >> >> >> >>>mirror
> > > > > >>> > >> >> >> >> >>> > maker data loss issue. So our current plan is
> > > > > >>>making
> > > > > >>> > >> >>necessary
> > > > > >>> > >> >> >> >> >>>changes to
> > > > > >>> > >> >> >> >> >>> > make current mirror maker stable in
> > production.
> > > > > >>>Then we
> > > > > >>> > >>can
> > > > > >>> > >> >> >>test
> > > > > >>> > >> >> >> >>and
> > > > > >>> > >> >> >> >> >>> > rollout new consumer gradually without
> > getting
> > > > > >>>burnt.
> > > > > >>> > >> >> >> >> >>> > >
> > > > > >>> > >> >> >> >> >>> > >2. I think partitioning on the hash of the
> > topic
> > > > > >>> > >>partition
> > > > > >>> > >> >>is
> > > > > >>> > >> >> >> >>not a
> > > > > >>> > >> >> >> >> >>>very
> > > > > >>> > >> >> >> >> >>> > >good idea because that will make the case of
> > > > going
> > > > > >>> from
> > > > > >>> > >>a
> > > > > >>> > >> >> >>cluster
> > > > > >>> > >> >> >> >> >>>with
> > > > > >>> > >> >> >> >> >>> > >fewer partitions to one with more
> > partitions not
> > > > > >>> work. I
> > > > > >>> > >> >> >>think an
> > > > > >>> > >> >> >> >> >>> > >intuitive
> > > > > >>> > >> >> >> >> >>> > >way to do this would be the following:
> > > > > >>> > >> >> >> >> >>> > >a. Default behavior: Just do what the
> > producer
> > > > > >>>does.
> > > > > >>> > >>I.e.
> > > > > >>> > >> >>if
> > > > > >>> > >> >> >>you
> > > > > >>> > >> >> >> >> >>> specify a
> > > > > >>> > >> >> >> >> >>> > >key use it for partitioning, if not just
> > > > partition
> > > > > >>>in
> > > > > >>> a
> > > > > >>> > >> >> >> >>round-robin
> > > > > >>> > >> >> >> >> >>> > >fashion.
> > > > > >>> > >> >> >> >> >>> > >b. Add a --preserve-partition option that
> > will
> > > > > >>> > >>explicitly
> > > > > >>> > >> >> >> >>inherent
> > > > > >>> > >> >> >> >> >>>the
> > > > > >>> > >> >> >> >> >>> > >partition from the source irrespective of
> > > > whether
> > > > > >>> there
> > > > > >>> > >>is
> > > > > >>> > >> >>a
> > > > > >>> > >> >> >>key
> > > > > >>> > >> >> >> >>or
> > > > > >>> > >> >> >> >> >>> which
> > > > > >>> > >> >> >> >> >>> > >partition that key would hash to.
> > > > > >>> > >> >> >> >> >>> > Sorry that I did not explain this clear
> > enough.
> > > > The
> > > > > >>> hash
> > > > > >>> > >>of
> > > > > >>> > >> >> >>topic
> > > > > >>> > >> >> >> >> >>> > partition is only used when decide which
> > mirror
> > > > > >>>maker
> > > > > >>> > >>data
> > > > > >>> > >> >> >>channel
> > > > > >>> > >> >> >> >> >>>queue
> > > > > >>> > >> >> >> >> >>> > the consumer thread should put message into.
> > It
> > > > > >>>only
> > > > > >>> > >>tries
> > > > > >>> > >> >>to
> > > > > >>> > >> >> >>make
> > > > > >>> > >> >> >> >> >>>sure
> > > > > >>> > >> >> >> >> >>> > the messages from the same partition is sent
> > by
> > > > the
> > > > > >>> same
> > > > > >>> > >> >> >>producer
> > > > > >>> > >> >> >> >> >>>thread
> > > > > >>> > >> >> >> >> >>> > to guarantee the sending order. This is not
> > at
> > > > all
> > > > > >>> > >>related
> > > > > >>> > >> >>to
> > > > > >>> > >> >> >> >>which
> > > > > >>> > >> >> >> >> >>> > partition in target cluster the messages end
> > up.
> > > > > >>>That
> > > > > >>> is
> > > > > >>> > >> >>still
> > > > > >>> > >> >> >> >> >>>decided by
> > > > > >>> > >> >> >> >> >>> > producer.
> > > > > >>> > >> >> >> >> >>> > >
> > > > > >>> > >> >> >> >> >>> > >3. You don't actually give the
> > > > > >>> ConsumerRebalanceListener
> > > > > >>> > >> >> >> >>interface.
> > > > > >>> > >> >> >> >> >>>What
> > > > > >>> > >> >> >> >> >>> > >is
> > > > > >>> > >> >> >> >> >>> > >that going to look like?
> > > > > >>> > >> >> >> >> >>> > Good point! I should have put it in the
> > wiki. I
> > > > > >>>just
> > > > > >>> > >>added
> > > > > >>> > >> >>it.
> > > > > >>> > >> >> >> >> >>> > >
> > > > > >>> > >> >> >> >> >>> > >4. What is MirrorMakerRecord? I think
> > ideally
> > > > the
> > > > > >>> > >> >> >> >> >>> > >MirrorMakerMessageHandler
> > > > > >>> > >> >> >> >> >>> > >interface would take a ConsumerRecord as
> > input
> > > > and
> > > > > >>> > >>return a
> > > > > >>> > >> >> >> >> >>> > >ProducerRecord,
> > > > > >>> > >> >> >> >> >>> > >right? That would allow you to transform the
> > > > key,
> > > > > >>> value,
> > > > > >>> > >> >> >> >>partition,
> > > > > >>> > >> >> >> >> >>>or
> > > > > >>> > >> >> >> >> >>> > >destination topic...
> > > > > >>> > >> >> >> >> >>> > MirrorMakerRecord is introduced in
> > KAFKA-1650,
> > > > > >>>which is
> > > > > >>> > >> >>exactly
> > > > > >>> > >> >> >> >>the
> > > > > >>> > >> >> >> >> >>>same
> > > > > >>> > >> >> >> >> >>> > as ConsumerRecord in KAFKA-1760.
> > > > > >>> > >> >> >> >> >>> > private[kafka] class MirrorMakerRecord (val
> > > > > >>> sourceTopic:
> > > > > >>> > >> >> >>String,
> > > > > >>> > >> >> >> >> >>> >   val sourcePartition: Int,
> > > > > >>> > >> >> >> >> >>> >   val sourceOffset: Long,
> > > > > >>> > >> >> >> >> >>> >   val key: Array[Byte],
> > > > > >>> > >> >> >> >> >>> >   val value: Array[Byte]) {
> > > > > >>> > >> >> >> >> >>> >   def size = value.length + {if (key ==
> > null) 0
> > > > > >>>else
> > > > > >>> > >> >> >>key.length}
> > > > > >>> > >> >> >> >> >>> > }
> > > > > >>> > >> >> >> >> >>> >
> > > > > >>> > >> >> >> >> >>> > However, because source partition and offset
> > is
> > > > > >>>needed
> > > > > >>> in
> > > > > >>> > >> >> >>producer
> > > > > >>> > >> >> >> >> >>>thread
> > > > > >>> > >> >> >> >> >>> > for consumer offsets bookkeeping, the record
> > > > > >>>returned
> > > > > >>> by
> > > > > >>> > >> >> >> >> >>> > MirrorMakerMessageHandler needs to contain
> > those
> > > > > >>> > >> >>information.
> > > > > >>> > >> >> >> >> >>>Therefore
> > > > > >>> > >> >> >> >> >>> > ProducerRecord does not work here. We could
> > > > > >>>probably
> > > > > >>> let
> > > > > >>> > >> >> >>message
> > > > > >>> > >> >> >> >> >>>handler
> > > > > >>> > >> >> >> >> >>> > take ConsumerRecord for both input and
> > output.
> > > > > >>> > >> >> >> >> >>> > >
> > > > > >>> > >> >> >> >> >>> > >5. Have you guys thought about what the
> > > > > >>>implementation
> > > > > >>> > >>will
> > > > > >>> > >> >> >>look
> > > > > >>> > >> >> >> >> >>>like in
> > > > > >>> > >> >> >> >> >>> > >terms of threading architecture etc with
> > the new
> > > > > >>> > >>consumer?
> > > > > >>> > >> >> >>That
> > > > > >>> > >> >> >> >>will
> > > > > >>> > >> >> >> >> >>>be
> > > > > >>> > >> >> >> >> >>> > >soon so even if we aren't starting with that
> > > > let's
> > > > > >>> make
> > > > > >>> > >> >>sure
> > > > > >>> > >> >> >>we
> > > > > >>> > >> >> >> >>can
> > > > > >>> > >> >> >> >> >>>get
> > > > > >>> > >> >> >> >> >>> > >rid
> > > > > >>> > >> >> >> >> >>> > >of a lot of the current mirror maker
> > accidental
> > > > > >>> > >>complexity
> > > > > >>> > >> >>in
> > > > > >>> > >> >> >> >>terms
> > > > > >>> > >> >> >> >> >>>of
> > > > > >>> > >> >> >> >> >>> > >threads and queues when we move to that.
> > > > > >>> > >> >> >> >> >>> > I haven't thought about it thoroughly. The quick
> > quick
> > > > > >>>idea is
> > > > > >>> > >> >>after
> > > > > >>> > >> >> >> >> >>>migration
> > > > > >>> > >> >> >> >> >>> to
> > > > > >>> > >> >> >> >> >>> > the new consumer, it is probably better to
> > use a
> > > > > >>>single
> > > > > >>> > >> >> >>consumer
> > > > > >>> > >> >> >> >> >>>thread.
> > > > > >>> > >> >> >> >> >>> > If multithread is needed, decoupling
> > consumption
> > > > > >>>and
> > > > > >>> > >> >>processing
> > > > > >>> > >> >> >> >>might
> > > > > >>> > >> >> >> >> >>>be
> > > > > >>> > >> >> >> >> >>> > used. MirrorMaker definitely needs to be
> > changed
> > > > > >>>after
> > > > > >>> > >>new
> > > > > >>> > >> >> >> >>consumer
> > > > > >>> > >> >> >> >> >>>get
> > > > > >>> > >> >> >> >> >>> > checked in. I'll document the changes and can
> > > > > >>>submit
> > > > > >>> > >>follow
> > > > > >>> > >> >>up
> > > > > >>> > >> >> >> >> >>>patches
> > > > > >>> > >> >> >> >> >>> > after the new consumer is available.
> > > > > >>> > >> >> >> >> >>> > >
> > > > > >>> > >> >> >> >> >>> > >-Jay
> > > > > >>> > >> >> >> >> >>> > >
> > > > > >>> > >> >> >> >> >>> > >On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie
> > Qin
> > > > > >>> > >> >> >> >> >>><j...@linkedin.com.invalid
> > > > > >>> > >> >> >> >> >>> >
> > > > > >>> > >> >> >> >> >>> > >wrote:
> > > > > >>> > >> >> >> >> >>> > >
> > > > > >>> > >> >> >> >> >>> > >> Hi Kafka Devs,
> > > > > >>> > >> >> >> >> >>> > >>
> > > > > >>> > >> >> >> >> >>> > >> We are working on Kafka Mirror Maker
> > > > > >>>enhancement. A
> > > > > >>> > >>KIP
> > > > > >>> > >> >>is
> > > > > >>> > >> >> >> >>posted
> > > > > >>> > >> >> >> >> >>>to
> > > > > >>> > >> >> >> >> >>> > >> document and discuss on the followings:
> > > > > >>> > >> >> >> >> >>> > >> 1. KAFKA-1650: No Data loss mirror maker
> > > > change
> > > > > >>> > >> >> >> >> >>> > >> 2. KAFKA-1839: To allow partition aware
> > > > mirror.
> > > > > >>> > >> >> >> >> >>> > >> 3. KAFKA-1840: To allow message
> > > > filtering/format
> > > > > >>> > >> >>conversion
> > > > > >>> > >> >> >> >> >>> > >> Feedbacks are welcome. Please let us know
> > if
> > > > you
> > > > > >>> have
> > > > > >>> > >>any
> > > > > >>> > >> >> >> >> >>>questions or
> > > > > >>> > >> >> >> >> >>> > >> concerns.
> > > > > >>> > >> >> >> >> >>> > >>
> > > > > >>> > >> >> >> >> >>> > >> Thanks.
> > > > > >>> > >> >> >> >> >>> > >>
> > > > > >>> > >> >> >> >> >>> > >> Jiangjie (Becket) Qin
> > > > > >>> > >> >> >> >> >>> > >>
> > > > > >>> > >> >> >> >> >>> >
> > > > > >>> > >> >> >> >> >>> >
> > > > > >>> > >> >> >> >> >>>
> > > > > >>> > >> >> >> >> >>
> > > > > >>> > >> >> >> >> >>
> > > > > >>> > >> >> >> >> >>
> > > > > >>> > >> >> >> >> >>--
> > > > > >>> > >> >> >> >> >>Thanks,
> > > > > >>> > >> >> >> >> >>Neha
> > > > > >>> > >> >> >> >> >
> > > > > >>> > >> >> >> >>
> > > > > >>> > >> >> >> >>
> > > > > >>> > >> >> >>
> > > > > >>> > >> >> >>
> > > > > >>> > >> >>
> > > > > >>> > >> >>
> > > > > >>> > >>
> > > > > >>> > >>
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > >--
> > > > > >>> > >Thanks,
> > > > > >>> > >Neha
> > > > > >>> >
> > > > > >>> >
> > > > > >>>
> > > > > >>>
> > > > > >>> --
> > > > > >>> Thanks,
> > > > > >>> Neha
> > > > > >>>
> > > > > >
> > > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > Thanks,
> > > Neha
> >
> >
