Hey Becket,

What are the next steps on this KIP? As per your comment earlier on the thread -

> I do agree it makes more sense to avoid duplicate effort and plan based on the new consumer. I'll modify the KIP.

Did you get a chance to think about the simplified design that we proposed earlier? Do you plan to update the KIP with that proposal?

Thanks,
Neha

On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> In mirror maker we do not do de-serialization on the messages. Mirror maker uses the source TopicPartition hash to choose a producer to send messages from the same source partition. The partitions those messages end up in are decided by the Partitioner class in KafkaProducer (assuming you are using the new producer), which uses the hash code of the byte[].
>
> If deserialization is needed, it has to be done in the message handler.
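> For example, the handler could de-serialize the key itself and pick the target partition from the deserialized key's hash instead of relying on the producer's byte[]-based partitioning. A rough sketch (the handler shape and class name below are made up for illustration, assume UTF-8 string keys, and are not the actual interface in the KIP):
>
> // Illustration only: de-serialize the raw key bytes and derive the target
> // partition from the String key's hash code.
> class StringKeyHashHandler(numTargetPartitions: Int) {
>   def handle(record: MirrorMakerRecord): (Int, MirrorMakerRecord) = {
>     val partition =
>       if (record.key == null) 0  // non-keyed messages would need a different strategy
>       else {
>         val h = new String(record.key, java.nio.charset.StandardCharsets.UTF_8).hashCode
>         ((h % numTargetPartitions) + numTargetPartitions) % numTargetPartitions
>       }
>     (partition, record)
>   }
> }
>
> Whether the handler can actually return an explicit target partition depends on the final handler interface, so treat the above only as a sketch of the idea.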
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 2/4/15, 11:33 AM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
>
> > Hi Jiangjie,
> > Thanks for entertaining my questions so far. The last question I have is about serialization of the message key. If the key de-serialization class is not present at the MM instance, does it use the raw byte hash code to determine the partition? How are you going to address the situation where the key needs to be de-serialized and the actual hash code needs to be computed?
> > Thanks,
> > Bhavesh
> >
> > On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >
> >> Hi Bhavesh,
> >> Please see inline comments.
> >> Jiangjie (Becket) Qin
> >>
> >> On 1/29/15, 7:00 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
> >>
> >> > Hi Jiangjie,
> >> > Thanks for the input.
> >> > a) Will the MM producer ack setting be attached to the producer instance or be per topic? The use case is that one instance of MM needs to handle both strong acks and also acks=0 for some topics. Or would it be better to set up another instance of MM?
> >> The acks setting is a producer-level setting instead of a topic-level setting. In this case you probably need to set up another instance.
> >> > b) Regarding TCP connections, why does each producer instance attach to its own TCP connections? Is it possible to use a broker connection TCP pool, where a producer just checks out a TCP connection to a broker, so that the number of producer instances does not correlate with the number of broker connections? Is this possible?
> >> In the new producer, each producer maintains a connection to each broker within the producer instance. Making producer instances share the TCP connections would be a very big change to the current design, so I suppose we won't be able to do that.
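> >> To put rough, purely illustrative numbers on it: with that design, 4 mirror maker producer instances pointed at a 20-broker target cluster hold on the order of 4 * 20 = 80 producer connections, and every additional producer instance adds another 20. That is why the number of producer (and consumer) instances matters for the TCP/memory footprint.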
> >> > Thanks,
> >> > Bhavesh
> >> >
> >> > On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >> >
> >> >> Hi Bhavesh,
> >> >> I think it is the right discussion to have when we are talking about the new design for MM. Please see the inline comments.
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >> On 1/28/15, 10:48 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
> >> >>
> >> >> > Hi Jiangjie,
> >> >> > I just wanted to let you know about our use case and stress the point that the local data center broker cluster has fewer partitions than the destination offline broker cluster. This is because we do batch pulls from CAMUS and need to drain data faster than the injection rate (from four DCs for the same topic).
> >> >> Keeping the same partition number in source and target cluster will be an option but will not be enforced by default.
> >> >> > We are facing the following issues (probably due to configuration):
> >> >> > 1) We occasionally lose data because the message batch size is too large (2MB) on the target side (we are using the old producer, but I think the new producer will solve this problem to some extent).
> >> >> We do see this issue at LinkedIn as well. The new producer might also have this issue. There are some proposed solutions, but no real work has started yet. For now, as a workaround, setting a more aggressive batch size on the producer side should work.
> >> >> > 2) Since only one instance is set to MM data, we are not able to set up acks per topic; instead the ack setting is attached to the producer instance.
> >> >> I don't quite get the question here.
> >> >> > 3) How are you going to address the two-phase commit problem if acks is set to the strongest setting but auto commit is on for the consumer (meaning the producer does not get the ack, but the consumer auto-committed the offset of that message)? Is there a transaction-based (Kafka transactions are in progress) ack and offset commit?
> >> >> Auto offset commit should be turned off in this case. The offset will only be committed once by the offset commit thread. So there is no two-phase commit.
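> >> >> In other words, the offset commit thread only commits what the producer has already acked. A sketch of that bookkeeping (the class and method names below are made up to show the idea, not the actual patch):
> >> >>
> >> >> // Illustration only: one instance per source partition. Track offsets that were
> >> >> // sent but not yet acked, and only commit up to the smallest unacked offset, so
> >> >> // a hard failure can cause duplicates but never data loss.
> >> >> import java.util.concurrent.ConcurrentSkipListSet
> >> >>
> >> >> class UnackedOffsets {
> >> >>   private val offsets = new ConcurrentSkipListSet[java.lang.Long]()
> >> >>
> >> >>   def recordSent(offset: Long): Unit = offsets.add(offset)      // before producer.send()
> >> >>   def recordAcked(offset: Long): Unit = offsets.remove(offset)  // in the send callback
> >> >>
> >> >>   // The offset that is safe to commit: everything below it has been acked.
> >> >>   def safeToCommit(lastConsumedOffset: Long): Long =
> >> >>     if (offsets.isEmpty) lastConsumedOffset + 1 else offsets.first()
> >> >> }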
> >> >> > 4) How are you planning to avoid duplicated messages? (Is the broker going to keep a moving window of collected messages and de-dupe?) Possibly we get this from retries set to 5...?
> >> >> We are not trying to completely avoid duplicates. The duplicates will still be there if:
> >> >> 1. The producer retries on failure.
> >> >> 2. The mirror maker is hard killed.
> >> >> Currently, de-dup is expected to be done by the user if necessary.
> >> >> > 5) Last, is there any warning or insight the MM component can provide when the data injection rate into destination partitions is NOT evenly distributed, regardless of keyed or non-keyed messages? (Hence there is a ripple effect: data arrives late, or out of order in terms of timestamp and sometimes early, and CAMUS creates a huge number of files on HDFS due to the uneven injection rate. The Camus job is configured to run every 3 minutes.)
> >> >> I think uneven data distribution is typically caused by server-side imbalance, rather than something mirror maker could control. In the new mirror maker, however, there is a customizable message handler that might be able to help a little bit. In the message handler, you can explicitly set the partition that you want to produce the message to. So if you know the uneven data distribution in the target cluster, you may offset it there. But that probably only works for non-keyed messages.
> >> >> > I am not sure if this is the right discussion forum to bring these to your/the Kafka dev team's attention. This might be off track.
> >> >> > Thanks,
> >> >> > Bhavesh
> >> >> >
> >> >> > On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >> >> >
> >> >> >> I've updated the KIP page. Feedback is welcome.
> >> >> >> Regarding the simple mirror maker design, I thought it over and have some worries. There are two things that might be worth thinking about:
> >> >> >> 1. One of the enhancements to mirror maker is adding a message handler to do things like reformatting. I think we might potentially want to have more threads processing the messages than the number of consumers. If we follow the simple mirror maker solution, we lose this flexibility.
> >> >> >> 2. This might not matter too much, but creating more consumers means a bigger footprint in terms of TCP connections / memory.
> >> >> >> Any thoughts on this?
> >> >> >> Thanks.
> >> >> >> Jiangjie (Becket) Qin
> >> >> >>
> >> >> >> On 1/26/15, 10:35 AM, "Jiangjie Qin" <j...@linkedin.com> wrote:
> >> >> >>
> >> >> >> > Hi Jay and Neha,
> >> >> >> > Thanks a lot for the reply and explanation. I do agree it makes more sense to avoid duplicate effort and plan based on the new consumer. I'll modify the KIP.
> >> >> >> > To Jay's question on message ordering - the data channel selection makes sure that the messages from the same source partition will be sent by the same producer. So the order of the messages is guaranteed with the proper producer settings (MaxInFlightRequests=1, retries=Integer.MaxValue, etc.). For keyed messages, because they come from the same source partition and will end up in the same target partition, as long as they are sent by the same producer the order is guaranteed. For non-keyed messages, the messages coming from the same source partition might go to different target partitions; the order is only guaranteed within each partition.
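> >> >> >> > Concretely, that means producer settings along these lines (just a sketch using the new producer's config keys; the broker list and exact values are placeholders):
> >> >> >> >
> >> >> >> > // Sketch of producer settings that preserve ordering and avoid data loss.
> >> >> >> > import java.util.Properties
> >> >> >> >
> >> >> >> > val props = new Properties()
> >> >> >> > props.put("bootstrap.servers", "target-broker:9092")     // placeholder
> >> >> >> > props.put("acks", "all")                                  // wait for the full ISR
> >> >> >> > props.put("retries", Int.MaxValue.toString)               // retry "forever"
> >> >> >> > props.put("max.in.flight.requests.per.connection", "1")   // keep ordering across retries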
> >> >> >> > Anyway, I'll modify the KIP and the data channel will go away.
> >> >> >> > Thanks.
> >> >> >> > Jiangjie (Becket) Qin
> >> >> >> >
> >> >> >> > On 1/25/15, 4:34 PM, "Neha Narkhede" <n...@confluent.io> wrote:
> >> >> >> >
> >> >> >> >> I think there is some value in investigating if we can go back to the simple mirror maker design, as Jay points out. Here you have N threads, each of which has a consumer and a producer.
> >> >> >> >> The reason why we had to move away from that was a combination of the difference in throughput between the consumer and the old producer and the deficiency of the consumer rebalancing that limits the total number of mirror maker threads. So the only option available was to increase the throughput of the limited # of mirror maker threads that could be deployed.
> >> >> >> >> Now that queuing design may not make sense, given that the new producer's throughput is almost similar to the consumer's AND the fact that the new round-robin based consumer rebalancing can allow a very high number of mirror maker instances to exist.
> >> >> >> >> This is the end state that the mirror maker should be in once the new consumer is complete, so it wouldn't hurt to see if we can just move to that right now.
> >> >> >> >>
> >> >> >> >> On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >> >> >> >>
> >> >> >> >>> QQ: If we ever use a different technique for the data channel selection than for the producer partitioning, won't that break ordering? How can we ensure these things stay in sync?
> >> >> >> >>> With respect to the new consumer -- I really do want to encourage people to think through how MM will work with the new consumer. I mean this isn't very far off, maybe a few months if we hustle? I could imagine us getting this mm fix done maybe sooner, maybe in a month? So I guess this buys us an extra month before we rip it out and throw it away? Maybe two? This bug has been there for a while, though, right? Is it worth it? Probably it is, but it still kind of sucks to have the duplicate effort.
> >> >> >> >>> So anyhow let's definitely think about how things will work with the new consumer. I think we can probably just have N threads, each thread has a producer and consumer and is internally single threaded. Any reason this wouldn't work?
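> >> >> >> >>> Something like this sketch, roughly (the new consumer API isn't final yet, so the method names here are assumptions rather than the real interface):
> >> >> >> >>>
> >> >> >> >>> import scala.collection.JavaConverters._
> >> >> >> >>> import org.apache.kafka.clients.consumer.KafkaConsumer
> >> >> >> >>> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
> >> >> >> >>>
> >> >> >> >>> // Sketch only: one of N identical mirror threads, each owning one consumer
> >> >> >> >>> // and one producer and running a simple single-threaded loop.
> >> >> >> >>> class MirrorThread(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
> >> >> >> >>>                    producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Runnable {
> >> >> >> >>>   override def run(): Unit = {
> >> >> >> >>>     while (true) {
> >> >> >> >>>       val records = consumer.poll(100).asScala
> >> >> >> >>>       // Send everything, wait for the acks, and only then commit offsets,
> >> >> >> >>>       // so a crash can cause duplicates but not data loss.
> >> >> >> >>>       val futures = records.map { r =>
> >> >> >> >>>         producer.send(new ProducerRecord(r.topic, r.key, r.value))
> >> >> >> >>>       }
> >> >> >> >>>       futures.foreach(_.get())
> >> >> >> >>>       consumer.commitSync()
> >> >> >> >>>     }
> >> >> >> >>>   }
> >> >> >> >>> }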
> >> >> >> >>> -Jay
> >> >> >> >>>
> >> >> >> >>> On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >> >> >> >>>
> >> >> >> >>> > Hi Jay,
> >> >> >> >>> > Thanks for the comments. Please see inline responses.
> >> >> >> >>> > Jiangjie (Becket) Qin
> >> >> >> >>> >
> >> >> >> >>> > On 1/21/15, 1:33 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> >> >> >> >>> >
> >> >> >> >>> > > Hey guys,
> >> >> >> >>> > > A couple questions/comments:
> >> >> >> >>> > > 1. The callback and user-controlled commit offset functionality is already in the new consumer, which we are working on in parallel. If we accelerated that work it might help concentrate efforts. I admit this might take slightly longer in calendar time but could still probably get done this quarter. Have you guys considered that approach?
> >> >> >> >>> > Yes, I totally agree that ideally we should put our effort into the new consumer. The main reason for still working on the old consumer is that we expect it will still be used at LinkedIn for quite a while before the new consumer can be fully rolled out. And we have recently been suffering a lot from the mirror maker data loss issue. So our current plan is to make the necessary changes to make the current mirror maker stable in production. Then we can test and roll out the new consumer gradually without getting burnt.
> >> >> >> >>> > > 2. I think partitioning on the hash of the topic partition is not a very good idea because that will make the case of going from a cluster with fewer partitions to one with more partitions not work. I think an intuitive way to do this would be the following:
> >> >> >> >>> > > a. Default behavior: Just do what the producer does. I.e. if you specify a key, use it for partitioning; if not, just partition in a round-robin fashion.
> >> >> >> >>> > > b. Add a --preserve-partition option that will explicitly inherit the partition from the source irrespective of whether there is a key or which partition that key would hash to.
> >> >> >> >>> > Sorry that I did not explain this clearly enough. The hash of the topic partition is only used to decide which mirror maker data channel queue the consumer thread should put the message into. It only tries to make sure the messages from the same partition are sent by the same producer thread to guarantee the sending order. This is not at all related to which partition in the target cluster the messages end up in. That is still decided by the producer.
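> >> >> >> >>> > Roughly, the selection looks something like this (illustrative code, not the actual patch):
> >> >> >> >>> >
> >> >> >> >>> > // Illustration only: messages from the same source topic-partition always
> >> >> >> >>> > // hash to the same data channel queue, and each queue is drained by exactly
> >> >> >> >>> > // one producer thread, which is what preserves the per-partition order.
> >> >> >> >>> > def selectQueue(sourceTopic: String, sourcePartition: Int, numQueues: Int): Int = {
> >> >> >> >>> >   val h = (sourceTopic, sourcePartition).hashCode
> >> >> >> >>> >   ((h % numQueues) + numQueues) % numQueues
> >> >> >> >>> > }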
> >> >> >> >>> > > 3. You don't actually give the ConsumerRebalanceListener interface. What is that going to look like?
> >> >> >> >>> > Good point! I should have put it in the wiki. I just added it.
> >> >> >> >>> > > 4. What is MirrorMakerRecord? I think ideally the MirrorMakerMessageHandler interface would take a ConsumerRecord as input and return a ProducerRecord, right? That would allow you to transform the key, value, partition, or destination topic...
> >> >> >> >>> > MirrorMakerRecord is introduced in KAFKA-1650 and is exactly the same as ConsumerRecord in KAFKA-1760:
> >> >> >> >>> >
> >> >> >> >>> > private[kafka] class MirrorMakerRecord (val sourceTopic: String,
> >> >> >> >>> >                                         val sourcePartition: Int,
> >> >> >> >>> >                                         val sourceOffset: Long,
> >> >> >> >>> >                                         val key: Array[Byte],
> >> >> >> >>> >                                         val value: Array[Byte]) {
> >> >> >> >>> >   def size = value.length + { if (key == null) 0 else key.length }
> >> >> >> >>> > }
> >> >> >> >>> >
> >> >> >> >>> > However, because the source partition and offset are needed in the producer thread for consumer offset bookkeeping, the record returned by MirrorMakerMessageHandler needs to contain that information. Therefore ProducerRecord does not work here. We could probably let the message handler take ConsumerRecord for both input and output.
> >> >> >> >>> > > 5. Have you guys thought about what the implementation will look like in terms of threading architecture etc. with the new consumer? That will be soon, so even if we aren't starting with that let's make sure we can get rid of a lot of the current mirror maker accidental complexity in terms of threads and queues when we move to that.
> >> >> >> >>> > I haven't thought about it thoroughly. The quick idea is that after migration to the new consumer, it is probably better to use a single consumer thread. If multithreading is needed, decoupling consumption and processing might be used. MirrorMaker definitely needs to be changed after the new consumer gets checked in. I'll document the changes and can submit follow-up patches after the new consumer is available.
> >> >> >> >>> > > -Jay
> >> >> >> >>> > >
> >> >> >> >>> > > On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >> >> >> >>> > >
> >> >> >> >>> > >> Hi Kafka Devs,
> >> >> >> >>> > >> We are working on a Kafka Mirror Maker enhancement. A KIP is posted to document and discuss the following:
> >> >> >> >>> > >> 1. KAFKA-1650: No data loss mirror maker change
> >> >> >> >>> > >> 2. KAFKA-1839: To allow partition aware mirror.
> >> >> >> >>> > >> 3. KAFKA-1840: To allow message filtering/format conversion
> >> >> >> >>> > >> Feedback is welcome. Please let us know if you have any questions or concerns.
> >> >> >> >>> > >> Thanks.
> >> >> >> >>> > >> Jiangjie (Becket) Qin
> >> >> >> >> --
> >> >> >> >> Thanks,
> >> >> >> >> Neha

--
Thanks,
Neha