Hi Bhavesh,

I think it is the right discussion to have when we are talking about the
new new design for MM.
Please see the inline comments.

Jiangjie (Becket) Qin

On 1/28/15, 10:48 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:

>Hi Jiangjie,
>
>I just wanted to let you know about our use case and stress the point that
>local data center broker cluster have fewer partitions than the
>destination
>offline broker cluster. Just because we do the batch pull from CAMUS and
>in
>order to drain data faster than the injection rate (from four DCs for same
>topic).
Keeping the same partition number in source and target cluster will be an
option but will not be enforced by default.
>
>We are facing following issues (probably due to configuration):
>
>1)      We occasionally loose data due to message batch size is too large
>(2MB) on target data (we are using old producer but I think new producer
>will solve this problem to some extend).
We do see this issue in LinkedIn as well. New producer also might have
this issue. There are some proposal of solutions, but no real work started
yet. For now, as a workaround, setting a more aggressive batch size on
producer side should work.
>2)      Since only one instance is set to MM data,  we are not able to
>set-up ack per topic instead ack is attached to producer instance.
I don’t quite get the question here.
>3)      How are you going to address two phase commit problem if ack is
>set
>to strongest, but auto commit is on for consumer (meaning producer does
>not
>get ack,  but consumer auto committed offset that message).  Is there
>transactional (Kafka transaction is in process) based ack and commit
>offset
>?
Auto offset commit should be turned off in this case. The offset will only
be committed once by the offset commit thread. So there is no two phase
commit.
>4)      How are you planning to avoid duplicated message?  ( Is
>brokergoing
>have moving window of message collected and de-dupe ?)  Possibly, we get
>this from retry set to 5…?
We are not trying to completely avoid duplicates. The duplicates will
still be there if:
1. Producer retries on failure.
2. Mirror maker is hard killed.
Currently, dedup is expected to be done by user if necessary.
>5)      Last, is there any warning or any thing you can provide insight
>from MM component about data injection rate into destination partitions is
>NOT evenly distributed regardless  of  keyed or non-keyed message (Hence
>there is ripple effect such as data not arriving late, or data is arriving
>out of order in  intern of time stamp  and early some time, and CAMUS
>creates huge number of file count on HDFS due to uneven injection rate .
>Camus Job is  configured to run every 3 minutes.)
I think uneven data distribution is typically caused by server side
unbalance, instead of something mirror maker could control. In new mirror
maker, however, there is a customizable message handler, that might be
able to help a little bit. In message handler, you can explicitly set a
partition that you want to produce the message to. So if you know the
uneven data distribution in target cluster, you may offset it here. But
that probably only works for non-keyed messages.
>
>I am not sure if this is right discussion form to bring these to
>your/kafka
>Dev team attention.  This might be off track,
>
>
>Thanks,
>
>Bhavesh
>
>On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin <j...@linkedin.com.invalid>
>wrote:
>
>> I’ve updated the KIP page. Feedbacks are welcome.
>>
>> Regarding the simple mirror maker design. I thought over it and have
>>some
>> worries:
>> There are two things that might worth thinking:
>> 1. One of the enhancement to mirror maker is adding a message handler to
>> do things like reformatting. I think we might potentially want to have
>> more threads processing the messages than the number of consumers. If we
>> follow the simple mirror maker solution, we lose this flexibility.
>> 2. This might not matter too much, but creating more consumers means
>>more
>> footprint of TCP connection / memory.
>>
>> Any thoughts on this?
>>
>> Thanks.
>>
>> Jiangjie (Becket) Qin
>>
>> On 1/26/15, 10:35 AM, "Jiangjie Qin" <j...@linkedin.com> wrote:
>>
>> >Hi Jay and Neha,
>> >
>> >Thanks a lot for the reply and explanation. I do agree it makes more
>>sense
>> >to avoid duplicate effort and plan based on new consumer. I’ll modify
>>the
>> >KIP.
>> >
>> >To Jay’s question on message ordering - The data channel selection
>>makes
>> >sure that the messages from the same source partition will sent by the
>> >same producer. So the order of the messages is guaranteed with proper
>> >producer settings (MaxInFlightRequests=1,retries=Integer.MaxValue,
>>etc.)
>> >For keyed messages, because they come from the same source partition
>>and
>> >will end up in the same target partition, as long as they are sent by
>>the
>> >same producer, the order is guaranteed.
>> >For non-keyed messages, the messages coming from the same source
>>partition
>> >might go to different target partitions. The order is only guaranteed
>> >within each partition.
>> >
>> >Anyway, I’ll modify the KIP and data channel will be away.
>> >
>> >Thanks.
>> >
>> >Jiangjie (Becket) Qin
>> >
>> >
>> >On 1/25/15, 4:34 PM, "Neha Narkhede" <n...@confluent.io> wrote:
>> >
>> >>I think there is some value in investigating if we can go back to the
>> >>simple mirror maker design, as Jay points out. Here you have N
>>threads,
>> >>each has a consumer and a producer.
>> >>
>> >>The reason why we had to move away from that was a combination of the
>> >>difference in throughput between the consumer and the old producer and
>> >>the
>> >>deficiency of the consumer rebalancing that limits the total number of
>> >>mirror maker threads. So the only option available was to increase the
>> >>throughput of the limited # of mirror maker threads that could be
>> >>deployed.
>> >>Now that queuing design may not make sense, if the new producer's
>> >>throughput is almost similar to the consumer AND the fact that the new
>> >>round-robin based consumer rebalancing can allow a very high number of
>> >>mirror maker instances to exist.
>> >>
>> >>This is the end state that the mirror maker should be in once the new
>> >>consumer is complete, so it wouldn't hurt to see if we can just move
>>to
>> >>that right now.
>> >>
>> >>On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps <jay.kr...@gmail.com>
>>wrote:
>> >>
>> >>> QQ: If we ever use a different technique for the data channel
>>selection
>> >>> than for the producer partitioning won't that break ordering? How
>>can
>> >>>we
>> >>> ensure these things stay in sync?
>> >>>
>> >>> With respect to the new consumer--I really do want to encourage
>>people
>> >>>to
>> >>> think through how MM will work with the new consumer. I mean this
>>isn't
>> >>> very far off, maybe a few months if we hustle? I could imagine us
>> >>>getting
>> >>> this mm fix done maybe sooner, maybe in a month? So I guess this
>>buys
>> >>>us an
>> >>> extra month before we rip it out and throw it away? Maybe two? This
>>bug
>> >>>has
>> >>> been there for a while, though, right? Is it worth it? Probably it
>>is,
>> >>>but
>> >>> it still kind of sucks to have the duplicate effort.
>> >>>
>> >>> So anyhow let's definitely think about how things will work with the
>> >>>new
>> >>> consumer. I think we can probably just have N threads, each thread
>>has
>> >>>a
>> >>> producer and consumer and is internally single threaded. Any reason
>> >>>this
>> >>> wouldn't work?
>> >>>
>> >>> -Jay
>> >>>
>> >>>
>> >>> On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin
>> >>><j...@linkedin.com.invalid>
>> >>> wrote:
>> >>>
>> >>> > Hi Jay,
>> >>> >
>> >>> > Thanks for comments. Please see inline responses.
>> >>> >
>> >>> > Jiangjie (Becket) Qin
>> >>> >
>> >>> > On 1/21/15, 1:33 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>> >>> >
>> >>> > >Hey guys,
>> >>> > >
>> >>> > >A couple questions/comments:
>> >>> > >
>> >>> > >1. The callback and user-controlled commit offset functionality
>>is
>> >>> already
>> >>> > >in the new consumer which we are working on in parallel. If we
>> >>> accelerated
>> >>> > >that work it might help concentrate efforts. I admit this might
>>take
>> >>> > >slightly longer in calendar time but could still probably get
>>done
>> >>>this
>> >>> > >quarter. Have you guys considered that approach?
>> >>> > Yes, I totally agree that ideally we should put efforts on new
>> >>>consumer.
>> >>> > The main reason for still working on the old consumer is that we
>> >>>expect
>> >>> it
>> >>> > would still be used in LinkedIn for quite a while before the new
>> >>>consumer
>> >>> > could be fully rolled out. And we recently suffering a lot from
>> >>>mirror
>> >>> > maker data loss issue. So our current plan is making necessary
>> >>>changes to
>> >>> > make current mirror maker stable in production. Then we can test
>>and
>> >>> > rollout new consumer gradually without getting burnt.
>> >>> > >
>> >>> > >2. I think partitioning on the hash of the topic partition is
>>not a
>> >>>very
>> >>> > >good idea because that will make the case of going from a cluster
>> >>>with
>> >>> > >fewer partitions to one with more partitions not work. I think an
>> >>> > >intuitive
>> >>> > >way to do this would be the following:
>> >>> > >a. Default behavior: Just do what the producer does. I.e. if you
>> >>> specify a
>> >>> > >key use it for partitioning, if not just partition in a
>>round-robin
>> >>> > >fashion.
>> >>> > >b. Add a --preserve-partition option that will explicitly
>>inherent
>> >>>the
>> >>> > >partition from the source irrespective of whether there is a key
>>or
>> >>> which
>> >>> > >partition that key would hash to.
>> >>> > Sorry that I did not explain this clear enough. The hash of topic
>> >>> > partition is only used when decide which mirror maker data channel
>> >>>queue
>> >>> > the consumer thread should put message into. It only tries to make
>> >>>sure
>> >>> > the messages from the same partition is sent by the same producer
>> >>>thread
>> >>> > to guarantee the sending order. This is not at all related to
>>which
>> >>> > partition in target cluster the messages end up. That is still
>> >>>decided by
>> >>> > producer.
>> >>> > >
>> >>> > >3. You don't actually give the ConsumerRebalanceListener
>>interface.
>> >>>What
>> >>> > >is
>> >>> > >that going to look like?
>> >>> > Good point! I should have put it in the wiki. I just added it.
>> >>> > >
>> >>> > >4. What is MirrorMakerRecord? I think ideally the
>> >>> > >MirrorMakerMessageHandler
>> >>> > >interface would take a ConsumerRecord as input and return a
>> >>> > >ProducerRecord,
>> >>> > >right? That would allow you to transform the key, value,
>>partition,
>> >>>or
>> >>> > >destination topic...
>> >>> > MirrorMakerRecord is introduced in KAFKA-1650, which is exactly
>>the
>> >>>same
>> >>> > as ConsumerRecord in KAFKA-1760.
>> >>> > private[kafka] class MirrorMakerRecord (val sourceTopic: String,
>> >>> >   val sourcePartition: Int,
>> >>> >   val sourceOffset: Long,
>> >>> >   val key: Array[Byte],
>> >>> >   val value: Array[Byte]) {
>> >>> >   def size = value.length + {if (key == null) 0 else key.length}
>> >>> > }
>> >>> >
>> >>> > However, because source partition and offset is needed in producer
>> >>>thread
>> >>> > for consumer offsets bookkeeping, the record returned by
>> >>> > MirrorMakerMessageHandler needs to contain those information.
>> >>>Therefore
>> >>> > ProducerRecord does not work here. We could probably let message
>> >>>handler
>> >>> > take ConsumerRecord for both input and output.
>> >>> > >
>> >>> > >5. Have you guys thought about what the implementation will look
>> >>>like in
>> >>> > >terms of threading architecture etc with the new consumer? That
>>will
>> >>>be
>> >>> > >soon so even if we aren't starting with that let's make sure we
>>can
>> >>>get
>> >>> > >rid
>> >>> > >of a lot of the current mirror maker accidental complexity in
>>terms
>> >>>of
>> >>> > >threads and queues when we move to that.
>> >>> > I haven¹t thought about it throughly. The quick idea is after
>> >>>migration
>> >>> to
>> >>> > the new consumer, it is probably better to use a single consumer
>> >>>thread.
>> >>> > If multithread is needed, decoupling consumption and processing
>>might
>> >>>be
>> >>> > used. MirrorMaker definitely needs to be changed after new
>>consumer
>> >>>get
>> >>> > checked in. I¹ll document the changes and can submit follow up
>> >>>patches
>> >>> > after the new consumer is available.
>> >>> > >
>> >>> > >-Jay
>> >>> > >
>> >>> > >On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin
>> >>><j...@linkedin.com.invalid
>> >>> >
>> >>> > >wrote:
>> >>> > >
>> >>> > >> Hi Kafka Devs,
>> >>> > >>
>> >>> > >> We are working on Kafka Mirror Maker enhancement. A KIP is
>>posted
>> >>>to
>> >>> > >> document and discuss on the followings:
>> >>> > >> 1. KAFKA-1650: No Data loss mirror maker change
>> >>> > >> 2. KAFKA-1839: To allow partition aware mirror.
>> >>> > >> 3. KAFKA-1840: To allow message filtering/format conversion
>> >>> > >> Feedbacks are welcome. Please let us know if you have any
>> >>>questions or
>> >>> > >> concerns.
>> >>> > >>
>> >>> > >> Thanks.
>> >>> > >>
>> >>> > >> Jiangjie (Becket) Qin
>> >>> > >>
>> >>> >
>> >>> >
>> >>>
>> >>
>> >>
>> >>
>> >>--
>> >>Thanks,
>> >>Neha
>> >
>>
>>

Reply via email to