That is so cool. Thank you.

On Sun, 28 Jun 2015 at 04:29 Guozhang Wang <wangg...@gmail.com> wrote:
Tao, I have added you to the contributor list of Kafka so you can assign tickets to yourself now.

I will review the patch soon.

Guozhang

On Thu, Jun 25, 2015 at 2:54 AM, tao xiao <xiaotao...@gmail.com> wrote:

Patch updated. Please review.

On Mon, 22 Jun 2015 at 12:24 tao xiao <xiaotao...@gmail.com> wrote:

Yes, you are right. Will update the patch.

On Mon, Jun 22, 2015 at 12:16 PM Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Should we still store the value bytes when logAsString is set to TRUE, and only store the length when logAsString is set to FALSE?

On 6/21/15, 7:29 PM, "tao xiao" <xiaotao...@gmail.com> wrote:

The patch I submitted does what you suggested. It stores only the size and prints it out when an error occurs.

On Mon, Jun 22, 2015 at 5:26 AM Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Yes, we can expose a user callback in MM, just like we did for the rebalance listener.
I still think ErrorLoggingCallback needs some change, though. Can we only store the value bytes when logAsString is set to true? That looks more reasonable to me.

Jiangjie (Becket) Qin

On 6/21/15, 3:02 AM, "tao xiao" <xiaotao...@gmail.com> wrote:

Yes, I agree with that. It is even better if we can supply our own callback.
For people who want to view the content of the message on failure, they can still do so.

On Sun, Jun 21, 2015 at 2:20 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hi Tao / Jiangjie,

I think a better fix here may be to stop MirrorMakerProducerCallback from extending ErrorLoggingCallback, rather than changing ErrorLoggingCallback itself, as that defeats the purpose of logAsString, which I think is useful for general error logging. Instead, we can let MirrorMakerProducerCallback keep just the length rather than the value bytes, if people agree that for MM we are probably not interested in the message value in the callback. Thoughts?

Guozhang

On Wed, Jun 17, 2015 at 1:06 AM, tao xiao <xiaotao...@gmail.com> wrote:

Thank you for the reply.

Patch submitted: https://issues.apache.org/jira/browse/KAFKA-2281

On Mon, 15 Jun 2015 at 02:16 Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hi Tao,

Yes, the issue that ErrorLoggingCallback keeps the value as a local variable has been known for a while, and we probably should fix it, as the value is not used except for logging its size. Can you open a ticket and maybe also submit a patch?

For unreachable objects, I don't think it is a memory leak. As you said, GC should take care of this.
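A minimal sketch of the change discussed in this thread: keep the value bytes only when logAsString is true, and otherwise keep just the length for the error message. It is written against a hypothetical CompletionCallback interface so it compiles on its own (the real producer interface is org.apache.kafka.clients.producer.Callback); the class name ValueAwareErrorCallback and its log text are made up for illustration and are not taken from the KAFKA-2281 patch.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a producer completion callback, so the sketch
// compiles without Kafka on the classpath.
interface CompletionCallback {
    void onCompletion(Exception exception);
}

// Hypothetical callback: retains the value bytes only when logAsString is true;
// otherwise it keeps only the length, so a pending send does not pin the payload.
final class ValueAwareErrorCallback implements CompletionCallback {
    private final String topic;
    private final byte[] value;      // null unless logAsString is true
    private final int valueLength;   // always kept for the error message (-1 if no value)
    private final boolean logAsString;
    private String lastLogLine;      // kept only so the sketch can be inspected

    ValueAwareErrorCallback(String topic, byte[] value, boolean logAsString) {
        this.topic = topic;
        this.logAsString = logAsString;
        // Drop the reference to the array unless it will actually be logged.
        this.value = logAsString ? value : null;
        this.valueLength = value == null ? -1 : value.length;
    }

    boolean retainsValue() {
        return value != null;
    }

    String lastLogLine() {
        return lastLogLine;
    }

    @Override
    public void onCompletion(Exception exception) {
        if (exception == null) {
            return; // successful send: nothing to log
        }
        String detail = logAsString && value != null
                ? "value: " + new String(value, StandardCharsets.UTF_8)
                : "value length: " + valueLength;
        lastLogLine = "Error sending to topic " + topic + " with " + detail
                + ": " + exception.getMessage();
        System.err.println(lastLogLine);
    }
}
```

The interesting part is the constructor: when logAsString is false the array is never stored, so a send stuck behind an unresponsive broker pins an int rather than the whole payload.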
At LinkedIn we are using G1GC with some tuning done by our SREs. You can try that if interested.

Thanks,

Jiangjie (Becket) Qin

On 6/13/15, 11:39 AM, "tao xiao" <xiaotao...@gmail.com> wrote:

Hi,

I am using mirror maker from trunk to replicate data across two data centers. While the destination broker was under heavy load and unresponsive, the send rate of mirror maker was very low and the available producer buffer quickly filled up. In the end, mirror maker threw an OOME. The detailed exception can be found here:
https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-exception-L1

I started mirror maker with 1G of heap and a 256M producer buffer. I used Eclipse MAT to analyze the heap dump and found that the retained heap size of all RecordBatch objects was more than 500MB. Half of that was used to retain the data waiting to be sent to the destination broker, which makes sense to me since it is close to the 256MB producer buffer, but the other half was used by kafka.tools.MirrorMaker$MirrorMakerProducerCallback.
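A back-of-the-envelope sketch of the heap figures above. It rests on an assumption (an inference, not something measured in the thread): the producer keeps its own copy of each pending value inside buffer.memory, while each callback keeps a reference to the original value array, so every pending byte is held twice. Under that assumption, a 256MB buffer pins roughly 512MB, half of the 1024MB heap, which is in line with the more than 500MB reported by MAT. HeapEstimate is a hypothetical helper for illustration only.

```java
// Hypothetical back-of-the-envelope model; ignores object headers, batches in
// flight beyond buffer.memory, and every other heap user.
final class HeapEstimate {
    static final long MB = 1024L * 1024L;

    // Bytes pinned by pending sends: the producer's buffered copy, plus the
    // callback's reference to the original value array if it keeps one.
    static long pinnedBytes(long bufferMemory, boolean callbackRetainsValue) {
        return callbackRetainsValue ? 2 * bufferMemory : bufferMemory;
    }

    // Share of the maximum heap (-Xmx) taken by the pinned bytes.
    static double fractionOfHeap(long pinnedBytes, long maxHeapBytes) {
        return (double) pinnedBytes / maxHeapBytes;
    }
}
```

With buffer.memory=268435456 and -Xmx1024M, pinnedBytes(buffer, true) is 512MB (0.5 of the heap), while a callback that keeps only the length brings the model back to 256MB.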
Every producer callback in mirror maker takes the message value and holds it until the message is successfully delivered. In my case, since the destination broker was very unresponsive, the message values held by the callbacks stayed in memory indefinitely, which I think is a waste and a major contributor to the OOME. Screenshot of MAT:
https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-mat-screenshot-png

The other interesting problem I observed is that when I turned on unreachable object parsing in MAT, more than 400MB of memory was occupied by unreachable objects. It surprised me that GC didn't clean them up before the OOME was thrown. The GC log (https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-gc-log-L1) shows that full GC was unable to reclaim any memory, yet when facing an OOME these unreachable objects should have been cleaned up. So either Eclipse MAT has an issue parsing the heap dump, or there is a hidden memory leak that is hard to find.
I attached a sample screenshot of the unreachable objects here:
https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-unreachable-objects-png

The consumer properties:

zookeeper.connect=zk
zookeeper.connection.timeout.ms=1000000
group.id=mm
auto.offset.reset=smallest
partition.assignment.strategy=roundrobin

The producer properties:

bootstrap.servers=brokers
client.id=mirror-producer
producer.type=async
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
buffer.memory=268435456
batch.size=1048576
max.request.size=5242880
send.buffer.bytes=1048576

The java command to start mirror maker:

java -Xmx1024M -Xms512M -XX:+HeapDumpOnOutOfMemoryError \
  -XX:HeapDumpPath=/home/kafka/slc-phx-mm-cg.hprof \
  -XX:+PrintTenuringDistribution -XX:MaxTenuringThreshold=3 -server \
  -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled \
  -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC \
  -Djava.awt.headless=true \
  -Xloggc:/var/log/kafka/kafka-phx/cg/mirrormaker-gc.log -verbose:gc \
  -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps \
  -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10M \
  -Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Dkafka.logs.dir=/var/log/kafka/kafka-phx/cg \
  -Dlog4j.configuration=file:/usr/share/kafka/bin/../config/tools-log4j.properties \
  -cp libs/* kafka.tools.MirrorMaker --consumer.config consumer.properties \
  --num.streams 10 --producer.config producer.properties --whitelist test.*

--
-- Guozhang
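A small sketch for checking the producer properties above against the new (Java) producer's setting names, i.e. the producer whose RecordBatch objects show up in the heap dump. The KNOWN_KEYS set is an assumption: it lists only the new-producer keys that appear in this thread, not the producer's full configuration list, and ProducerConfigCheck is a hypothetical helper. As far as I know, producer.type, compression.codec and serializer.class are settings of the older Scala producer, and the new producer names its compression setting compression.type, so those three lines would likely have no effect here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: reports keys in a producer config that are not in KNOWN_KEYS.
final class ProducerConfigCheck {
    // Assumption: only the new-producer keys used in this thread, not a complete list.
    static final Set<String> KNOWN_KEYS = Set.of(
            "bootstrap.servers", "client.id", "key.serializer", "value.serializer",
            "buffer.memory", "batch.size", "max.request.size", "send.buffer.bytes");

    static Set<String> unrecognized(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            // StringReader does no real I/O, but load() still declares IOException.
            throw new UncheckedIOException(e);
        }
        Set<String> unknown = new TreeSet<>(); // sorted for stable output
        for (String key : props.stringPropertyNames()) {
            if (!KNOWN_KEYS.contains(key)) {
                unknown.add(key);
            }
        }
        return unknown;
    }
}
```

Run against the producer properties from the thread, it reports compression.codec, producer.type and serializer.class.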