Understood, thanks for all the help.

On Mon, Aug 27, 2018 at 2:16 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Nan,
> What you described seems to be a broker-side latency spike, not a client
> (producer, consumer, or Streams)-side latency spike. There are a number of
> possible causes for such spikes: disk flushing (though asynchronous, it can
> still cause the processing thread to halt), GC, page faults (when a thread
> needs to access a cold page), etc. It is hard to tell which one is the
> actual root cause.
> For example, take a look at these slides (starting at slide 14) for a
> concrete example of such an investigation:
> https://www.slideshare.net/kawamuray/multitenancy-kafka-clusters-for-everyone-at-line
> My point is that it is not easy to pinpoint the root cause via an email
> discussion and by looking at your experiment code: the community can share
> some past experience and a few quick hints with you, but the issue most
> likely varies case by case and hence can only be fully understood by you.
> Guozhang
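One way to separate GC or OS-level stalls (page faults, scheduling) from Kafka-specific causes is a "hiccup meter" in the spirit of jHiccup: a thread repeatedly sleeps for a short, fixed time and records how much longer than requested each sleep actually took. The sketch below is stdlib-only Java, not jHiccup's code; the class name `PauseDetector`, the 1 ms sleep and the 20 ms reporting threshold are illustrative assumptions. Running it inside the suspect JVM (e.g. the Streams app) shows in-process pauses; running it as a separate JVM on the same host shows machine-wide stalls.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical hiccup meter: measures how late a short sleep wakes up. */
final class PauseDetector implements Runnable {
    private final long sleepMillis;
    private final long reportThresholdNanos;
    private volatile boolean running = true;
    private volatile long maxHiccupNanos = 0L; // written only by the detector thread

    PauseDetector(long sleepMillis, long reportThresholdMillis) {
        this.sleepMillis = sleepMillis;
        this.reportThresholdNanos = TimeUnit.MILLISECONDS.toNanos(reportThresholdMillis);
    }

    /** Extra time beyond the expected sleep; never negative. */
    static long hiccupNanos(long startNanos, long endNanos, long expectedNanos) {
        return Math.max(0L, (endNanos - startNanos) - expectedNanos);
    }

    @Override
    public void run() {
        final long expected = TimeUnit.MILLISECONDS.toNanos(sleepMillis);
        while (running) {
            long start = System.nanoTime();
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            long hiccup = hiccupNanos(start, System.nanoTime(), expected);
            if (hiccup > maxHiccupNanos) {
                maxHiccupNanos = hiccup;
            }
            if (hiccup >= reportThresholdNanos) {
                System.out.printf("%d: stalled for %.1f ms%n",
                        System.currentTimeMillis(), hiccup / 1e6);
            }
        }
    }

    void stop() { running = false; }

    long maxHiccupNanos() { return maxHiccupNanos; }

    public static void main(String[] args) throws InterruptedException {
        PauseDetector detector = new PauseDetector(1, 20);
        Thread t = new Thread(detector, "pause-detector");
        t.setDaemon(true);
        t.start();
        Thread.sleep(TimeUnit.SECONDS.toMillis(60)); // observe for one minute
        detector.stop();
        t.join();
        System.out.printf("max stall: %.1f ms%n", detector.maxHiccupNanos() / 1e6);
    }
}
```

If stalls reported by this meter line up in time with the end-to-end latency spikes, the cause is more likely below Kafka (JVM or OS) than in the client configuration.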
On Sat, Aug 25, 2018 at 6:58 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > Maybe it's easier to use GitHub.
> >
> >  https://github.com/angelfox123/kperf
> >
> >
On Sat, Aug 25, 2018 at 8:43 PM Nan Xu <nanxu1...@gmail.com> wrote:
> >
> > > So I did upgrade to 2.0.0 and am still seeing the same result. Below is
> > > the program I am using. I am running everything on a single server
> > > (CentOS 7, 24 cores, 32 GB RAM, 1 broker, 1 ZooKeeper, single hard
> > > drive). I understand a single hard drive is less than ideal, but I still
> > > don't expect latency to exceed 3 seconds.
> > >
> > >
> > > Case 1
> > > I create 1 partition for input and 1 partition for output. Message size
> > > is 10K. The producer is given parameters (3600, 1000, 2), i.e. 2 messages
> > > every 1000 microseconds for 3600 seconds, which translates to 2,000
> > > messages/s. I still see latency spikes, sometimes reaching 3 seconds.
> > >
> > > Case 2
> > > 50 partitions for input and 50 partitions for output. Message size is
> > > 10K. The producer is given parameters (3600, 1000, 20), i.e. 20 messages
> > > every 1000 microseconds for 3600 seconds, which translates to 20,000
> > > messages/s. Latency is not only higher, the spikes also happen more often.
> > >
> > >
> > > Any suggestion is appreciated. The target is for each partition to handle
> > > 1,000 to 2,000 messages/s with all latencies below 100 ms.
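As a rough sanity check of the two cases, the offered load follows directly from the producer parameters. This sketch assumes each record carries about 10,240 bytes of value (the payload size the producer code builds) and ignores keys, record and batch overhead, and compression; the class and method names are illustrative. Since the Streams app writes every record again to `output`, the single broker's disk sees roughly twice the input rate.

```java
/** Hypothetical helper: offered load implied by the producer arguments. */
final class OfferedLoad {
    /** `batch` messages every `intervalMicros` microseconds -> messages per second. */
    static long messagesPerSecond(long intervalMicros, long batch) {
        return 1_000_000L / intervalMicros * batch;
    }

    /** Payload bytes per second, ignoring record and protocol overhead. */
    static long bytesPerSecond(long messagesPerSecond, long payloadBytes) {
        return messagesPerSecond * payloadBytes;
    }

    public static void main(String[] args) {
        long[][] cases = {{1000, 2}, {1000, 20}}; // (intervalMicros, batch) for case 1 and case 2
        for (long[] c : cases) {
            long mps = messagesPerSecond(c[0], c[1]);
            long inBytes = bytesPerSecond(mps, 10_240);
            // the Streams app re-writes every record to "output", doubling broker writes
            System.out.printf("%,d msg/s -> %.1f MB/s into 'input', ~%.1f MB/s total written%n",
                    mps, inBytes / 1e6, 2 * inBytes / 1e6);
        }
    }
}
```

For case 1 this gives 2,000 msg/s and about 20.5 MB/s into `input` (about 41 MB/s including `output`); for case 2, 20,000 msg/s and about 205 MB/s (about 410 MB/s including `output`). These figures are worth comparing against what the single drive can sustain.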
> > >
> > > ====build.gradle======
> > > plugins {
> > >     id 'application'
> > >     id 'java'
> > > }
> > > group 'com.bofa'
> > > version '1.0-SNAPSHOT'
> > > sourceCompatibility = 1.8
> > > mainClassName="main.StreamApp"
> > >
> > > repositories {
> > >     mavenCentral()
> > > }
> > >
> > > dependencies {
> > >     compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0'
> > >     compile group: "org.apache.kafka", name: "kafka-streams", version: "2.0.0"
> > >     compile group: 'io.dropwizard.metrics', name: 'metrics-core', version: '3.2.6'
> > >     testCompile group: 'junit', name: 'junit', version: '4.12'
> > > }
> > >
> > > ========producer========
> > > package main;
> > >
> > > import java.util.Properties;
> > >
> > > import Util.BusyTimer;
> > > import org.apache.kafka.clients.producer.KafkaProducer;
> > > import org.apache.kafka.clients.producer.ProducerConfig;
> > > import org.apache.kafka.clients.producer.ProducerRecord;
> > > import org.apache.kafka.common.serialization.StringSerializer;
> > >
> > > public class SimpleProducer {
> > >     // args: <durationSeconds> <intervalMicros> <messagesPerInterval>
> > >     public static void main(String[] args) {
> > >         final int time = Integer.parseInt(args[0]);
> > >         final long interval = Long.parseLong(args[1]);
> > >         final int batch = Integer.parseInt(args[2]);
> > >         Properties props = new Properties();
> > >         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > >         props.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-perf-test-producer");
> > >         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
> > >         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
> > >
> > >         final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
> > >
> > >         // 10,240-character payload (~10K per message)
> > >         StringBuilder buffer = new StringBuilder();
> > >         for (int i = 0; i < 10240; i++) {
> > >             buffer.append('a');
> > >         }
> > >         final String value = buffer.toString();
> > >
> > >         // number of task runs per second
> > >         final long speed = 1000000 / interval;
> > >         Runnable task = new Runnable() {
> > >             int sendNum = 0;
> > >             @Override
> > >             public void run() {
> > >                 for (int i = 0; i < batch; i++) {
> > >                     // prefix the value with the send time so the consumer can compute end-to-end latency
> > >                     ProducerRecord<String, String> record =
> > >                         new ProducerRecord<>("input", System.nanoTime() + "-" + value);
> > >                     kafkaProducer.send(record);
> > >                     sendNum++;
> > >                 }
> > >
> > >                 // print progress roughly once per second of sending
> > >                 if (sendNum % (speed * batch) == 0) {
> > >                     System.out.println(System.currentTimeMillis() + " : " + sendNum);
> > >                 }
> > >             }
> > >         };
> > >
> > >         BusyTimer timer = new BusyTimer(interval, time, task);
> > >         timer.spaceMessageWithInterval();
> > >         // flush any buffered records and release resources before exiting
> > >         kafkaProducer.close();
> > >     }
> > > }
> > >
> > >
> > > ============kafka stream=============
> > > package main;
> > >
> > > import java.util.Properties;
> > >
> > > import org.apache.kafka.clients.producer.ProducerConfig;
> > > import org.apache.kafka.common.serialization.Serdes;
> > > import org.apache.kafka.streams.Consumed;
> > > import org.apache.kafka.streams.KafkaStreams;
> > > import org.apache.kafka.streams.StreamsBuilder;
> > > import org.apache.kafka.streams.StreamsConfig;
> > > import org.apache.kafka.streams.kstream.KStream;
> > > import org.apache.kafka.streams.kstream.Produced;
> > >
> > > public class StreamApp {
> > >     public static void main(String[] args) {
> > >         final Properties streamsConfiguration = new Properties();
> > >         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-stream");
> > >         streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "simple_stream_1");
> > >         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > >         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> > >             Serdes.String().getClass().getName());
> > >         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >             + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> > >         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >             + ProducerConfig.LINGER_MS_CONFIG, "5");
> > >         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
> > >
> > >         StreamsBuilder builder = new StreamsBuilder();
> > >         // stateless pass-through: read "input" and write it unchanged to "output"
> > >         final KStream<String, String> inputStream = builder.stream(
> > >             "input",
> > >             Consumed.with(new Serdes.StringSerde(), new Serdes.StringSerde())
> > >         );
> > >         inputStream.to(
> > >             "output",
> > >             Produced.with(new Serdes.StringSerde(), new Serdes.StringSerde())
> > >         );
> > >
> > >         final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
> > >         // close the Streams client cleanly on JVM shutdown (e.g. Ctrl-C)
> > >         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> > >         streams.start();
> > >     }
> > > }
> > >
> > > =============consumer============================
> > > package main;
> > >
> > > import java.util.Collections;
> > > import java.util.Properties;
> > >
> > > import com.codahale.metrics.Reservoir;
> > > import com.codahale.metrics.Snapshot;
> > > import com.codahale.metrics.UniformReservoir;
> > > import org.apache.kafka.clients.consumer.ConsumerConfig;
> > > import org.apache.kafka.clients.consumer.ConsumerRecord;
> > > import org.apache.kafka.clients.consumer.ConsumerRecords;
> > > import org.apache.kafka.clients.consumer.KafkaConsumer;
> > > import org.apache.kafka.common.serialization.StringDeserializer;
> > >
> > > public class SimpleConsumer {
> > >     // args: <expectedMessagesPerSecond>, also used as the reporting window size
> > >     public static void main(String[] args) {
> > >         final int expectedSpeed = Integer.parseInt(args[0]);
> > >         Properties props = new Properties();
> > >         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > >         props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-perf-consumer-group");
> > >         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> > >         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> > >         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
> > >         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
> > >
> > >         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> > >         consumer.subscribe(Collections.singletonList("output"));
> > >
> > >         consumer.poll(0); // initial poll, intended to join the group before measuring
> > >         int recNum = 0;
> > >
> > >         Reservoir totalRes = new UniformReservoir();
> > >
> > >         while (true) {
> > >             ConsumerRecords<String, String> records = consumer.poll(10);
> > >             for (ConsumerRecord<String, String> record : records) {
> > >                 // value is "<producer System.nanoTime()>-<payload>"; nanoTime values from
> > >                 // different JVMs are only comparable in practice because all run on one host
> > >                 String v = record.value();
> > >                 long sendTime = Long.parseLong(v.substring(0, v.indexOf('-', 1)));
> > >                 long takeTime = System.nanoTime() - sendTime;
> > >                 if (recNum > 20000) { // skip warm-up records
> > >                     totalRes.update(takeTime);
> > >                 }
> > >                 recNum++;
> > >
> > >                 if (recNum % expectedSpeed == 0) {
> > >                     Snapshot s = totalRes.getSnapshot(); // nanoseconds, printed as ms
> > >                     System.out.println("===============" + recNum + "============");
> > >                     System.out.println("  mean: " + s.getMean() / 1000000);
> > >                     System.out.println("  75%: " + s.get75thPercentile() / 1000000);
> > >                     System.out.println("  99%: " + s.get99thPercentile() / 1000000);
> > >                     System.out.println("  99.9%: " + s.get999thPercentile() / 1000000);
> > >                     System.out.println("  Max: " + s.getMax() / 1000000);
> > >                     System.out.println("========================================");
> > >                     totalRes = new UniformReservoir();
> > >                 }
> > >             }
> > >         }
> > >     }
> > > }
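One caveat about this measurement: Dropwizard's `UniformReservoir` keeps a fixed-size random sample (1,028 values by default), so the printed percentiles and `Max` describe that sample rather than every record, and a handful of slow records in a window can be missed. At a few thousand records per window, keeping every value is cheap. The `LatencyWindow` class below is a hypothetical stdlib-only sketch using the nearest-rank percentile definition; it could take the place of `totalRes`, with `record(takeTime)` in the loop and `reset()` after each report.

```java
import java.util.Arrays;

/** Hypothetical exact latency window: keeps every sample until reset(). */
final class LatencyWindow {
    private long[] samples = new long[1024];
    private int size = 0;

    void record(long latency) {
        if (size == samples.length) {
            samples = Arrays.copyOf(samples, size * 2); // grow as needed
        }
        samples[size++] = latency;
    }

    int count() { return size; }

    /** Nearest-rank percentile, p in (0, 100]. */
    long percentile(double p) {
        if (size == 0) {
            throw new IllegalStateException("empty window");
        }
        long[] sorted = Arrays.copyOf(samples, size);
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * size / 100.0);
        return sorted[Math.min(Math.max(rank, 1), size) - 1];
    }

    long max() { return percentile(100.0); }

    /** Number of samples strictly above the threshold (e.g. the 100 ms target). */
    long countAbove(long threshold) {
        long n = 0;
        for (int i = 0; i < size; i++) {
            if (samples[i] > threshold) {
                n++;
            }
        }
        return n;
    }

    void reset() { size = 0; }

    public static void main(String[] args) {
        LatencyWindow w = new LatencyWindow();
        for (long ms = 1; ms <= 2000; ms++) {
            w.record(ms * 1_000_000L); // synthetic latencies 1..2000 ms, in ns
        }
        System.out.println("p99 ms: " + w.percentile(99) / 1_000_000L);
        System.out.println("over 100 ms: " + w.countAbove(100_000_000L));
    }
}
```

`countAbove` gives the number of records that missed the 100 ms target in each window, which is often more telling than a single percentile.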
> > >
> > > ==========busy timer=====================
> > > // Idea: space the messages at fixed intervals (like Thread.sleep, but sleep
> > > // is less accurate). The wait starts after task.run() returns, so this is a
> > > // fixed delay between runs rather than a strictly fixed rate.
> > > package Util;
> > >
> > > import java.util.ArrayList;
> > >
> > > public class BusyTimer {
> > >     final long interval;  // nanoseconds
> > >     final long duration;  // nanoseconds
> > >     final ArrayList<Long> pubTime;
> > >     final Runnable task;
> > >
> > >     public BusyTimer(long microInterval, long exDurationInSeconds, Runnable task) {
> > >         pubTime = new ArrayList<Long>((int) (exDurationInSeconds * 1000 * 1000 / microInterval + 1));
> > >         this.interval = microInterval * 1000;
> > >         this.duration = exDurationInSeconds * 1000000000L;
> > >         this.task = task;
> > >     }
> > >
> > >     private void busywaitUntil(long nano) {
> > >         // compare via subtraction, as recommended for System.nanoTime()
> > >         while (System.nanoTime() - nano < 0) {
> > >             // spin
> > >         }
> > >     }
> > >
> > >     public void spaceMessageWithInterval() {
> > >         long baseTime = System.nanoTime();
> > >         long doneTime = baseTime + duration;
> > >         while (true) {
> > >             task.run();
> > >             pubTime.add(System.nanoTime());
> > >             long targetTime = System.nanoTime() + interval;
> > >             if (System.nanoTime() - doneTime > 0) {
> > >                 break;
> > >             }
> > >             busywaitUntil(targetTime);
> > >         }
> > >     }
> > > }
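`BusyTimer` computes each target time only after `task.run()` returns, so any time spent inside `send()` stretches the period and the real rate ends up somewhat below the nominal 2,000 or 20,000 messages/s. If a true fixed rate is wanted, the target times can be anchored to the start time instead. `FixedRatePacer` is a hypothetical stdlib-only variant, not part of the original code; its trade-off is that after a slow run it fires back-to-back to catch up, which produces small bursts.

```java
/** Hypothetical fixed-rate variant of BusyTimer: tick k fires at start + k * interval. */
final class FixedRatePacer {
    private final long intervalNanos;
    private final long durationNanos;

    FixedRatePacer(long intervalMicros, long durationSeconds) {
        this.intervalNanos = intervalMicros * 1_000L;
        this.durationNanos = durationSeconds * 1_000_000_000L;
    }

    /** Runs the task once per tick until the duration elapses; returns the tick count. */
    long run(Runnable task) {
        final long start = System.nanoTime();
        long tick = 0;
        while (true) {
            long target = start + tick * intervalNanos;
            if (target - start >= durationNanos) {
                break; // schedule exhausted
            }
            while (System.nanoTime() - target < 0) {
                // busy-wait; subtraction keeps the comparison correct across nanoTime overflow
            }
            task.run();
            tick++;
        }
        return tick;
    }

    public static void main(String[] args) {
        // 2 simulated sends every 1000 us for 5 s, mirroring case 1 over a shorter run
        final long[] sent = {0};
        long ticks = new FixedRatePacer(1000, 5).run(() -> sent[0] += 2);
        System.out.println(ticks + " ticks, " + sent[0] + " simulated sends");
    }
}
```

Because the schedule depends only on the start time, the number of ticks is fixed (duration divided by interval) no matter how long each task run takes.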
> > >
> > >
> > >
On Fri, Aug 24, 2018 at 3:37 PM Nan Xu <nanxu1...@gmail.com> wrote:
> > >
> > >> Looks really promising, but after the upgrade it still shows the same
> > >> result. I will post the program soon; maybe you can see where the
> > >> problem could be.
> > >>
> > >> Nan
> > >>
On Thu, Aug 23, 2018, 7:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>
> > >>> Hello Nan,
> > >>>
> > >>> Kafka does not tie up the processing thread to do disk flushing. However,
> > >>> since you are on an older version of Kafka, I suspect you're bumping into
> > >>> some old issues that have been resolved in later versions, e.g.
> > >>>
> > >>> https://issues.apache.org/jira/browse/KAFKA-4614
> > >>>
> > >>> I'd suggest upgrading to the latest version (2.0.0) and trying again to
> > >>> see if you observe the same pattern.
> > >>>
> > >>> Guozhang
> > >>>
On Thu, Aug 23, 2018 at 3:22 PM, Sudhir Babu Pothineni <sbpothin...@gmail.com> wrote:
> > >>>
> > >>> > I will wait for the experts' opinion:
> > >>> >
> > >>> > Is Transparent Huge Pages (THP) disabled on the broker machine? It's a
> > >>> > Linux kernel parameter.
> > >>> >
> > >>> > -Sudhir
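On most Linux systems the current THP mode can be read from sysfs, where the active choice is shown in brackets (for example `always madvise [never]`). The stdlib-only sketch below reports it; the class name is hypothetical, and `/sys/kernel/mm/transparent_hugepage/enabled` is the usual upstream location, though some older distribution kernels used a different path.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: reports the active Transparent Huge Pages mode on Linux. */
final class ThpCheck {
    static final String ENABLED_PATH = "/sys/kernel/mm/transparent_hugepage/enabled";

    /** sysfs marks the active mode with brackets, e.g. "always madvise [never]" -> "never". */
    static String activeMode(String sysfsLine) {
        int open = sysfsLine.indexOf('[');
        int close = sysfsLine.indexOf(']', open + 1);
        if (open < 0 || close < 0) {
            throw new IllegalArgumentException("no bracketed mode in: " + sysfsLine);
        }
        return sysfsLine.substring(open + 1, close);
    }

    public static void main(String[] args) throws IOException {
        Path path = Paths.get(ENABLED_PATH);
        if (!Files.exists(path)) {
            System.out.println("THP sysfs entry not found at " + ENABLED_PATH);
            return;
        }
        String line = new String(Files.readAllBytes(path), StandardCharsets.UTF_8).trim();
        System.out.println("THP enabled mode: " + activeMode(line));
    }
}
```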
> > >>> >
> On Aug 23, 2018, at 4:46 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >>> > >
> > >>> > > I think I found where the problem is; how to solve it, and why, I am
> > >>> > > still not sure.
> > >>> > >
> > >>> > > It is related to the disk (maybe flushing?). I did a single-machine,
> > >>> > > single-node, single-topic, single-partition setup. The producer
> > >>> > > publishes 2000 messages/s with a 10K message size and a single key.
> > >>> > >
> > >>> > > When I put the Kafka log on a memory-based partition, I don't see any
> > >>> > > latency over 100ms; the top is around 70ms.
> > >>> > > When I put it on an SSD, I do see latency spikes, sometimes over 1s.
> > >>> > >
> > >>> > > Adjusting log.flush.interval.messages / log.flush.interval.ms has an
> > >>> > > impact, but it only makes things worse... I need suggestions.
> > >>> > >
> > >>> > > I think log flushing is totally async and done by the OS in the default
> > >>> > > setting. Does Kafka have to wait when flushing data to disk?
> > >>> > >
> > >>> > > Thanks,
> > >>> > > Nan
> > >>> > >
> > >>> > >
> > >>> > >
> On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>> > >>
> > >>> > >> Given your application code:
> > >>> > >>
> > >>> > >> ----------------------------
> > >>> > >>
> > >>> > >> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> > >>> > >>            localDeltaTopic,
> > >>> > >>            Consumed.with(
> > >>> > >>                new Serdes.StringSerde(),
> > >>> > >>                new NodeMutationSerde<>()
> > >>> > >>            )
> > >>> > >>        );
> > >>> > >>
> > >>> > >>  KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> > >>> > >>            (mutation) -> NodeState
> > >>> > >>                .newBuilder()
> > >>> > >>                .setMeta(
> > >>> > >>                    mutation.getMetaMutation().getMeta()
> > >>> > >>                )
> > >>> > >>                .setValue(
> > >>> > >>                    mutation.getValueMutation().getValue()
> > >>> > >>                )
> > >>> > >>                .build()
> > >>> > >>        );
> > >>> > >>
> > >>> > >>  localHistStream.to(
> > >>> > >>            localHistTopic,
> > >>> > >>            Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
> > >>> > >>        );
> > >>> > >>
> > >>> > >> ----------------------------
> > >>> > >>
> > >>> > >> which is purely stateless, committing will not touch any state
> > >>> > >> directory at all; hence committing only involves committing offsets
> > >>> > >> to Kafka.
> > >>> > >>
> > >>> > >>
> > >>> > >> Guozhang
> > >>> > >>
> > >>> > >>
>> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >>> > >>>
> > >>> > >>> I was suspecting that too, but I also noticed the spikes are not
> > >>> > >>> spaced around 10s. To further prove it, I put the Kafka data
> > >>> > >>> directory in a memory-based directory, and it still has such latency
> > >>> > >>> spikes. I am going to test it on a single-broker, single-partition
> > >>> > >>> environment and will report back soon.
> > >>> > >>>
On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>> > >>>
> > >>> > >>>> Hello Nan,
> > >>> > >>>>
> > >>> > >>>> Thanks for the detailed information you shared. When Kafka Streams
> > >>> > >>>> is running normally, no rebalances should be triggered unless some
> > >>> > >>>> of the instances (in your case, Docker containers) have soft
> > >>> > >>>> failures.
> > >>> > >>>>
> > >>> > >>>> I suspect the latency spike is due to the commit interval: Streams
> > >>> > >>>> will try to commit its offsets at a regular pace, which may increase
> > >>> > >>>> latency. It is controlled by the "commit.interval.ms" config value.
> > >>> > >>>> I saw that in your original email you've set it to 10 * 1000 (10
> > >>> > >>>> seconds). Is that aligned with the frequency at which you observe
> > >>> > >>>> latency spikes?
> > >>> > >>>>
> > >>> > >>>>
> > >>> > >>>> Guozhang
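One way to answer that question from the consumer's output is to log the wall-clock time of each spike and check whether the gaps between spikes cluster around multiples of `commit.interval.ms`. `SpikeAlignment` is a hypothetical stdlib-only helper; its name, the tolerance and the sample timestamps in `main` are illustrative assumptions, not measured data.

```java
/** Hypothetical check: are spike gaps close to multiples of a period (e.g. commit.interval.ms)? */
final class SpikeAlignment {
    /**
     * Returns true if every gap between consecutive spike timestamps is within
     * toleranceMillis of a non-zero multiple of periodMillis.
     */
    static boolean alignedWithPeriod(long[] spikeMillis, long periodMillis, long toleranceMillis) {
        if (periodMillis <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        for (int i = 1; i < spikeMillis.length; i++) {
            long gap = spikeMillis[i] - spikeMillis[i - 1];
            long nearest = Math.round((double) gap / periodMillis) * periodMillis;
            if (nearest == 0 || Math.abs(gap - nearest) > toleranceMillis) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] spikes = {0, 10_040, 20_010, 40_030}; // illustrative timestamps in ms
        System.out.println(alignedWithPeriod(spikes, 10_000, 200));
    }
}
```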
> > >>> > >>>>
> > >>> > >>>>
>>>> On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >>> > >>>>>
> > >>> > >>>>> I did more tests and made the test case simpler.
> > >>> > >>>>> The whole setup is now a single physical machine running 3 Docker
> > >>> > >>>>> instances: a1, a2, a3.
> > >>> > >>>>>
> > >>> > >>>>> Kafka + ZooKeeper are running on all of those Docker containers.
> > >>> > >>>>> The producer runs on a1 and sends a single key at 2000 messages/s;
> > >>> > >>>>> each message is 10K in size.
> > >>> > >>>>> 3 consumers (in different groups) are running, one on each
> > >>> > >>>>> container. All topics are pre-created.
> > >>> > >>>>> At startup, I do see some latency greater than 100ms, which is
> > >>> > >>>>> fine, and then everything is good: latency is low and the
> > >>> > >>>>> consumers don't see anything over 100ms for a while.
> > >>> > >>>>> Then I see a few messages with latency over 100ms, then back to
> > >>> > >>>>> normal, then it happens again..... It does seem like a GC problem,
> > >>> > >>>>> but I checked the GC logs and I don't think GC can cause over
> > >>> > >>>>> 100ms (both are using the G1 collector).
> > >>> > >>>>>
> > >>> > >>>>> After the stream is running stably (excluding startup), the first
> > >>> > >>>>> message over 100ms takes 179ms, and the GC has a 30ms pause, which
> > >>> > >>>>> should not cause a 179ms end-to-end latency.
> > >>> > >>>>>
> > >>> > >>>>> FROM APP
> > >>> > >>>>>
> > >>> > >>>>> 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure) 3184739K->84018K(5947904K), 0.0093730 secs]
> > >>> > >>>>> 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure) 3184690K->84280K(6053888K), 0.0087473 secs]
> > >>> > >>>>> 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure) 3301176K->84342K(6061056K), 0.0127339 secs]
> > >>> > >>>>> 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure) 3301238K->84624K(6143488K), 0.0140844 secs]
> > >>> > >>>>> 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure) 3386000K->89949K(6144000K), 0.0108118 secs]
> > >>> > >>>>>
> > >>> > >>>>>
> > >>> > >>>>>
> > >>> > >>>>> kafka a1
> > >>> > >>>>>
> > >>> > >>>>> 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause) (young), 0.0214200 secs]
> > >>> > >>>>>   [Parallel Time: 17.2 ms, GC Workers: 8]
> > >>> > >>>>>      [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max: 7982673.8, Diff: 16.3]
> > >>> > >>>>>      [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5, Sum: 1.5]
> > >>> > >>>>>      [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
> > >>> > >>>>>         [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum: 37]
> > >>> > >>>>>      [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
> > >>> > >>>>>      [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> > >>> > >>>>>      [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum: 36.5]
> > >>> > >>>>>      [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum: 2.9]
> > >>> > >>>>>         [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24, Sum: 83]
> > >>> > >>>>>      [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> > >>> > >>>>>      [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2, Sum: 56.5]
> > >>> > >>>>>      [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max: 7982674.5, Diff: 0.6]
> > >>> > >>>>>   [Code Root Fixup: 0.0 ms]
> > >>> > >>>>>   [Code Root Purge: 0.0 ms]
> > >>> > >>>>>   [Clear CT: 1.0 ms]
> > >>> > >>>>>   [Other: 3.2 ms]
> > >>> > >>>>>      [Choose CSet: 0.0 ms]
> > >>> > >>>>>      [Ref Proc: 1.9 ms]
> > >>> > >>>>>      [Ref Enq: 0.0 ms]
> > >>> > >>>>>      [Redirty Cards: 0.8 ms]
> > >>> > >>>>>      [Humongous Register: 0.1 ms]
> > >>> > >>>>>      [Humongous Reclaim: 0.0 ms]
> > >>> > >>>>>      [Free CSet: 0.2 ms]
> > >>> > >>>>>   [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.5M(1024.0M)->217.9M(1024.0M)]
> > >>> > >>>>> [Times: user=0.05 sys=0.00, real=0.03 secs]
> > >>> > >>>>>
> > >>> > >>>>> 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation Pause) (young), 0.0310004 secs]
> > >>> > >>>>>   [Parallel Time: 24.4 ms, GC Workers: 8]
> > >>> > >>>>>      [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max: 7984444.7, Diff: 18.6]
> > >>> > >>>>>      [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff: 1.9, Sum: 2.0]
> > >>> > >>>>>      [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8, Sum: 32.9]
> > >>> > >>>>>         [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25, Sum: 43]
> > >>> > >>>>>      [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum: 25.5]
> > >>> > >>>>>      [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> > >>> > >>>>>      [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9, Sum: 32.7]
> > >>> > >>>>>      [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6, Sum: 6.8]
> > >>> > >>>>>         [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10, Sum: 43]
> > >>> > >>>>>      [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> > >>> > >>>>>      [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff: 19.1, Sum: 100.1]
> > >>> > >>>>>      [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max: 7984449.9, Diff: 0.8]
> > >>> > >>>>>   [Code Root Fixup: 0.0 ms]
> > >>> > >>>>>   [Code Root Purge: 0.0 ms]
> > >>> > >>>>>   [Clear CT: 1.1 ms]
> > >>> > >>>>>   [Other: 5.5 ms]
> > >>> > >>>>>      [Choose CSet: 0.0 ms]
> > >>> > >>>>>      [Ref Proc: 2.2 ms]
> > >>> > >>>>>      [Ref Enq: 0.0 ms]
> > >>> > >>>>>      [Redirty Cards: 2.8 ms]
> > >>> > >>>>>      [Humongous Register: 0.1 ms]
> > >>> > >>>>>      [Humongous Reclaim: 0.0 ms]
> > >>> > >>>>>      [Free CSet: 0.1 ms]
> > >>> > >>>>>   [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.9M(1024.0M)->218.4M(1024.0M)]
> > >>> > >>>>> [Times: user=0.05 sys=0.00, real=0.03 secs]
> > >>> > >>>>>
> > >>> > >>>>>
> > >>> > >>>>> So while Kafka Streams is running, is anything trying to
> > >>> > >>>>> rebalance, either a broker rebalance or a client rebalance? Is
> > >>> > >>>>> there any kind of test to find out what causes the trouble?
> > >>> > >>>>>
> > >>> > >>>>> Thanks,
> > >>> > >>>>> Nan
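To check the GC hypothesis across a whole log rather than by eye, the pause durations can be pulled out of the GC summary lines. `GcPauseScan` is a hypothetical stdlib-only sketch assuming the JDK 8 log format quoted in this thread, with one entry per line as the JVM writes it: it reads the `, N secs]` figure on lines mentioning GC, so per-phase G1 detail lines and `[Times: ...]` lines yield nothing. The largest pause can then be compared against the 100 ms target.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.OptionalDouble;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical scanner for JDK 8 GC logs: extracts the pause time of each GC entry. */
final class GcPauseScan {
    // Pause figure at the end of a GC summary line, e.g. "..., 0.0093730 secs]"
    private static final Pattern PAUSE = Pattern.compile(",\\s+([0-9]+\\.[0-9]+) secs\\]");

    /** Pause in milliseconds for a GC summary line; empty for detail or other lines. */
    static OptionalDouble pauseMillis(String line) {
        if (!line.contains("GC")) {
            return OptionalDouble.empty(); // e.g. "[Times: ...]" lines
        }
        Matcher m = PAUSE.matcher(line);
        return m.find()
                ? OptionalDouble.of(Double.parseDouble(m.group(1)) * 1000.0)
                : OptionalDouble.empty(); // e.g. "[GC Worker Start (ms): ...]" detail lines
    }

    static double maxPauseMillis(List<String> lines) {
        return lines.stream()
                .map(GcPauseScan::pauseMillis)
                .filter(OptionalDouble::isPresent)
                .mapToDouble(OptionalDouble::getAsDouble)
                .max()
                .orElse(0.0);
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = Files.readAllLines(Paths.get(args[0])); // path to a GC log file
        System.out.printf("GC entries: %d, max pause: %.1f ms%n",
                lines.stream().filter(l -> pauseMillis(l).isPresent()).count(),
                maxPauseMillis(lines));
    }
}
```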
> > >>> > >>>>>
> > >>> > >>>>>
> > >>> > >>>>>
> > >>> > >>>>>
> > >>> > >>>>>
> > >>> > >>>>>
On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>> > >>>>>
> > >>> > >>>>>> Okay, so you're measuring end-to-end time from producer ->
> > >>> > >>>>>> broker -> Streams' consumer client. There are multiple phases
> > >>> > >>>>>> that can contribute to the 100ms latency, and I cannot tell if
> > >>> > >>>>>> the Streams consumer phase is the major contributor. For example,
> > >>> > >>>>>> if the topic was not created beforehand, then when the broker
> > >>> > >>>>>> first receives a produce request it may need to create the topic,
> > >>> > >>>>>> which involves multiple steps including writes to ZK, which could
> > >>> > >>>>>> take time.
> > >>> > >>>>>>
> > >>> > >>>>>> There is some confusion in your description: you mentioned
> > >>> > >>>>>> "Kafka cluster is already up and running", but I think you are
> > >>> > >>>>>> referring to "Kafka Streams application instances are already up
> > >>> > >>>>>> and running", right? Only the latter has a rebalance process,
> > >>> > >>>>>> while Kafka brokers do not really have "rebalances" except for
> > >>> > >>>>>> balancing load by migrating partitions.
> > >>> > >>>>>>
> > >>> > >>>>>> Guozhang
> > >>> > >>>>>>
> > >>> > >>>>>>
> > >>> > >>>>>>
On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >>> > >>>>>>
> > >>> > >>>>>>> Right, so my Kafka cluster has already been up and running for
> > >>> > >>>>>>> a while, and I can see from the log that all broker instances
> > >>> > >>>>>>> have already changed from rebalancing to running.
> > >>> > >>>>>>>
> > >>> > >>>>>>> I did another test.
> > >>> > >>>>>>> In the producer, right before the message is sent to the broker,
> > >>> > >>>>>>> I put a timestamp in the message; on the consumer side, which is
> > >>> > >>>>>>> after stream processing, I compare this timestamp with the
> > >>> > >>>>>>> current time. I can see some messages' processing time is above
> > >>> > >>>>>>> 100ms, even on some really powerful hardware. In my application's
> > >>> > >>>>>>> GC log, all GC times are below 1ms, and Kafka GC only happened
> > >>> > >>>>>>> once and was below 1ms too.
> > >>> > >>>>>>>
> > >>> > >>>>>>> Very puzzled. Is there any communication with ZooKeeper that, if
> > >>> > >>>>>>> it gets no response, will cause the broker to pause? I don't
> > >>> > >>>>>>> think that's the case, but at this point I don't know what else
> > >>> > >>>>>>> to suspect.
> > >>> > >>>>>>>
> > >>> > >>>>>>> Nan
> > >>> > >>>>>>>
On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>> > >>>>>>>
> > >>> > >>>>>>>> Hello Nan,
> > >>> > >>>>>>>>
> > >>> > >>>>>>>> Note that Streams may need some time to rebalance and assign
> > >>> > >>>>>>>> tasks even if you only start with one instance.
> > >>> > >>>>>>>>
> > >>> > >>>>>>>> I'd suggest you register a state listener in Kafka Streams via
> > >>> > >>>>>>>> KafkaStreams#setStateListener; your customized StateListener
> > >>> > >>>>>>>> should record when the state transitions from REBALANCING to
> > >>> > >>>>>>>> RUNNING, since only after that will the Streams client start to
> > >>> > >>>>>>>> process the first record.
> > >>> > >>>>>>>>
> > >>> > >>>>>>>>
> > >>> > >>>>>>>> Guozhang
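A minimal sketch of the bookkeeping such a listener could do, kept free of Kafka dependencies so it can be tested on its own: the class name is hypothetical, and it takes state names as strings. In the Streams app it could be wired with a lambda, since `KafkaStreams.StateListener` declares a single `onChange(newState, oldState)` method; the listener has to be set before `streams.start()`.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical bookkeeping for a Kafka Streams state listener. Assumed wiring:
 *   streams.setStateListener((newState, oldState) ->
 *       recorder.onChange(newState.name(), oldState.name()));
 *   streams.start();
 */
final class RunningTransitionRecorder {
    private final LongSupplier clockMillis;
    private long lastRunningAtMillis = -1L;
    private int transitionsToRunning = 0;

    RunningTransitionRecorder(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    /** Records the time of each REBALANCING -> RUNNING transition. */
    synchronized void onChange(String newState, String oldState) {
        if ("RUNNING".equals(newState) && "REBALANCING".equals(oldState)) {
            lastRunningAtMillis = clockMillis.getAsLong();
            transitionsToRunning++;
            System.out.println("RUNNING since " + lastRunningAtMillis
                    + " (transition #" + transitionsToRunning + ")");
        }
    }

    /** Wall-clock millis of the latest transition to RUNNING, or -1 if none yet. */
    synchronized long lastRunningAtMillis() { return lastRunningAtMillis; }

    synchronized int transitionsToRunning() { return transitionsToRunning; }

    public static void main(String[] args) {
        RunningTransitionRecorder r = new RunningTransitionRecorder(System::currentTimeMillis);
        r.onChange("REBALANCING", "CREATED");
        r.onChange("RUNNING", "REBALANCING");
    }
}
```

Latency samples taken before the latest `lastRunningAtMillis()` can then be set aside as start-up or rebalance effects, and a count above one during a steady run would point to unexpected rebalances.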
> > >>> > >>>>>>>>
> > >>> > >>>>>>>>
On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >>> > >>>>>>>>
> > >>> > >>>>>>>>> Thanks. Which JMX properties indicate "processing latency
> > >>> > >>>>>>>>> spikes" / "throughput"?
> > >>> > >>>>>>>>>
> > >>> > >>>>>>>>>
On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > >>> > >>>>>>>>>
> > >>> > >>>>>>>>>> I cannot spot any obvious reasons.
> > >>> > >>>>>>>>>>
> > >>> > >>>>>>>>>> As you consume from the result topic for verification, we
> > >>> > >>>>>>>>>> should verify that the latency spikes originate on write and
> > >>> > >>>>>>>>>> not on read: you might want to have a look at the Kafka
> > >>> > >>>>>>>>>> Streams JMX metrics to see if processing latency spikes or
> > >>> > >>>>>>>>>> throughput drops.
> > >>> > >>>>>>>>>>
> > >>> > >>>>>>>>>> Also watch for GC pauses in the JVM.
> > >>> > >>>>>>>>>>
> > >>> > >>>>>>>>>> Hope this helps.
> > >>> > >>>>>>>>>>
> > >>> > >>>>>>>>>>
> > >>> > >>>>>>>>>> -Matthias
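Kafka Streams registers its metrics as JMX MBeans, so they can be read with the JDK's own `javax.management` API as well as with JConsole. The stdlib-only sketch below queries the platform MBean server. The object-name pattern `kafka.streams:type=stream-metrics,*` and the attribute names (`process-latency-max`, `commit-latency-max`, `poll-latency-max`) are assumptions based on the 2.0-era thread-level metrics and should be checked against what JConsole shows for the version in use. It has to run inside the Streams application's JVM (or be adapted to a remote JMX connection); in any other JVM it finds nothing.

```java
import java.lang.management.ManagementFactory;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical probe for Kafka Streams thread-level latency metrics over JMX. */
final class StreamsLatencyProbe {
    // Assumed 2.0-era MBean pattern and attribute names; verify them in JConsole.
    static final String PATTERN = "kafka.streams:type=stream-metrics,*";
    static final String[] ATTRIBUTES = {"process-latency-max", "commit-latency-max", "poll-latency-max"};

    /** Reads the listed attributes from every matching MBean; absent ones are skipped. */
    static Map<String, Object> snapshot(MBeanServer server) {
        ObjectName pattern;
        try {
            pattern = new ObjectName(PATTERN);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
        Map<String, Object> out = new LinkedHashMap<>();
        for (ObjectName name : server.queryNames(pattern, null)) {
            for (String attr : ATTRIBUTES) {
                try {
                    out.put(name.getKeyProperty("client-id") + "/" + attr,
                            server.getAttribute(name, attr));
                } catch (JMException e) {
                    // attribute missing or named differently in this Kafka version
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // In a standalone JVM this prints nothing; call snapshot() from inside the Streams app.
        snapshot(ManagementFactory.getPlatformMBeanServer())
                .forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Sampling `snapshot(...)` once per second from a background thread in the Streams app and logging it next to the consumer's latency report would show whether a spike coincides with a slow process, commit, or poll step.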

On 8/17/18 12:13 PM, Nan Xu wrote:
> btw, I am using version

On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
> I am working on a Kafka Streams app and see huge latency variance. I'm
> wondering what can cause this.
>
> The processing is very simple and stateless, and linger.ms is already
> changed to 5 ms. The messages are around 10K bytes each and are published
> at 2000 messages/s; the network is 10G. Using a regular consumer, I watch
> the localHistTopic topic and print a counter every 2000 messages. Usually
> I get a count of 2000 every second, matching the publish rate, but
> sometimes I see it stall for 3 or more seconds and then print out a few
> counts, as if the CPU were paused during that time, or messages were being
> cached/batched and then processed. Any suggestions?
>
>     final Properties streamsConfiguration = new Properties();
>     streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>     streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
>     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>     streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>         Serdes.String().getClass().getName());
>     // Commit every 10 seconds
>     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>
>     // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>
>     // Settings for the embedded producer that writes the output topic
>     streamsConfiguration.put(
>         StreamsConfig.PRODUCER_PREFIX + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
>     streamsConfiguration.put(
>         StreamsConfig.PRODUCER_PREFIX + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
>     streamsConfiguration.put(
>         StreamsConfig.PRODUCER_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
>     streamsConfiguration.put(
>         StreamsConfig.PRODUCER_PREFIX + ProducerConfig.LINGER_MS_CONFIG, "5");
>
>     // Allow the embedded consumer to fetch up to 20 MB per partition
>     streamsConfiguration.put(
>         StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
>         20 * 1024 * 1024);
>
>     final StreamsBuilder builder = new StreamsBuilder();
>
>     final KStream<String, NodeMutation> localDeltaStream = builder.stream(
>         localDeltaTopic,
>         Consumed.with(new Serdes.StringSerde(), new NodeMutationSerde<>()));
>
>     // Stateless, per-record conversion of each mutation into a NodeState
>     KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
>         mutation -> NodeState.newBuilder()
>             .setMeta(mutation.getMetaMutation().getMeta())
>             .setValue(mutation.getValueMutation().getValue())
>             .build());
>
>     localHistStream.to(
>         localHistTopic,
>         Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>()));
>
>     streams = new KafkaStreams(builder.build(), streamsConfiguration);
>     streams.cleanUp();
>     streams.start();
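The verification consumer could flag gaps directly instead of printing a counter every 2000 messages. Here is a minimal sketch in plain Java. It assumes the caller passes each record's local arrival time and the record's own timestamp, for example `ConsumerRecord#timestamp()`. The `StallDetector` class and the 1000 ms threshold are hypothetical. If the topics keep the default CreateTime timestamps, `arrival - timestamp` approximates the record's age since it was first produced. By default Kafka Streams forwards the input record's timestamp to the output record. On a single server, producer and consumer clocks agree. Logging when each stall ended and how long it lasted makes it easier to line stalls up with GC logs, broker logs, or Streams metrics.

```java
// Hypothetical verification-side helper: flags gaps between consecutive
// records and tracks how old records are when they reach this consumer.
class StallDetector {
    private final long stallThresholdMs;
    private long lastArrivalMs = -1L;
    private int stallCount = 0;
    private long maxGapMs = 0L;
    private long maxEndToEndMs = 0L;

    StallDetector(long stallThresholdMs) {
        this.stallThresholdMs = stallThresholdMs;
    }

    // Returns the gap in ms if this record ends a stall, otherwise 0.
    long onRecord(long arrivalMs, long recordTimestampMs) {
        long stalledFor = 0L;
        if (lastArrivalMs >= 0) {
            long gap = arrivalMs - lastArrivalMs;
            maxGapMs = Math.max(maxGapMs, gap);
            if (gap >= stallThresholdMs) {
                stallCount++;
                stalledFor = gap;
            }
        }
        lastArrivalMs = arrivalMs;
        // Age of the record (relative to its own timestamp) on arrival.
        maxEndToEndMs = Math.max(maxEndToEndMs, arrivalMs - recordTimestampMs);
        return stalledFor;
    }

    int stallCount() { return stallCount; }
    long maxGapMs() { return maxGapMs; }
    long maxEndToEndMs() { return maxEndToEndMs; }

    public static void main(String[] args) {
        StallDetector detector = new StallDetector(1_000L);
        // Simulated steady traffic: one record per ms, each 2 ms old on arrival.
        long t = 0;
        for (int i = 0; i < 2_000; i++, t++) {
            detector.onRecord(t, t - 2);
        }
        // Simulated 3-second stall before the next record shows up.
        long gap = detector.onRecord(t + 3_000L, t);
        System.out.println("stall of " + gap + " ms, stalls=" + detector.stallCount()
            + ", max end-to-end=" + detector.maxEndToEndMs() + " ms");
    }
}
```

In the poll loop this could be called as `long gap = detector.onRecord(System.currentTimeMillis(), record.timestamp());`, logging whenever `gap > 0`.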