Understood, thanks for all the help.

On Mon, Aug 27, 2018 at 2:16 PM Guozhang Wang <wangg...@gmail.com> wrote:
> Hello Nan,
>
> What you described seems to be a broker-side latency spike, not a
> client-side (producer, consumer, or streams) latency spike. There are a
> number of possible reasons for such spikes: disk flushing (though async, it
> can also cause the processing thread to halt), GC, page faults (in case the
> thread needs to access a cold page), etc. It is hard to tell which one is
> the actual root cause.
>
> For example, you can take a look at these slides (starting at slide 14) for a
> concrete example of such an investigation:
>
> https://www.slideshare.net/kawamuray/multitenancy-kafka-clusters-for-everyone-at-line
>
> My point is that it is not really easy, via email discussion and by looking
> at your experiment code, to tell exactly what the root cause is: the
> community can share with you some past experience and a few quick pointers,
> but most likely the issue varies case by case and hence can only be fully
> understood by yourself.
>
>
> Guozhang
>
> On Sat, Aug 25, 2018 at 6:58 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>
> > maybe easier to use github.
> >
> > https://github.com/angelfox123/kperf
> >
> >
> > On Sat, Aug 25, 2018 at 8:43 PM Nan Xu <nanxu1...@gmail.com> wrote:
> >
> > > so I did upgrade to 2.0.0 and am still seeing the same result. below is the
> > > program I am using. I am running everything on a single server (centos 7,
> > > 24 core, 32G ram, 1 broker, 1 zookeeper, single hard drive). I understand
> > > the single hard drive is less than ideal, but I still don't expect latency
> > > to go over 3 seconds.
> > >
> > >
> > > case 1.
> > > I create 1 partition for input and 1 partition for output. message size
> > > 10K
> > > producer given parameters (3600, 1000, 2) // 2 messages per 1000 microseconds
> > > for 3600 seconds, which translates to 2,000 messages/s. I still see
> > > latency that sometimes reaches 3 seconds.
> > >
> > > case 2
> > > 50 partitions for input, and 50 partitions for output.
> > > message size 10K
> > > producer given parameters (3600, 1000, 20) // 20 messages per 1000 microseconds
> > > for 3600 seconds, which translates to 20,000 messages/s. latency is not
> > > only high, it also happens more often.
> > >
> > >
> > > Any suggestion is appreciated. target is for each partition to handle 1,000 --
> > > 2,000 messages/s with all latency lower than 100ms.
> > >
> > > ====build.gradle======
> > > plugins {
> > >     id 'application'
> > >     id 'java'
> > > }
> > > group 'com.bofa'
> > > version '1.0-SNAPSHOT'
> > > sourceCompatibility = 1.8
> > > mainClassName="main.StreamApp"
> > >
> > > repositories {
> > >     mavenCentral()
> > > }
> > >
> > > dependencies {
> > >     compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0'
> > >     compile group: "org.apache.kafka", name: "kafka-streams", version: "2.0.0"
> > >     compile group: 'io.dropwizard.metrics', name: 'metrics-core', version:'3.2.6'
> > >     testCompile group: 'junit', name: 'junit', version: '4.12'
> > > }
> > >
> > > ========producer========
> > > package main;
> > >
> > > import java.util.Properties;
> > > import java.util.concurrent.atomic.AtomicInteger;
> > >
> > > import Util.BusyTimer;
> > > import org.apache.kafka.clients.producer.KafkaProducer;
> > > import org.apache.kafka.clients.producer.ProducerConfig;
> > > import org.apache.kafka.clients.producer.ProducerRecord;
> > > import org.apache.kafka.common.serialization.Serde;
> > > import org.apache.kafka.common.serialization.Serdes;
> > > import org.apache.kafka.common.serialization.StringSerializer;
> > >
> > > public class SimpleProducer {
> > >     public static void main(String[] args) {
> > >         final int time = Integer.parseInt(args[0]);
> > >         final long interval = Integer.parseInt(args[1]);
> > >         final int batch = Integer.parseInt(args[2]);
> > >         Properties props = new Properties();
> > >         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > >         props.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-perf-test-producer");
> > >         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
> > >         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
> > >
> > >         KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(props);
> > >
> > >         StringBuffer buffer = new StringBuffer();
> > >         for(int i=0; i<10240; i++) {
> > >             buffer.append('a');
> > >         }
> > >         String value = buffer.toString();
> > >
> > >         final long speed = 1000000/interval;
> > >         Runnable task = new Runnable() {
> > >             int sendNum=0;
> > >             @Override
> > >             public void run() {
> > >
> > >                 for(int i=0; i<batch; i++) {
> > >                     ProducerRecord<String, String> record = new
> > >                         ProducerRecord<>("input", System.nanoTime() + "-" + value);
> > >                     kafkaProducer.send(record);
> > >                     sendNum++;
> > >                 }
> > >
> > >                 if(sendNum % (speed * batch) == 0){
> > >                     System.out.println(System.currentTimeMillis() + " : " + sendNum);
> > >                 }
> > >             }
> > >         };
> > >
> > >         BusyTimer timer = new BusyTimer(interval,time, task);
> > >         timer.spaceMessageWithInterval();
> > >     }
> > > }
> > >
> > >
> > > ============kafka stream=============
> > > package main;
> > >
> > > import java.util.Properties;
> > >
> > > import org.apache.kafka.clients.producer.ProducerConfig;
> > > import org.apache.kafka.common.serialization.Serdes;
> > > import org.apache.kafka.streams.Consumed;
> > > import org.apache.kafka.streams.KafkaStreams;
> > > import org.apache.kafka.streams.StreamsBuilder;
> > > import org.apache.kafka.streams.StreamsConfig;
> > > import org.apache.kafka.streams.kstream.KStream;
> > > import org.apache.kafka.streams.kstream.Produced;
> > >
> > > public class StreamApp {
> > >     public static void main(String[] args) {
> > >         final Properties streamsConfiguration = new Properties();
> > >         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-stream");
> > >         streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "simple_stream_1");
> > >         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > >         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> > >             Serdes.String().getClass().getName());
> > >         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >             + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> > >         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >             + ProducerConfig.LINGER_MS_CONFIG,"5");
> > >         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,2);
> > >
> > >
> > >         StreamsBuilder builder = new StreamsBuilder();
> > >         final KStream<String, String> inputStream = builder.stream(
> > >             "input",
> > >             Consumed.with(
> > >                 new Serdes.StringSerde(),
> > >                 new Serdes.StringSerde()
> > >             )
> > >         );
> > >
> > >         inputStream.to(
> > >             "output",
> > >             Produced.with(new Serdes.StringSerde(), new Serdes.StringSerde())
> > >         );
> > >
> > >         KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
> > >         streams.start();
> > >     }
> > > }
> > >
> > > =============consumer============================
> > > package main;
> > >
> > > import java.util.Collections;
> > > import java.util.Properties;
> > >
> > > import com.codahale.metrics.Reservoir;
> > > import com.codahale.metrics.UniformReservoir;
> > > import org.apache.kafka.clients.consumer.ConsumerConfig;
> > > import org.apache.kafka.clients.consumer.ConsumerRecord;
> > > import org.apache.kafka.clients.consumer.ConsumerRecords;
> > > import org.apache.kafka.clients.consumer.KafkaConsumer;
> > > import org.apache.kafka.common.serialization.StringDeserializer;
> > >
> > > public class SimpleConsumer {
> > >     public static void main(String[] args) {
> > >         int expectedSpeed = Integer.parseInt(args[0]);
> > >         Properties props = new Properties();
> > >         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > >         props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-perf-consumer-group");
> > >         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >             StringDeserializer.class.getName());
> > >         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
> > >         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
> > >
> > >         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >             StringDeserializer.class.getName());
> > >
> > >         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> > >         consumer.subscribe(Collections.singletonList("output"));
> > >
> > >         consumer.poll(0);
> > >         int recNum=0;
> > >
> > >         Reservoir totalRes = new UniformReservoir();
> > >
> > >         while (true) {
> > >             ConsumerRecords<String, String> records = consumer.poll(10);
> > >             for(ConsumerRecord<String,String> record : records){
> > >                 long sendTime = Long.valueOf(record.value().split("-")[0]);
> > >                 long takeTime = System.nanoTime() - sendTime;
> > >                 if(recNum> 20000) {
> > >                     totalRes.update(takeTime);
> > >                 }
> > >                 recNum++;
> > >
> > >                 if(recNum % expectedSpeed == 0){
> > >                     System.out.println("==============="+ recNum + "============");
> > >                     System.out.println(" mean: " +
> > >                         totalRes.getSnapshot().getMean()/1000000);
> > >                     System.out.println(" 75%: " +
> > >                         totalRes.getSnapshot().get75thPercentile()/1000000);
> > >                     System.out.println(" 99%: " +
> > >                         totalRes.getSnapshot().get99thPercentile()/1000000);
> > >                     System.out.println(" 99.9%: " +
> > >                         totalRes.getSnapshot().get999thPercentile()/1000000);
> > >                     System.out.println(" Max: " +
> > >                         totalRes.getSnapshot().getMax()/1000000);
> > >                     System.out.println("========================================");
> > >                     totalRes = new UniformReservoir();
> > >                 }
> > >             }
> > >         }
> > >     }
> > > }
> > >
> > > ==========busy timer=====================
> > > // idea is to space the messages at a fixed interval (like Thread.sleep, but
> > > // sleep is less accurate)
> > > package Util;
> > >
> > > import java.util.ArrayList;
> > > import java.util.concurrent.ExecutorService;
> > > import java.util.concurrent.Executors;
> > > import java.util.concurrent.atomic.AtomicInteger;
> > > import java.util.concurrent.atomic.AtomicLong;
> > >
> > > public class BusyTimer {
> > >     long interval;
> > >     long duration;
> > >     ArrayList<Long> pubTime;
> > >     ExecutorService ex = Executors.newSingleThreadExecutor();
> > >     Runnable task;
> > >
> > >
> > >     public BusyTimer(long microInterval, long exDurationInSeconds, Runnable task){
> > >         pubTime = new ArrayList<Long>((int)(exDurationInSeconds * 1000 * 1000 / microInterval+1));
> > >
> > >         this.interval = microInterval * 1000;
> > >         this.duration = exDurationInSeconds * 1000000000;
> > >         this.task = task;
> > >
> > >     }
> > >
> > >     private void busywaitUntil(long nano){
> > >         while(System.nanoTime() < nano){
> > >
> > >         }
> > >     }
> > >
> > >     public void spaceMessageWithInterval(){
> > >         int i =0 ;
> > >         long baseTime = System.nanoTime();
> > >         long doneTime = baseTime + duration;
> > >         while(true) {
> > >             task.run();
> > >             pubTime.add(System.nanoTime());
> > >             long targetTime = System.nanoTime() + interval;
> > >             if(System.nanoTime() > doneTime ){
> > >                 break;
> > >             }
> > >             busywaitUntil(targetTime);
> > >         }
> > >     }
> > > }
> > >
> > >
> > >
> > > On Fri, Aug 24, 2018 at 3:37 PM Nan Xu <nanxu1...@gmail.com> wrote:
> > >
> > >> Looks really promising, but after the upgrade it still shows the same result. I
> > >> will post the program soon. Maybe you can see where the problem could be.
> > >>
> > >> Nan
> > >>
> > >> On Thu, Aug 23, 2018, 7:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>
> > >>> Hello Nan,
> > >>>
> > >>> Kafka does not tie up the processing thread to do disk flushing. However,
> > >>> since you are on an older version of Kafka I suspect you're bumping into
> > >>> some old issues that have been resolved in later versions, e.g.
> > >>>
> > >>> https://issues.apache.org/jira/browse/KAFKA-4614
> > >>>
> > >>> I'd suggest upgrading to the latest version (2.0.0) and trying again to see
> > >>> if you observe the same pattern.
> > >>>
> > >>> Guozhang
> > >>>
> > >>> On Thu, Aug 23, 2018 at 3:22 PM, Sudhir Babu Pothineni <sbpothin...@gmail.com> wrote:
> > >>>
> > >>> > I will wait for the expert’s opinion:
> > >>> >
> > >>> > Are Transparent Huge Pages (THP) disabled on the broker machine? It’s a
> > >>> > Linux kernel parameter.
> > >>> >
> > >>> > -Sudhir
> > >>> >
> > >>> > > On Aug 23, 2018, at 4:46 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >>> > >
> > >>> > > I think I found where the problem is; how to solve it and why it happens, I am
> > >>> > > still not sure.
> > >>> > >
> > >>> > > it is related to disk (maybe flushing?). I did a single machine, single node,
> > >>> > > single topic and single partition setup. producer publishes 2000 messages/s,
> > >>> > > 10K message size, and a single key.
> > >>> > >
> > >>> > > when I save the kafka log to a memory-based partition, I don't see a latency
> > >>> > > over 100ms. it tops out around 70ms.
> > >>> > > when I save to an ssd hard drive, I do see latency spikes, sometimes over 1s.
> > >>> > >
> > >>> > > adjusting log.flush.interval.messages / log.flush.interval.ms has an impact,
> > >>> > > but only makes things worse... need suggestions.
> > >>> > >
> > >>> > > I think log flushing is totally async and done by the OS in the default
> > >>> > > setting. does kafka have to wait when flushing data to disk?
> > >>> > >
> > >>> > > Thanks,
> > >>> > > Nan
> > >>> > >
> > >>> > >
> > >>> > >
> > >>> > >> On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>> > >>
> > >>> > >> Given your application code:
> > >>> > >>
> > >>> > >> ----------------------------
> > >>> > >>
> > >>> > >> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> > >>> > >>     localDeltaTopic,
> > >>> > >>     Consumed.with(
> > >>> > >>         new Serdes.StringSerde(),
> > >>> > >>         new NodeMutationSerde<>()
> > >>> > >>     )
> > >>> > >> );
> > >>> > >>
> > >>> > >> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> > >>> > >>     (mutation) -> NodeState
> > >>> > >>         .newBuilder()
> > >>> > >>         .setMeta(
> > >>> > >>             mutation.getMetaMutation().getMeta()
> > >>> > >>         )
> > >>> > >>         .setValue(
> > >>> > >>             mutation.getValueMutation().getValue()
> > >>> > >>         )
> > >>> > >>         .build()
> > >>> > >> );
> > >>> > >>
> > >>> > >> localHistStream.to(
> > >>> > >>     localHistTopic,
> > >>> > >>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
> > >>> > >> );
> > >>> > >>
> > >>> > >> ----------------------------
> > >>> > >>
> > >>> > >> which is purely stateless, committing will not touch any state directory at
> > >>> > >> all. Hence committing only involves committing offsets to Kafka.
> > >>> > >>
> > >>> > >>
> > >>> > >> Guozhang
> > >>> > >>
> > >>> > >>
> > >>> > >>> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >>> > >>>
> > >>> > >>> I was suspecting that too, but I also noticed the spikes are not spaced
> > >>> > >>> around 10s apart. to further prove it,
> > >>> > >>> I put the kafka data directory in a memory-based
> > >>> > >>> directory. it still has such latency spikes. I am going to test it
> > >>> > >>> on a single broker, single partition env. will report back soon.
> > >>> > >>>
> > >>> > >>> On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>> > >>>
> > >>> > >>>> Hello Nan,
> > >>> > >>>>
> > >>> > >>>> Thanks for the detailed information you shared. When Kafka Streams is
> > >>> > >>>> running normally, no rebalances should be triggered unless some of the
> > >>> > >>>> instances (in your case, docker containers) have soft failures.
> > >>> > >>>>
> > >>> > >>>> I suspect the latency spike is due to the commit interval: streams will
> > >>> > >>>> try to commit its offsets at a regular pace, which may increase latency. It
> > >>> > >>>> is controlled by the "commit.interval.ms" config value. I saw that in your
> > >>> > >>>> original email you've set it to 10 * 1000 (10 seconds). Is that aligned
> > >>> > >>>> with the frequency at which you observe latency spikes?
> > >>> > >>>>
> > >>> > >>>>
> > >>> > >>>> Guozhang
> > >>> > >>>>
> > >>> > >>>>
> > >>> > >>>>> On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >>> > >>>>>
> > >>> > >>>>> did more tests and made the test case simpler.
> > >>> > >>>>> the whole setup is now a single physical machine running 3 docker instances:
> > >>> > >>>>> a1, a2, a3
> > >>> > >>>>>
> > >>> > >>>>> kafka + zookeeper running on all of those docker containers.
> > >>> > >>>>> producer running on a1, sending a single key, update speed 2000 messages/s,
> > >>> > >>>>> each message is 10K in size.
> > >>> > >>>>> 3 consumers (different groups) are running, one on each docker.
> > >>> > >>>>> all topics are pre-created.
> > >>> > >>>>> at startup, I do see some latency greater than 100ms, which is fine. and
> > >>> > >>>>> then everything is good. latency is low and the consumers don't see anything
> > >>> > >>>>> over 100ms for a while.
> > >>> > >>>>> then I see a few messages with latency over 100ms, then back to normal,
> > >>> > >>>>> then it happens again..... it does seem like a gc problem, but I checked the gc
> > >>> > >>>>> logs and I don't think gc can cause over 100ms. (both are G1 collector)
> > >>> > >>>>>
> > >>> > >>>>> after the stream is running stably (excluding the startup), the first message
> > >>> > >>>>> over 100ms takes 179ms. the gc around that time has a 30ms pause, but that
> > >>> > >>>>> should not cause a 179ms end-to-end latency.
> > >>> > >>>>>
> > >>> > >>>>> FROM APP
> > >>> > >>>>>
> > >>> > >>>>> 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure) 3184739K->84018K(5947904K), 0.0093730 secs]
> > >>> > >>>>> 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure) 3184690K->84280K(6053888K), 0.0087473 secs]
> > >>> > >>>>> 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure) 3301176K->84342K(6061056K), 0.0127339 secs]
> > >>> > >>>>> 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure) 3301238K->84624K(6143488K), 0.0140844 secs]
> > >>> > >>>>> 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure) 3386000K->89949K(6144000K), 0.0108118 secs]
> > >>> > >>>>>
> > >>> > >>>>> kafka a1
> > >>> > >>>>>
> > >>> > >>>>> 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause) (young), 0.0214200 secs]
> > >>> > >>>>>    [Parallel Time: 17.2 ms, GC Workers: 8]
> > >>> > >>>>>       [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max: 7982673.8, Diff: 16.3]
> > >>> > >>>>>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5, Sum: 1.5]
> > >>> > >>>>>       [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
> > >>> > >>>>>          [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum: 37]
> > >>> > >>>>>       [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
> > >>> > >>>>>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> > >>> > >>>>>       [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum: 36.5]
> > >>> > >>>>>       [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum: 2.9]
> > >>> > >>>>>          [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24, Sum: 83]
> > >>> > >>>>>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> > >>> > >>>>>       [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2, Sum: 56.5]
> > >>> > >>>>>       [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max: 7982674.5, Diff: 0.6]
> > >>> > >>>>>    [Code Root Fixup: 0.0 ms]
> > >>> > >>>>>    [Code Root Purge: 0.0 ms]
> > >>> > >>>>>    [Clear CT: 1.0 ms]
> > >>> > >>>>>    [Other: 3.2 ms]
> > >>> > >>>>>       [Choose CSet: 0.0 ms]
> > >>> > >>>>>       [Ref Proc: 1.9 ms]
> > >>> > >>>>>       [Ref Enq: 0.0 ms]
> > >>> > >>>>>       [Redirty Cards: 0.8 ms]
> > >>> > >>>>>       [Humongous Register: 0.1 ms]
> > >>> > >>>>>       [Humongous Reclaim: 0.0 ms]
> > >>> > >>>>>       [Free CSet: 0.2 ms]
> > >>> > >>>>>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.5M(1024.0M)->217.9M(1024.0M)]
> > >>> > >>>>>  [Times: user=0.05 sys=0.00, real=0.03 secs]
> > >>> > >>>>>
> > >>> > >>>>> 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation Pause) (young), 0.0310004 secs]
> > >>> > >>>>>    [Parallel Time: 24.4 ms, GC Workers: 8]
> > >>> > >>>>>       [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max: 7984444.7, Diff: 18.6]
> > >>> > >>>>>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff: 1.9, Sum: 2.0]
> > >>> > >>>>>       [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8, Sum: 32.9]
> > >>> > >>>>>          [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25, Sum: 43]
> > >>> > >>>>>       [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum: 25.5]
> > >>> > >>>>>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> > >>> > >>>>>       [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9, Sum: 32.7]
> > >>> > >>>>>       [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6, Sum: 6.8]
> > >>> > >>>>>          [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10, Sum: 43]
> > >>> > >>>>>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> > >>> > >>>>>       [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff: 19.1, Sum: 100.1]
> > >>> > >>>>>       [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max: 7984449.9, Diff: 0.8]
> > >>> > >>>>>    [Code Root Fixup: 0.0 ms]
> > >>> > >>>>>    [Code Root Purge: 0.0 ms]
> > >>> > >>>>>    [Clear CT: 1.1 ms]
> > >>> > >>>>>    [Other: 5.5 ms]
> > >>> > >>>>>       [Choose CSet: 0.0 ms]
> > >>> > >>>>>       [Ref Proc: 2.2 ms]
> > >>> > >>>>>       [Ref Enq: 0.0 ms]
> > >>> > >>>>>       [Redirty Cards: 2.8 ms]
> > >>> > >>>>>       [Humongous Register: 0.1 ms]
> > >>> > >>>>>       [Humongous Reclaim: 0.0 ms]
> > >>> > >>>>>       [Free CSet: 0.1 ms]
> > >>> > >>>>>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.9M(1024.0M)->218.4M(1024.0M)]
> > >>> > >>>>>  [Times: user=0.05 sys=0.00, real=0.03 secs]
> > >>> > >>>>>
> > >>> > >>>>>
> > >>> > >>>>> so when kafka stream running, is there any trying to rebalance? either
> > >>> > >>>>> broker rebalance or client rebalance?
> > >>> > >>>>> any kind of test to see what cause the trouble?
> > >>> > >>>>>
> > >>> > >>>>> Thanks,
> > >>> > >>>>> Nan
> > >>> > >>>>>
> > >>> > >>>>>
> > >>> > >>>>> On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>> > >>>>>
> > >>> > >>>>>> Okay, so you're measuring end-to-end time from producer -> broker ->
> > >>> > >>>>>> streams' consumer client. There are multiple phases that can contribute to
> > >>> > >>>>>> the 100ms latency, and I cannot tell if streams' consumer phase is the
> > >>> > >>>>>> major contributor. For example, if the topic was not created before, then
> > >>> > >>>>>> when the broker first receives a produce request it may need to create the
> > >>> > >>>>>> topic, which involves multiple steps including writes to ZK, which could
> > >>> > >>>>>> take time.
> > >>> > >>>>>>
> > >>> > >>>>>> There is some confusion in your description: you mentioned "Kafka
> > >>> > >>>>>> cluster is already up and running", but I think you are referring to "Kafka
> > >>> > >>>>>> Streams application instances are already up and running", right? Only
> > >>> > >>>>>> the latter has a rebalance process, while the Kafka brokers do not
> > >>> > >>>>>> really have "rebalances" except balancing load by migrating partitions.
> > >>> > >>>>>>
> > >>> > >>>>>> Guozhang
> > >>> > >>>>>>
> > >>> > >>>>>>
> > >>> > >>>>>>
> > >>> > >>>>>> On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >>> > >>>>>>
> > >>> > >>>>>>> right, so my kafka cluster has already been up and running for a while, and I can
> > >>> > >>>>>>> see from the log that all broker instances have already changed from rebalance to
> > >>> > >>>>>>> running.
> > >>> > >>>>>>>
> > >>> > >>>>>>> I did another test.
> > >>> > >>>>>>> in the producer, right before the message gets sent to the broker, I put a
> > >>> > >>>>>>> timestamp in the message, and on the consumer side, which is after stream
> > >>> > >>>>>>> processing, I compare this timestamp with the current time. I can see some
> > >>> > >>>>>>> message processing times above 100ms on some really powerful hardware. and
> > >>> > >>>>>>> from my application gc, all the gc times are below 1ms; kafka gc only happens
> > >>> > >>>>>>> once and is below 1ms too.
> > >>> > >>>>>>>
> > >>> > >>>>>>> very puzzled. is there any communication to zookeeper that, if it gets no
> > >>> > >>>>>>> response, will cause the broker to pause? I don't think that's the case, but at
> > >>> > >>>>>>> this time I don't know what else to suspect.
> > >>> > >>>>>>>
> > >>> > >>>>>>> Nan
> > >>> > >>>>>>>
> > >>> > >>>>>>> On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>> > >>>>>>>
> > >>> > >>>>>>>> Hello Nan,
> > >>> > >>>>>>>>
> > >>> > >>>>>>>> Note that Streams may need some time to rebalance and assign tasks even if
> > >>> > >>>>>>>> you only start with one instance.
> > >>> > >>>>>>>>
> > >>> > >>>>>>>> I'd suggest you register your state listener in Kafka Streams via
> > >>> > >>>>>>>> KafkaStreams#setStateListener, and your customized StateListener should
> > >>> > >>>>>>>> record when the state transitions from REBALANCING to RUNNING, since only after
> > >>> > >>>>>>>> that will the streams client start to process the first record.
> > >>> > >>>>>>>>
> > >>> > >>>>>>>>
> > >>> > >>>>>>>> Guozhang
> > >>> > >>>>>>>>
> > >>> > >>>>>>>>
> > >>> > >>>>>>>> On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >>> > >>>>>>>>
> > >>> > >>>>>>>>> thanks, which JMX properties indicate "processing latency spikes" /
> > >>> > >>>>>>>>> "throughput"?
> > >>> > >>>>>>>>>
> > >>> > >>>>>>>>>
> > >>> > >>>>>>>>> On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io>
> > >>> > >>>>>>>>> wrote:
> > >>> > >>>>>>>>>
> > >>> > >>>>>>>>>> I cannot spot any obvious reasons.
> > >>> > >>>>>>>>>>
> > >>> > >>>>>>>>>> As you consume from the result topic for verification, we should verify
> > >>> > >>>>>>>>>> that the latency spikes originate on write and not on read: you might
> > >>> > >>>>>>>>>> want to have a look into Kafka Streams JMX metrics to see if processing
> > >>> > >>>>>>>>>> latency spikes or throughput drops.
> > >>> > >>>>>>>>>>
> > >>> > >>>>>>>>>> Also watch for GC pauses in the JVM.
> > >>> > >>>>>>>>>>
> > >>> > >>>>>>>>>> Hope this helps.
> > >>> > >>>>>>>>>>
> > >>> > >>>>>>>>>>
> > >>> > >>>>>>>>>> -Matthias
> > >>> > >>>>>>>>>>
> > >>> > >>>>>>>>>>> On 8/17/18 12:13 PM, Nan Xu wrote:
> > >>> > >>>>>>>>>>> btw, I am using version 0.10.2.0
> > >>> > >>>>>>>>>>>
> > >>> > >>>>>>>>>>> On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
> > >>> > >>>>>>>>>>>
> > >>> > >>>>>>>>>>>> I am working on a kafka stream app and see huge latency variance,
> > >>> > >>>>>>>>>>>> wondering what can cause this?
> > >>> > >>>>>>>>>>>>
> > >>> > >>>>>>>>>>>> the processing is very simple and doesn't have state; linger.ms is already
> > >>> > >>>>>>>>>>>> changed to 5ms. the message size is around 10K bytes, published at 2000
> > >>> > >>>>>>>>>>>> messages/s, network is 10G. a regular consumer watches the
> > >>> > >>>>>>>>>>>> localHistTopic topic and prints out a counter every 2000 messages.
> > >>> > >>>>>>>>>>>> usually I get a count of 2000 every second, matching the publish speed, but
> > >>> > >>>>>>>>>>>> sometimes I see it stall for 3 or more seconds and then print out a few counts,
> > >>> > >>>>>>>>>>>> as if the cpu is paused during that time or messages are being cached/batched
> > >>> > >>>>>>>>>>>> and then processed.
> > >>> > >>>>>>>>>>>> any suggestion?
> > >>> > >>>>>>>>>>>>
> > >>> > >>>>>>>>>>>> final Properties streamsConfiguration = new Properties();
> > >>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> > >>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
> > >>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> > >>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> > >>> > >>>>>>>>>>>>     Serdes.String().getClass().getName());
> > >>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
> > >>> > >>>>>>>>>>>> // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > >>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >>> > >>>>>>>>>>>>     + ProducerConfig.BATCH_SIZE_CONFIG,163840);
> > >>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >>> > >>>>>>>>>>>>     + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
> > >>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >>> > >>>>>>>>>>>>     + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> > >>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >>> > >>>>>>>>>>>>     + ProducerConfig.LINGER_MS_CONFIG,"5");
> > >>> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.consumerPrefix(
> > >>> > >>>>>>>>>>>>     ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
> > >>> > >>>>>>>>>>>>
> > >>> > >>>>>>>>>>>>
> > >>> > >>>>>>>>>>>> final StreamsBuilder builder = new StreamsBuilder();
> > >>> > >>>>>>>>>>>>
> > >>> > >>>>>>>>>>>> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> > >>> > >>>>>>>>>>>>     localDeltaTopic,
> > >>> > >>>>>>>>>>>>     Consumed.with(
> > >>> > >>>>>>>>>>>>         new Serdes.StringSerde(),
> > >>> > >>>>>>>>>>>>         new NodeMutationSerde<>()
> > >>> > >>>>>>>>>>>>     )
> > >>> > >>>>>>>>>>>> );
> > >>> > >>>>>>>>>>>>
> > >>> > >>>>>>>>>>>> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> > >>> > >>>>>>>>>>>>     (mutation) -> NodeState
> > >>> > >>>>>>>>>>>>         .newBuilder()
> > >>> > >>>>>>>>>>>>         .setMeta(
> > >>> > >>>>>>>>>>>>             mutation.getMetaMutation().getMeta()
> > >>> > >>>>>>>>>>>>         )
> > >>> > >>>>>>>>>>>>         .setValue(
> > >>> > >>>>>>>>>>>>             mutation.getValueMutation().
> > >>> > >>> getValue() > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> ) > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> .build() > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> ); > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> localHistStream.to( > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> localHistTopic, > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> Produced.with(new Serdes.StringSerde(), new > > >>> > >>>>>>>>>> NodeStateSerde<>()) > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> ); > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> streams = new KafkaStreams(builder.build(), > > >>> > >>>>>>> streamsConfiguration); > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> streams.cleanUp(); > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> streams.start(); > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>>> > > >>> > >>>>>>>>>>> > > >>> > >>>>>>>>>> > > >>> > >>>>>>>>>> > > >>> > >>>>>>>>> > > >>> > >>>>>>>> > > >>> > >>>>>>>> > > >>> > >>>>>>>> > > >>> > >>>>>>>> -- > > >>> > >>>>>>>> -- Guozhang > > >>> > >>>>>>>> > > >>> > >>>>>>> > > >>> > >>>>>> > > >>> > >>>>>> > > >>> > >>>>>> > > >>> > >>>>>> -- > > >>> > >>>>>> -- Guozhang > > >>> > >>>>>> > > >>> > >>>>> > > >>> > >>>> > > >>> > >>>> > > >>> > >>>> > > >>> > >>>> -- > > >>> > >>>> -- Guozhang > > >>> > >>>> > > >>> > >>> > > >>> > >> > > >>> > >> > > >>> > >> > > >>> > >> -- > > >>> > >> -- Guozhang > > >>> > >> > > >>> > > > >>> > > >>> > > >>> > > >>> -- > > >>> -- Guozhang > > >>> > > >> > > > > > > -- > -- Guozhang >