I will wait for the experts' opinion, but one question: is Transparent Huge Pages (THP) disabled on the broker machine? It's a Linux kernel parameter.
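A quick way to check this, as a minimal sketch: on mainline Linux kernels the active THP mode is the bracketed word in `/sys/kernel/mm/transparent_hugepage/enabled`. The class name `ThpCheck` and its `activeMode` helper below are made up for illustration, and some older distributions expose the setting under a different path, so treat the path as an assumption to verify on your broker host.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: reports which Transparent Huge Pages mode is active on Linux.
class ThpCheck {
    // Assumed sysfs location (mainline kernels); may differ on some distributions.
    static final Path THP_ENABLED =
            Paths.get("/sys/kernel/mm/transparent_hugepage/enabled");

    // The file lists every mode and brackets the active one,
    // e.g. "always madvise [never]". Returns "unknown" if no brackets are found.
    static String activeMode(String sysfsLine) {
        int open = sysfsLine.indexOf('[');
        if (open < 0) {
            return "unknown";
        }
        int close = sysfsLine.indexOf(']', open + 1);
        if (close < 0) {
            return "unknown";
        }
        return sysfsLine.substring(open + 1, close);
    }

    public static void main(String[] args) throws IOException {
        if (!Files.isReadable(THP_ENABLED)) {
            System.out.println("THP setting not readable at " + THP_ENABLED);
            return;
        }
        String line = new String(Files.readAllBytes(THP_ENABLED)).trim();
        System.out.println("THP mode: " + activeMode(line));
    }
}
```

If this reports `always`, THP is on for every process on that machine, including the broker JVM.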
-Sudhir

> On Aug 23, 2018, at 4:46 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>
> I think I found where the problem is; how to solve it and why it happens,
> I am still not sure.
>
> It is related to disk (maybe flushing?). I did a single-machine, single-node,
> single-topic, single-partition setup. The producer publishes 2000 messages/s,
> 10K message size, with a single key.
>
> When I save the Kafka log to a memory-based partition, I don't see latency
> over 100ms; it tops out around 70ms.
> When I save to an SSD, I do see latency spikes, sometimes over 1s.
>
> Adjusting log.flush.interval.messages / log.flush.interval.ms has an impact,
> but only makes things worse... need suggestions.
>
> I think log flushing is totally async and done by the OS in the default
> setting. Does Kafka have to wait when flushing data to disk?
>
> Thanks,
> Nan
>
>> On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> Given your application code:
>>
>> ----------------------------
>>
>> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
>>     localDeltaTopic,
>>     Consumed.with(
>>         new Serdes.StringSerde(),
>>         new NodeMutationSerde<>()
>>     )
>> );
>>
>> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
>>     (mutation) -> NodeState
>>         .newBuilder()
>>         .setMeta(
>>             mutation.getMetaMutation().getMeta()
>>         )
>>         .setValue(
>>             mutation.getValueMutation().getValue()
>>         )
>>         .build()
>> );
>>
>> localHistStream.to(
>>     localHistTopic,
>>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
>> );
>>
>> ----------------------------
>>
>> which is purely stateless, committing will not touch any state directory
>> at all. Hence committing only involves committing offsets to Kafka.
>>
>>
>> Guozhang
>>
>>> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>>>
>>> I was suspecting that too, but I also noticed the spikes are not spaced
>>> around 10s apart. To further prove it, I put the Kafka data directory in
>>> a memory-based directory. It still has such latency spikes. I am going to
>>> test it on a single-broker, single-partition env. Will report back soon.
>>>
>>> On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Hello Nan,
>>>>
>>>> Thanks for the detailed information you shared. When Kafka Streams is
>>>> running normally, no rebalances should be triggered unless some of the
>>>> instances (in your case, docker containers) have soft failures.
>>>>
>>>> I suspect the latency spike is due to the commit interval: Streams will
>>>> try to commit its offsets at a regular pace, which may increase latency.
>>>> It is controlled by the "commit.interval.ms" config value. I saw that in
>>>> your original email you've set it to 10 * 1000 (10 seconds). Is that
>>>> aligned with the frequency at which you observe latency spikes?
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>> On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>>>>>
>>>>> Did more tests and made the test case simpler.
>>>>> The whole setup is now a single physical machine running 3 docker
>>>>> instances: a1, a2, a3.
>>>>>
>>>>> Kafka + ZooKeeper are running on all of those docker containers.
>>>>> The producer runs on a1 and sends a single key, update speed 2000
>>>>> messages/s, each message 10K in size.
>>>>> 3 consumers (different groups) are running, one on each docker.
>>>>> All topics are pre-created.
>>>>> At startup, I do see some latency greater than 100ms, which is fine, and
>>>>> then everything is good: latency is low and the consumers don't see
>>>>> anything over 100ms for a while.
>>>>> Then I see a few messages with latency over 100ms, then back to normal,
>>>>> then it happens again..... It does seem like a GC problem, but I checked
>>>>> the GC log and I don't think it can cause over 100ms.
>>>>> (both are G1 collector)
>>>>>
>>>>> After the stream is running stably (excluding the startup), the first
>>>>> message over 100ms takes 179ms, and the GC has a 30ms pause, which
>>>>> should not cause 179ms end to end.
>>>>>
>>>>> FROM APP
>>>>>
>>>>> 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure) 3184739K->84018K(5947904K), 0.0093730 secs]
>>>>> 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure) 3184690K->84280K(6053888K), 0.0087473 secs]
>>>>> 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure) 3301176K->84342K(6061056K), 0.0127339 secs]
>>>>> 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure) 3301238K->84624K(6143488K), 0.0140844 secs]
>>>>> 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure) 3386000K->89949K(6144000K), 0.0108118 secs]
>>>>>
>>>>> kafka a1
>>>>>
>>>>> 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause) (young), 0.0214200 secs]
>>>>>    [Parallel Time: 17.2 ms, GC Workers: 8]
>>>>>       [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max: 7982673.8, Diff: 16.3]
>>>>>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5, Sum: 1.5]
>>>>>       [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
>>>>>          [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum: 37]
>>>>>       [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
>>>>>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
>>>>>       [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum: 36.5]
>>>>>       [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum: 2.9]
>>>>>          [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24, Sum: 83]
>>>>>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
>>>>>       [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2, Sum: 56.5]
>>>>>       [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max: 7982674.5, Diff: 0.6]
>>>>>    [Code Root Fixup: 0.0 ms]
>>>>>    [Code Root Purge: 0.0 ms]
>>>>>    [Clear CT: 1.0 ms]
>>>>>    [Other: 3.2 ms]
>>>>>       [Choose CSet: 0.0 ms]
>>>>>       [Ref Proc: 1.9 ms]
>>>>>       [Ref Enq: 0.0 ms]
>>>>>       [Redirty Cards: 0.8 ms]
>>>>>       [Humongous Register: 0.1 ms]
>>>>>       [Humongous Reclaim: 0.0 ms]
>>>>>       [Free CSet: 0.2 ms]
>>>>>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.5M(1024.0M)->217.9M(1024.0M)]
>>>>>  [Times: user=0.05 sys=0.00, real=0.03 secs]
>>>>>
>>>>> 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation Pause) (young), 0.0310004 secs]
>>>>>    [Parallel Time: 24.4 ms, GC Workers: 8]
>>>>>       [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max: 7984444.7, Diff: 18.6]
>>>>>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff: 1.9, Sum: 2.0]
>>>>>       [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8, Sum: 32.9]
>>>>>          [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25, Sum: 43]
>>>>>       [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum: 25.5]
>>>>>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
>>>>>       [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9, Sum: 32.7]
>>>>>       [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6, Sum: 6.8]
>>>>>          [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10, Sum: 43]
>>>>>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
>>>>>       [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff: 19.1, Sum: 100.1]
>>>>>       [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max: 7984449.9, Diff: 0.8]
>>>>>    [Code Root Fixup: 0.0 ms]
>>>>>    [Code Root Purge: 0.0 ms]
>>>>>    [Clear CT: 1.1 ms]
>>>>>    [Other: 5.5 ms]
>>>>>       [Choose CSet: 0.0 ms]
>>>>>       [Ref Proc: 2.2 ms]
>>>>>       [Ref Enq: 0.0 ms]
>>>>>       [Redirty Cards: 2.8 ms]
>>>>>       [Humongous Register: 0.1 ms]
>>>>>       [Humongous Reclaim: 0.0 ms]
>>>>>       [Free CSet: 0.1 ms]
>>>>>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.9M(1024.0M)->218.4M(1024.0M)]
>>>>>  [Times: user=0.05 sys=0.00, real=0.03 secs]
>>>>>
>>>>>
>>>>> So when Kafka Streams is running, is there anything trying to rebalance?
>>>>> Either a broker rebalance or a client rebalance?
>>>>> Is there any kind of test to see what causes the trouble?
>>>>>
>>>>> Thanks,
>>>>> Nan
>>>>>
>>>>> On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Okay, so you're measuring end-to-end time from producer -> broker ->
>>>>>> streams' consumer client. There are multiple phases that can contribute
>>>>>> to the 100ms latency, and I cannot tell if the Streams consumer phase is
>>>>>> the major contributor. For example, if the topic was not created before,
>>>>>> then when the broker first receives a produce request it may need to
>>>>>> create the topic, which involves multiple steps including writes to ZK,
>>>>>> which could take time.
>>>>>>
>>>>>> There is some confusion in your description: you mentioned "Kafka
>>>>>> cluster is already up and running", but I think you are referring to
>>>>>> "Kafka Streams application instances are already up and running", right?
>>>>>> Only the latter has a rebalance process, while the Kafka brokers do not
>>>>>> really have "rebalances" except balancing load by migrating partitions.
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>>>>>>
>>>>>>> Right, so my Kafka cluster has already been up and running for a while,
>>>>>>> and I can see from the log that all broker instances have already
>>>>>>> changed from rebalance to running.
>>>>>>>
>>>>>>> I did another test.
>>>>>>> In the producer, right before the message gets sent to the broker, I
>>>>>>> put a timestamp in the message. On the consumer side, which is after
>>>>>>> stream processing, I compare this timestamp with the current time. I
>>>>>>> can see some message processing times are above 100ms on some really
>>>>>>> powerful hardware. And from my application GC, all the GC times are
>>>>>>> below 1ms; Kafka GC only happened once and was below 1ms too.
>>>>>>>
>>>>>>> Very puzzled. Is there any communication with ZooKeeper that, if it
>>>>>>> gets no response, will cause the broker to pause? I don't think that's
>>>>>>> the case, but at this time I don't know what else to suspect.
>>>>>>>
>>>>>>> Nan
>>>>>>>
>>>>>>> On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello Nan,
>>>>>>>>
>>>>>>>> Note that Streams may need some time to rebalance and assign tasks
>>>>>>>> even if you only start with one instance.
>>>>>>>>
>>>>>>>> I'd suggest you register your state listener in Kafka Streams via
>>>>>>>> KafkaStreams#setStateListener, and your customized StateListener
>>>>>>>> should record when the state transitions from REBALANCING to RUNNING,
>>>>>>>> since only after that will the streams client start to process the
>>>>>>>> first record.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks, which JMX properties indicate "processing latency spikes" /
>>>>>>>>> "throughput"?
>>>>>>>>>
>>>>>>>>> On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> I cannot spot any obvious reasons.
>>>>>>>>>>
>>>>>>>>>> As you consume from the result topic for verification, we should
>>>>>>>>>> verify that the latency spikes originate on write and not on read:
>>>>>>>>>> you might want to have a look at the Kafka Streams JMX metrics to
>>>>>>>>>> see if processing latency spikes or throughput drops.
>>>>>>>>>>
>>>>>>>>>> Also watch for GC pauses in the JVM.
>>>>>>>>>>
>>>>>>>>>> Hope this helps.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>> On 8/17/18 12:13 PM, Nan Xu wrote:
>>>>>>>>>>> btw, I am using version 0.10.2.0
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I am working on a Kafka Streams app and see huge latency variance;
>>>>>>>>>>>> wondering what can cause this?
>>>>>>>>>>>>
>>>>>>>>>>>> The processing is very simple and doesn't have state; linger.ms is
>>>>>>>>>>>> already changed to 5ms. The message size is around 10K bytes,
>>>>>>>>>>>> published at 2000 messages/s, and the network is 10G. I use a
>>>>>>>>>>>> regular consumer to watch the localHistTopic topic and print out a
>>>>>>>>>>>> counter every 2000 messages. Usually I get a count of 2000 every
>>>>>>>>>>>> second, matching the publish speed, but sometimes I see it stall
>>>>>>>>>>>> for 3 or more seconds and then print out a few counts,
>>>>>>>>>>>> like the CPU is paused during that time, or messages are being
>>>>>>>>>>>> cached/batched and then processed.
>>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>>
>>>>>>>>>>>> final Properties streamsConfiguration = new Properties();
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>>>>>>>>>>>>     Serdes.String().getClass().getName());
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>>>>>>>>>>>> // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>>>>>>>>>>>>     + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>>>>>>>>>>>>     + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>>>>>>>>>>>>     + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
>>>>>>>>>>>>     + ProducerConfig.LINGER_MS_CONFIG, "5");
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.consumerPrefix(
>>>>>>>>>>>>     ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
>>>>>>>>>>>>
>>>>>>>>>>>> final StreamsBuilder builder = new StreamsBuilder();
>>>>>>>>>>>>
>>>>>>>>>>>> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
>>>>>>>>>>>>     localDeltaTopic,
>>>>>>>>>>>>     Consumed.with(
>>>>>>>>>>>>         new Serdes.StringSerde(),
>>>>>>>>>>>>         new NodeMutationSerde<>()
>>>>>>>>>>>>     )
>>>>>>>>>>>> );
>>>>>>>>>>>>
>>>>>>>>>>>> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
>>>>>>>>>>>>     (mutation) -> NodeState
>>>>>>>>>>>>         .newBuilder()
>>>>>>>>>>>>         .setMeta(
>>>>>>>>>>>>             mutation.getMetaMutation().getMeta()
>>>>>>>>>>>>         )
>>>>>>>>>>>>         .setValue(
>>>>>>>>>>>>             mutation.getValueMutation().getValue()
>>>>>>>>>>>>         )
>>>>>>>>>>>>         .build()
>>>>>>>>>>>> );
>>>>>>>>>>>>
>>>>>>>>>>>> localHistStream.to(
>>>>>>>>>>>>     localHistTopic,
>>>>>>>>>>>>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
>>>>>>>>>>>> );
>>>>>>>>>>>>
>>>>>>>>>>>> streams = new KafkaStreams(builder.build(), streamsConfiguration);
>>>>>>>>>>>> streams.cleanUp();
>>>>>>>>>>>> streams.start();
>>>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>
>>>> --
>>>> -- Guozhang
>>
>> --
>> -- Guozhang
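
For the measurement discussed in this thread (the producer stamps each message, the consumer compares that stamp with the current time, and anything over 100ms counts as a spike), here is a minimal sketch of how one might check whether spikes recur at a fixed period such as the 10s commit interval. `SpikeTracker`, its methods, and the burst-gap parameter are all hypothetical names invented for illustration. It assumes producer and consumer clocks are comparable, which holds in the single-machine setup but not necessarily across hosts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: track end-to-end latency and test whether spikes recur at a fixed period.
class SpikeTracker {
    private final long thresholdMs;  // latency above this counts as slow (100 ms in the thread)
    private final long burstGapMs;   // slow messages closer together than this form one spike
    private final List<Long> spikeStartsMs = new ArrayList<>();
    private long lastSlowMs;

    SpikeTracker(long thresholdMs, long burstGapMs) {
        this.thresholdMs = thresholdMs;
        this.burstGapMs = burstGapMs;
    }

    // producerTsMs: timestamp embedded in the message by the producer.
    // consumerTsMs: wall-clock time at which the consumer saw the message.
    // Returns the latency; a slow message starts a new spike unless it
    // arrives within burstGapMs of the previous slow message.
    long record(long producerTsMs, long consumerTsMs) {
        long latency = consumerTsMs - producerTsMs;
        if (latency > thresholdMs) {
            boolean newSpike = spikeStartsMs.isEmpty()
                    || consumerTsMs - lastSlowMs > burstGapMs;
            if (newSpike) {
                spikeStartsMs.add(consumerTsMs);
            }
            lastSlowMs = consumerTsMs;
        }
        return latency;
    }

    // Gaps, in ms, between the starts of consecutive spikes.
    List<Long> spikeGapsMs() {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < spikeStartsMs.size(); i++) {
            gaps.add(spikeStartsMs.get(i) - spikeStartsMs.get(i - 1));
        }
        return gaps;
    }

    // True only if at least one gap exists and every gap is within
    // toleranceMs of periodMs.
    boolean spikesAlignWith(long periodMs, long toleranceMs) {
        List<Long> gaps = spikeGapsMs();
        if (gaps.isEmpty()) {
            return false;
        }
        for (long gap : gaps) {
            if (Math.abs(gap - periodMs) > toleranceMs) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SpikeTracker tracker = new SpikeTracker(100, 1000);
        tracker.record(0, 50);            // fast message
        tracker.record(1_000, 1_179);     // slow: starts a spike
        tracker.record(1_010, 1_185);     // slow: same burst
        tracker.record(11_000, 11_180);   // slow: new spike about 10 s later
        System.out.println("gaps (ms): " + tracker.spikeGapsMs());
        System.out.println("aligned with 10s: " + tracker.spikesAlignWith(10_000, 500));
    }
}
```

If the gaps do not cluster around the commit interval, as reported above, that points away from offset commits and toward something else such as disk flushing.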