Hey Becket,

What are the next steps on this KIP. As per your comment earlier on the
thread -

I do agree it makes more sense
> to avoid duplicate effort and plan based on new consumer. I’ll modify the
> KIP.


Did you get a chance to think about the simplified design that we proposed
earlier? Do you plan to update the KIP with that proposal?

Thanks,
Neha

On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> In mirror maker we do not do de-serialization on the messages. Mirror
> maker use source TopicPartition hash to chose a producer to send messages
> from the same source partition. The partition those messages end up with
> are decided by Partitioner class in KafkaProducer (assuming you are using
> the new producer), which uses hash code of bytes[].
>
> If deserialization is needed, it has to be done in message handler.
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 2/4/15, 11:33 AM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
>
> >Hi Jiangjie,
> >
> >Thanks for entertaining my question so far.  Last question, I have is
> >about
> >serialization of message key.  If the key de-serialization (Class) is not
> >present at the MM instance, then does it use raw byte hashcode to
> >determine
> >the partition ?  How are you going to address the situation where key
> >needs
> >to be de-serialization and get actual hashcode needs to be computed  ?.
> >
> >
> >Thanks,
> >
> >Bhavesh
> >
> >On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >wrote:
> >
> >> Hi Bhavesh,
> >>
> >> Please see inline comments.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 1/29/15, 7:00 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com>
> >>wrote:
> >>
> >> >Hi Jiangjie,
> >> >
> >> >Thanks for the input.
> >> >
> >> >a) Is MM will  producer ack will be attach to Producer Instance or per
> >> >topic.  Use case is that one instance of MM
> >> >needs to handle both strong ack and also ack=0 for some topic.  Or it
> >> >would
> >> >be better to set-up another instance of MM.
> >> The acks setting is producer level setting instead of topic level
> >>setting.
> >> In this case you probably need to set up another instance.
> >> >
> >> >b) Regarding TCP connections, Why does #producer instance attach to TCP
> >> >connection.  Is it possible to use Broker Connection TCP Pool, producer
> >> >will just checkout TCP connection  to Broker.  So, # of Producer
> >>Instance
> >> >does not correlation to Brokers Connection.  Is this possible ?
> >> In new producer, each producer maintains a connection to each broker
> >> within the producer instance. Making producer instances to share the TCP
> >> connections is a very big change to the current design, so I suppose we
> >> won’t be able to do that.
> >> >
> >> >
> >> >Thanks,
> >> >
> >> >Bhavesh
> >> >
> >> >On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin
> >><j...@linkedin.com.invalid
> >> >
> >> >wrote:
> >> >
> >> >> Hi Bhavesh,
> >> >>
> >> >> I think it is the right discussion to have when we are talking about
> >>the
> >> >> new new design for MM.
> >> >> Please see the inline comments.
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >> On 1/28/15, 10:48 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com>
> >> >>wrote:
> >> >>
> >> >> >Hi Jiangjie,
> >> >> >
> >> >> >I just wanted to let you know about our use case and stress the
> >>point
> >> >>that
> >> >> >local data center broker cluster have fewer partitions than the
> >> >> >destination
> >> >> >offline broker cluster. Just because we do the batch pull from CAMUS
> >> >>and
> >> >> >in
> >> >> >order to drain data faster than the injection rate (from four DCs
> >>for
> >> >>same
> >> >> >topic).
> >> >> Keeping the same partition number in source and target cluster will
> >>be
> >> >>an
> >> >> option but will not be enforced by default.
> >> >> >
> >> >> >We are facing following issues (probably due to configuration):
> >> >> >
> >> >> >1)      We occasionally loose data due to message batch size is too
> >> >>large
> >> >> >(2MB) on target data (we are using old producer but I think new
> >> >>producer
> >> >> >will solve this problem to some extend).
> >> >> We do see this issue in LinkedIn as well. New producer also might
> >>have
> >> >> this issue. There are some proposal of solutions, but no real work
> >> >>started
> >> >> yet. For now, as a workaround, setting a more aggressive batch size
> >>on
> >> >> producer side should work.
> >> >> >2)      Since only one instance is set to MM data,  we are not able
> >>to
> >> >> >set-up ack per topic instead ack is attached to producer instance.
> >> >> I don’t quite get the question here.
> >> >> >3)      How are you going to address two phase commit problem if
> >>ack is
> >> >> >set
> >> >> >to strongest, but auto commit is on for consumer (meaning producer
> >>does
> >> >> >not
> >> >> >get ack,  but consumer auto committed offset that message).  Is
> >>there
> >> >> >transactional (Kafka transaction is in process) based ack and commit
> >> >> >offset
> >> >> >?
> >> >> Auto offset commit should be turned off in this case. The offset will
> >> >>only
> >> >> be committed once by the offset commit thread. So there is no two
> >>phase
> >> >> commit.
> >> >> >4)      How are you planning to avoid duplicated message?  ( Is
> >> >> >brokergoing
> >> >> >have moving window of message collected and de-dupe ?)  Possibly, we
> >> >>get
> >> >> >this from retry set to 5…?
> >> >> We are not trying to completely avoid duplicates. The duplicates will
> >> >> still be there if:
> >> >> 1. Producer retries on failure.
> >> >> 2. Mirror maker is hard killed.
> >> >> Currently, dedup is expected to be done by user if necessary.
> >> >> >5)      Last, is there any warning or any thing you can provide
> >>insight
> >> >> >from MM component about data injection rate into destination
> >> >>partitions is
> >> >> >NOT evenly distributed regardless  of  keyed or non-keyed message
> >> >>(Hence
> >> >> >there is ripple effect such as data not arriving late, or data is
> >> >>arriving
> >> >> >out of order in  intern of time stamp  and early some time, and
> >>CAMUS
> >> >> >creates huge number of file count on HDFS due to uneven injection
> >>rate
> >> >>.
> >> >> >Camus Job is  configured to run every 3 minutes.)
> >> >> I think uneven data distribution is typically caused by server side
> >> >> unbalance, instead of something mirror maker could control. In new
> >> >>mirror
> >> >> maker, however, there is a customizable message handler, that might
> >>be
> >> >> able to help a little bit. In message handler, you can explicitly
> >>set a
> >> >> partition that you want to produce the message to. So if you know the
> >> >> uneven data distribution in target cluster, you may offset it here.
> >>But
> >> >> that probably only works for non-keyed messages.
> >> >> >
> >> >> >I am not sure if this is right discussion form to bring these to
> >> >> >your/kafka
> >> >> >Dev team attention.  This might be off track,
> >> >> >
> >> >> >
> >> >> >Thanks,
> >> >> >
> >> >> >Bhavesh
> >> >> >
> >> >> >On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin
> >> >><j...@linkedin.com.invalid
> >> >> >
> >> >> >wrote:
> >> >> >
> >> >> >> I’ve updated the KIP page. Feedbacks are welcome.
> >> >> >>
> >> >> >> Regarding the simple mirror maker design. I thought over it and
> >>have
> >> >> >>some
> >> >> >> worries:
> >> >> >> There are two things that might worth thinking:
> >> >> >> 1. One of the enhancement to mirror maker is adding a message
> >> >>handler to
> >> >> >> do things like reformatting. I think we might potentially want to
> >> >>have
> >> >> >> more threads processing the messages than the number of consumers.
> >> >>If we
> >> >> >> follow the simple mirror maker solution, we lose this flexibility.
> >> >> >> 2. This might not matter too much, but creating more consumers
> >>means
> >> >> >>more
> >> >> >> footprint of TCP connection / memory.
> >> >> >>
> >> >> >> Any thoughts on this?
> >> >> >>
> >> >> >> Thanks.
> >> >> >>
> >> >> >> Jiangjie (Becket) Qin
> >> >> >>
> >> >> >> On 1/26/15, 10:35 AM, "Jiangjie Qin" <j...@linkedin.com> wrote:
> >> >> >>
> >> >> >> >Hi Jay and Neha,
> >> >> >> >
> >> >> >> >Thanks a lot for the reply and explanation. I do agree it makes
> >>more
> >> >> >>sense
> >> >> >> >to avoid duplicate effort and plan based on new consumer. I’ll
> >> >>modify
> >> >> >>the
> >> >> >> >KIP.
> >> >> >> >
> >> >> >> >To Jay’s question on message ordering - The data channel
> >>selection
> >> >> >>makes
> >> >> >> >sure that the messages from the same source partition will sent
> >>by
> >> >>the
> >> >> >> >same producer. So the order of the messages is guaranteed with
> >> >>proper
> >> >> >> >producer settings
> >>(MaxInFlightRequests=1,retries=Integer.MaxValue,
> >> >> >>etc.)
> >> >> >> >For keyed messages, because they come from the same source
> >>partition
> >> >> >>and
> >> >> >> >will end up in the same target partition, as long as they are
> >>sent
> >> >>by
> >> >> >>the
> >> >> >> >same producer, the order is guaranteed.
> >> >> >> >For non-keyed messages, the messages coming from the same source
> >> >> >>partition
> >> >> >> >might go to different target partitions. The order is only
> >> >>guaranteed
> >> >> >> >within each partition.
> >> >> >> >
> >> >> >> >Anyway, I’ll modify the KIP and data channel will be away.
> >> >> >> >
> >> >> >> >Thanks.
> >> >> >> >
> >> >> >> >Jiangjie (Becket) Qin
> >> >> >> >
> >> >> >> >
> >> >> >> >On 1/25/15, 4:34 PM, "Neha Narkhede" <n...@confluent.io> wrote:
> >> >> >> >
> >> >> >> >>I think there is some value in investigating if we can go back
> >>to
> >> >>the
> >> >> >> >>simple mirror maker design, as Jay points out. Here you have N
> >> >> >>threads,
> >> >> >> >>each has a consumer and a producer.
> >> >> >> >>
> >> >> >> >>The reason why we had to move away from that was a combination
> >>of
> >> >>the
> >> >> >> >>difference in throughput between the consumer and the old
> >>producer
> >> >>and
> >> >> >> >>the
> >> >> >> >>deficiency of the consumer rebalancing that limits the total
> >> >>number of
> >> >> >> >>mirror maker threads. So the only option available was to
> >>increase
> >> >>the
> >> >> >> >>throughput of the limited # of mirror maker threads that could
> >>be
> >> >> >> >>deployed.
> >> >> >> >>Now that queuing design may not make sense, if the new
> >>producer's
> >> >> >> >>throughput is almost similar to the consumer AND the fact that
> >>the
> >> >>new
> >> >> >> >>round-robin based consumer rebalancing can allow a very high
> >> >>number of
> >> >> >> >>mirror maker instances to exist.
> >> >> >> >>
> >> >> >> >>This is the end state that the mirror maker should be in once
> >>the
> >> >>new
> >> >> >> >>consumer is complete, so it wouldn't hurt to see if we can just
> >> >>move
> >> >> >>to
> >> >> >> >>that right now.
> >> >> >> >>
> >> >> >> >>On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps <jay.kr...@gmail.com
> >
> >> >> >>wrote:
> >> >> >> >>
> >> >> >> >>> QQ: If we ever use a different technique for the data channel
> >> >> >>selection
> >> >> >> >>> than for the producer partitioning won't that break ordering?
> >>How
> >> >> >>can
> >> >> >> >>>we
> >> >> >> >>> ensure these things stay in sync?
> >> >> >> >>>
> >> >> >> >>> With respect to the new consumer--I really do want to
> >>encourage
> >> >> >>people
> >> >> >> >>>to
> >> >> >> >>> think through how MM will work with the new consumer. I mean
> >>this
> >> >> >>isn't
> >> >> >> >>> very far off, maybe a few months if we hustle? I could
> >>imagine us
> >> >> >> >>>getting
> >> >> >> >>> this mm fix done maybe sooner, maybe in a month? So I guess
> >>this
> >> >> >>buys
> >> >> >> >>>us an
> >> >> >> >>> extra month before we rip it out and throw it away? Maybe two?
> >> >>This
> >> >> >>bug
> >> >> >> >>>has
> >> >> >> >>> been there for a while, though, right? Is it worth it?
> >>Probably
> >> >>it
> >> >> >>is,
> >> >> >> >>>but
> >> >> >> >>> it still kind of sucks to have the duplicate effort.
> >> >> >> >>>
> >> >> >> >>> So anyhow let's definitely think about how things will work
> >>with
> >> >>the
> >> >> >> >>>new
> >> >> >> >>> consumer. I think we can probably just have N threads, each
> >> >>thread
> >> >> >>has
> >> >> >> >>>a
> >> >> >> >>> producer and consumer and is internally single threaded. Any
> >> >>reason
> >> >> >> >>>this
> >> >> >> >>> wouldn't work?
> >> >> >> >>>
> >> >> >> >>> -Jay
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>> On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin
> >> >> >> >>><j...@linkedin.com.invalid>
> >> >> >> >>> wrote:
> >> >> >> >>>
> >> >> >> >>> > Hi Jay,
> >> >> >> >>> >
> >> >> >> >>> > Thanks for comments. Please see inline responses.
> >> >> >> >>> >
> >> >> >> >>> > Jiangjie (Becket) Qin
> >> >> >> >>> >
> >> >> >> >>> > On 1/21/15, 1:33 PM, "Jay Kreps" <jay.kr...@gmail.com>
> >>wrote:
> >> >> >> >>> >
> >> >> >> >>> > >Hey guys,
> >> >> >> >>> > >
> >> >> >> >>> > >A couple questions/comments:
> >> >> >> >>> > >
> >> >> >> >>> > >1. The callback and user-controlled commit offset
> >> >>functionality
> >> >> >>is
> >> >> >> >>> already
> >> >> >> >>> > >in the new consumer which we are working on in parallel.
> >>If we
> >> >> >> >>> accelerated
> >> >> >> >>> > >that work it might help concentrate efforts. I admit this
> >> >>might
> >> >> >>take
> >> >> >> >>> > >slightly longer in calendar time but could still probably
> >>get
> >> >> >>done
> >> >> >> >>>this
> >> >> >> >>> > >quarter. Have you guys considered that approach?
> >> >> >> >>> > Yes, I totally agree that ideally we should put efforts on
> >>new
> >> >> >> >>>consumer.
> >> >> >> >>> > The main reason for still working on the old consumer is
> >>that
> >> >>we
> >> >> >> >>>expect
> >> >> >> >>> it
> >> >> >> >>> > would still be used in LinkedIn for quite a while before the
> >> >>new
> >> >> >> >>>consumer
> >> >> >> >>> > could be fully rolled out. And we recently suffering a lot
> >>from
> >> >> >> >>>mirror
> >> >> >> >>> > maker data loss issue. So our current plan is making
> >>necessary
> >> >> >> >>>changes to
> >> >> >> >>> > make current mirror maker stable in production. Then we can
> >> >>test
> >> >> >>and
> >> >> >> >>> > rollout new consumer gradually without getting burnt.
> >> >> >> >>> > >
> >> >> >> >>> > >2. I think partitioning on the hash of the topic partition
> >>is
> >> >> >>not a
> >> >> >> >>>very
> >> >> >> >>> > >good idea because that will make the case of going from a
> >> >>cluster
> >> >> >> >>>with
> >> >> >> >>> > >fewer partitions to one with more partitions not work. I
> >> >>think an
> >> >> >> >>> > >intuitive
> >> >> >> >>> > >way to do this would be the following:
> >> >> >> >>> > >a. Default behavior: Just do what the producer does. I.e.
> >>if
> >> >>you
> >> >> >> >>> specify a
> >> >> >> >>> > >key use it for partitioning, if not just partition in a
> >> >> >>round-robin
> >> >> >> >>> > >fashion.
> >> >> >> >>> > >b. Add a --preserve-partition option that will explicitly
> >> >> >>inherent
> >> >> >> >>>the
> >> >> >> >>> > >partition from the source irrespective of whether there is
> >>a
> >> >>key
> >> >> >>or
> >> >> >> >>> which
> >> >> >> >>> > >partition that key would hash to.
> >> >> >> >>> > Sorry that I did not explain this clear enough. The hash of
> >> >>topic
> >> >> >> >>> > partition is only used when decide which mirror maker data
> >> >>channel
> >> >> >> >>>queue
> >> >> >> >>> > the consumer thread should put message into. It only tries
> >>to
> >> >>make
> >> >> >> >>>sure
> >> >> >> >>> > the messages from the same partition is sent by the same
> >> >>producer
> >> >> >> >>>thread
> >> >> >> >>> > to guarantee the sending order. This is not at all related
> >>to
> >> >> >>which
> >> >> >> >>> > partition in target cluster the messages end up. That is
> >>still
> >> >> >> >>>decided by
> >> >> >> >>> > producer.
> >> >> >> >>> > >
> >> >> >> >>> > >3. You don't actually give the ConsumerRebalanceListener
> >> >> >>interface.
> >> >> >> >>>What
> >> >> >> >>> > >is
> >> >> >> >>> > >that going to look like?
> >> >> >> >>> > Good point! I should have put it in the wiki. I just added
> >>it.
> >> >> >> >>> > >
> >> >> >> >>> > >4. What is MirrorMakerRecord? I think ideally the
> >> >> >> >>> > >MirrorMakerMessageHandler
> >> >> >> >>> > >interface would take a ConsumerRecord as input and return a
> >> >> >> >>> > >ProducerRecord,
> >> >> >> >>> > >right? That would allow you to transform the key, value,
> >> >> >>partition,
> >> >> >> >>>or
> >> >> >> >>> > >destination topic...
> >> >> >> >>> > MirrorMakerRecord is introduced in KAFKA-1650, which is
> >>exactly
> >> >> >>the
> >> >> >> >>>same
> >> >> >> >>> > as ConsumerRecord in KAFKA-1760.
> >> >> >> >>> > private[kafka] class MirrorMakerRecord (val sourceTopic:
> >> >>String,
> >> >> >> >>> >   val sourcePartition: Int,
> >> >> >> >>> >   val sourceOffset: Long,
> >> >> >> >>> >   val key: Array[Byte],
> >> >> >> >>> >   val value: Array[Byte]) {
> >> >> >> >>> >   def size = value.length + {if (key == null) 0 else
> >> >>key.length}
> >> >> >> >>> > }
> >> >> >> >>> >
> >> >> >> >>> > However, because source partition and offset is needed in
> >> >>producer
> >> >> >> >>>thread
> >> >> >> >>> > for consumer offsets bookkeeping, the record returned by
> >> >> >> >>> > MirrorMakerMessageHandler needs to contain those
> >>information.
> >> >> >> >>>Therefore
> >> >> >> >>> > ProducerRecord does not work here. We could probably let
> >> >>message
> >> >> >> >>>handler
> >> >> >> >>> > take ConsumerRecord for both input and output.
> >> >> >> >>> > >
> >> >> >> >>> > >5. Have you guys thought about what the implementation will
> >> >>look
> >> >> >> >>>like in
> >> >> >> >>> > >terms of threading architecture etc with the new consumer?
> >> >>That
> >> >> >>will
> >> >> >> >>>be
> >> >> >> >>> > >soon so even if we aren't starting with that let's make
> >>sure
> >> >>we
> >> >> >>can
> >> >> >> >>>get
> >> >> >> >>> > >rid
> >> >> >> >>> > >of a lot of the current mirror maker accidental complexity
> >>in
> >> >> >>terms
> >> >> >> >>>of
> >> >> >> >>> > >threads and queues when we move to that.
> >> >> >> >>> > I haven¹t thought about it throughly. The quick idea is
> >>after
> >> >> >> >>>migration
> >> >> >> >>> to
> >> >> >> >>> > the new consumer, it is probably better to use a single
> >> >>consumer
> >> >> >> >>>thread.
> >> >> >> >>> > If multithread is needed, decoupling consumption and
> >>processing
> >> >> >>might
> >> >> >> >>>be
> >> >> >> >>> > used. MirrorMaker definitely needs to be changed after new
> >> >> >>consumer
> >> >> >> >>>get
> >> >> >> >>> > checked in. I¹ll document the changes and can submit follow
> >>up
> >> >> >> >>>patches
> >> >> >> >>> > after the new consumer is available.
> >> >> >> >>> > >
> >> >> >> >>> > >-Jay
> >> >> >> >>> > >
> >> >> >> >>> > >On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin
> >> >> >> >>><j...@linkedin.com.invalid
> >> >> >> >>> >
> >> >> >> >>> > >wrote:
> >> >> >> >>> > >
> >> >> >> >>> > >> Hi Kafka Devs,
> >> >> >> >>> > >>
> >> >> >> >>> > >> We are working on Kafka Mirror Maker enhancement. A KIP
> >>is
> >> >> >>posted
> >> >> >> >>>to
> >> >> >> >>> > >> document and discuss on the followings:
> >> >> >> >>> > >> 1. KAFKA-1650: No Data loss mirror maker change
> >> >> >> >>> > >> 2. KAFKA-1839: To allow partition aware mirror.
> >> >> >> >>> > >> 3. KAFKA-1840: To allow message filtering/format
> >>conversion
> >> >> >> >>> > >> Feedbacks are welcome. Please let us know if you have any
> >> >> >> >>>questions or
> >> >> >> >>> > >> concerns.
> >> >> >> >>> > >>
> >> >> >> >>> > >> Thanks.
> >> >> >> >>> > >>
> >> >> >> >>> > >> Jiangjie (Becket) Qin
> >> >> >> >>> > >>
> >> >> >> >>> >
> >> >> >> >>> >
> >> >> >> >>>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>--
> >> >> >> >>Thanks,
> >> >> >> >>Neha
> >> >> >> >
> >> >> >>
> >> >> >>
> >> >>
> >> >>
> >>
> >>
>
>


-- 
Thanks,
Neha

Reply via email to