Looks really promising, but after the upgrade it still shows the same result. I will post the program soon; maybe you can see where the problem could be.
Nan

On Thu, Aug 23, 2018, 7:34 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Nan,

Kafka does not tie up the processing thread to do disk flushing. However, since you are on an older version of Kafka, I suspect you're bumping into some old issues that have been resolved in later versions, e.g.:

https://issues.apache.org/jira/browse/KAFKA-4614

I'd suggest upgrading to the latest version (2.0.0) and trying again to see if you observe the same pattern.

Guozhang

On Thu, Aug 23, 2018 at 3:22 PM, Sudhir Babu Pothineni <sbpothin...@gmail.com> wrote:

I will wait for the experts' opinion: is Transparent Huge Pages (THP) disabled on the broker machine? It's a Linux kernel parameter.

-Sudhir

On Aug 23, 2018, at 4:46 PM, Nan Xu <nanxu1...@gmail.com> wrote:

I think I found where the problem is; how to solve it, and why, I am still not sure.

It is related to the disk (maybe flushing?). I did a single-machine, single-node, single-topic, single-partition setup. The producer publishes 2,000 messages/s with a 10K message size, all under a single key.

When I put the Kafka log on a memory-based partition, I don't see latency over 100ms; it tops out around 70ms. When I put it on an SSD, I do see latency spikes, sometimes over 1s.

Adjusting log.flush.interval.messages / log.flush.interval.ms has an impact, but it only makes things worse... I need suggestions.

I thought log flushing was fully asynchronous and done by the OS in the default settings. Does Kafka have to wait while flushing data to disk?

Thanks,
Nan
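For reference, the settings Nan is tuning here are broker-level configs (per-topic overrides flush.messages / flush.ms also exist). By default Kafka never forces an fsync and leaves flushing entirely to the OS page cache, so in the default setting flushing is indeed asynchronous; setting these thresholds low forces fsyncs on the append path, which would be expected to make tail latency worse rather than better, matching what Nan observes. A sketch of the relevant server.properties entries, with purely illustrative values:

----------------------------

# Force an fsync once this many messages have accumulated unflushed
# on a partition (default: Long.MAX_VALUE, i.e. never force):
log.flush.interval.messages=10000

# ...or once a message has been sitting unflushed this long; when unset,
# the background check in log.flush.scheduler.interval.ms applies instead:
log.flush.interval.ms=1000

----------------------------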
On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang <wangg...@gmail.com> wrote:

Given your application code:

----------------------------

final KStream<String, NodeMutation> localDeltaStream = builder.stream(
    localDeltaTopic,
    Consumed.with(
        new Serdes.StringSerde(),
        new NodeMutationSerde<>()
    )
);

KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
    (mutation) -> NodeState
        .newBuilder()
        .setMeta(
            mutation.getMetaMutation().getMeta()
        )
        .setValue(
            mutation.getValueMutation().getValue()
        )
        .build()
);

localHistStream.to(
    localHistTopic,
    Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
);

----------------------------

which is purely stateless, committing will not touch any state directory at all. Hence committing only involves committing offsets to Kafka.

Guozhang

On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com> wrote:

I was suspecting that too, but I also noticed the spikes are not spaced around 10s. To further prove it, I put the Kafka data directory in a memory-based directory; it still has such latency spikes. I am going to test it in a single-broker, single-partition environment and will report back soon.

On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Nan,

Thanks for the detailed information you shared. When Kafka Streams is running normally, no rebalances should be triggered unless some of the instances (in your case, docker containers) have soft failures.

I suspect the latency spike is due to the commit intervals: Streams will try to commit its offsets at a regular pace, which may increase latency. It is controlled by the "commit.interval.ms" config value. I saw that in your original email you've set it to 10 * 1000 (10 seconds). Is that aligned with the frequency at which you observe latency spikes?

Guozhang

On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com> wrote:

I did more tests and made the test case simpler. The whole setup is now a single physical machine running 3 docker instances: a1, a2, a3.

Kafka + ZooKeeper run on all of those docker containers. The producer runs on a1, sending a single key at an update speed of 2,000 messages/s, each message 10K in size. 3 consumers (in different groups) are running, one on each docker container. All topics are pre-created.

During startup I do see some latency greater than 100ms, which is fine; after that everything is good, latency is low, and the consumers don't see anything over 100ms for a while. Then I see a few messages with latency over 100ms, then it goes back to normal, then it happens again... It does seem like a GC problem, but I checked the GC logs, and I don't think they can account for over 100ms (both sides use the G1 collector).

With the stream running stably (excluding startup), the first message over 100ms took 179ms, and the GC around that time had only a 30ms pause, which should not cause a 179ms end-to-end latency:
FROM APP

2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure) 3184739K->84018K(5947904K), 0.0093730 secs]
2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure) 3184690K->84280K(6053888K), 0.0087473 secs]
2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure) 3301176K->84342K(6061056K), 0.0127339 secs]
2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure) 3301238K->84624K(6143488K), 0.0140844 secs]
2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure) 3386000K->89949K(6144000K), 0.0108118 secs]

kafka a1

2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause) (young), 0.0214200 secs]
   [Parallel Time: 17.2 ms, GC Workers: 8]
      [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max: 7982673.8, Diff: 16.3]
      [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5, Sum: 1.5]
      [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
         [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum: 37]
      [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
      [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
      [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum: 36.5]
      [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum: 2.9]
         [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24, Sum: 83]
      [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
      [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2, Sum: 56.5]
      [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max: 7982674.5, Diff: 0.6]
   [Code Root Fixup: 0.0 ms]
   [Code Root Purge: 0.0 ms]
   [Clear CT: 1.0 ms]
   [Other: 3.2 ms]
      [Choose CSet: 0.0 ms]
      [Ref Proc: 1.9 ms]
      [Ref Enq: 0.0 ms]
      [Redirty Cards: 0.8 ms]
      [Humongous Register: 0.1 ms]
      [Humongous Reclaim: 0.0 ms]
      [Free CSet: 0.2 ms]
   [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.5M(1024.0M)->217.9M(1024.0M)]
 [Times: user=0.05 sys=0.00, real=0.03 secs]

2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation Pause) (young), 0.0310004 secs]
   [Parallel Time: 24.4 ms, GC Workers: 8]
      [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max: 7984444.7, Diff: 18.6]
      [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff: 1.9, Sum: 2.0]
      [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8, Sum: 32.9]
         [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25, Sum: 43]
      [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum: 25.5]
      [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
      [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9, Sum: 32.7]
      [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6, Sum: 6.8]
         [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10, Sum: 43]
      [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
      [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff: 19.1, Sum: 100.1]
      [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max: 7984449.9, Diff: 0.8]
   [Code Root Fixup: 0.0 ms]
   [Code Root Purge: 0.0 ms]
   [Clear CT: 1.1 ms]
   [Other: 5.5 ms]
      [Choose CSet: 0.0 ms]
      [Ref Proc: 2.2 ms]
      [Ref Enq: 0.0 ms]
      [Redirty Cards: 2.8 ms]
      [Humongous Register: 0.1 ms]
      [Humongous Reclaim: 0.0 ms]
      [Free CSet: 0.1 ms]
   [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.9M(1024.0M)->218.4M(1024.0M)]
 [Times: user=0.05 sys=0.00, real=0.03 secs]

So when the Kafka stream is running, is there any attempt to rebalance, either a broker rebalance or a client rebalance? Is there any kind of test to see what causes the trouble?

Thanks,
Nan

On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com> wrote:

Okay, so you're measuring end-to-end time from producer -> broker -> Streams' consumer client. There are multiple phases that can contribute to the 100ms latency, and I cannot tell whether the Streams consumer phase is the major contributor. For example, if the topic was not created before, then when the broker first receives a produce request it may need to create the topic, which involves multiple steps including writes to ZK, and that could take time.

There is some confusion in your description: you mentioned "Kafka cluster is already up and running", but I think you are referring to "Kafka Streams application instances are already up and running", right? Only the latter has a rebalance process; the Kafka brokers do not really have "rebalances" except balancing load by migrating partitions.

Guozhang
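Guozhang's point about first-produce topic creation is one reason to pre-create topics, as Nan already does. On newer clients (0.11.0+, so not the 0.10.2.0 used in this thread) that can be scripted with the Java AdminClient; a minimal sketch, where the topic name, partition count, and replication factor are only illustrative:

----------------------------

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class PreCreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 3 (matching the 3-broker setup)
            NewTopic topic = new NewTopic("localDeltaTopic", 1, (short) 3);
            // Block until the broker has actually created the topic
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}

----------------------------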
On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com> wrote:

Right, so my Kafka cluster has already been up and running for a while, and I can see from the logs that all broker instances have already changed from rebalance to running.

I did another test: in the producer, right before the message gets sent to the broker, I put a timestamp in the message, and on the consumer side, which is after stream processing, I compare this timestamp with the current time. I can see that some messages' processing time is above 100ms on some really powerful hardware. And in my application's GC, all the pauses are below 1ms; a Kafka GC happened only once and was below 1ms too.

Very puzzled. Is there any communication with ZooKeeper that, if it gets no response, will cause the broker to pause? I don't think that's the case, but at this point I don't know what else to suspect.

Nan

On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Nan,

Note that Streams may need some time to rebalance and assign tasks even if you only start with one instance.

I'd suggest you register your state listener in Kafka Streams via KafkaStreams#setStateListener, and your customized StateListener should record when the state transits from REBALANCING to RUNNING, since only after that will the Streams client start to process the first record.

Guozhang

On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com> wrote:

Thanks. Which JMX properties indicate "processing latency spikes" / "throughput"?

On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io> wrote:

I cannot spot any obvious reasons.

As you consume from the result topic for verification, we should verify that the latency spikes originate on write and not on read: you might want to have a look at the Kafka Streams JMX metrics to see if processing latency spikes or throughput drops.

Also watch for GC pauses in the JVM.

Hope this helps.

-Matthias

On 8/17/18 12:13 PM, Nan Xu wrote:

btw, I am using version 0.10.2.0
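Regarding Nan's question about which JMX properties to watch: Kafka Streams registers its sensors under the kafka.streams JMX domain, and the same metrics are reachable in-process via KafkaStreams#metrics(). Exact metric names and groups vary by version (thread-level "commit-latency-avg" and "process-latency-avg" are the likely candidates here), so the sketch below just filters loosely; note that Metric#metricValue() arrived in 1.0, while on 0.10.2.0 the accessor is value().

----------------------------

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class StreamsMetricsDump {
    // Print every latency/rate metric the Streams instance exposes; the same
    // sensors are visible over JMX under the kafka.streams domain.
    static void dump(KafkaStreams streams) {
        for (Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
            MetricName n = e.getKey();
            if (n.name().contains("latency") || n.name().contains("rate")) {
                System.out.printf("%s / %s %s = %s%n",
                        n.group(), n.name(), n.tags(), e.getValue().metricValue());
            }
        }
    }
}

----------------------------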
On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:

I am working on a Kafka Streams app and see huge latency variance; I'm wondering what can cause this.

The processing is very simple and doesn't have state; linger.ms has already been changed to 5ms. The message size is around 10K bytes, published at 2,000 messages/s, and the network is 10G. Using a regular consumer to watch the localHistTopic topic and print out a counter every 2,000 messages, I usually get a count of 2,000 each second, matching the publish speed, but sometimes I see it stall for 3 or more seconds and then print out a few counts, as if the CPU were paused during that time or the messages were cached/batched and then processed. Any suggestions?

final Properties streamsConfiguration = new Properties();

streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);

streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);

streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName());

streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);

// streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
        + ProducerConfig.BATCH_SIZE_CONFIG, 163840);

streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
        + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);

streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
        + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);

streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
        + ProducerConfig.LINGER_MS_CONFIG, "5");

streamsConfiguration.put(StreamsConfig.consumerPrefix(
        ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);

final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, NodeMutation> localDeltaStream = builder.stream(
    localDeltaTopic,
    Consumed.with(
        new Serdes.StringSerde(),
        new NodeMutationSerde<>()
    )
);

KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
    (mutation) -> NodeState
        .newBuilder()
        .setMeta(
            mutation.getMetaMutation().getMeta()
        )
        .setValue(
            mutation.getValueMutation().getValue()
        )
        .build()
);

localHistStream.to(
    localHistTopic,
    Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
);

streams = new KafkaStreams(builder.build(), streamsConfiguration);

streams.cleanUp();

streams.start();
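Guozhang's earlier KafkaStreams#setStateListener suggestion, applied to the code above, would look roughly like this; the listener must be registered before streams.start(), and the log line is only illustrative:

----------------------------

// Record when the instance leaves REBALANCING, since records are only
// processed after the transition to RUNNING.
streams.setStateListener((newState, oldState) -> {
    if (oldState == KafkaStreams.State.REBALANCING
            && newState == KafkaStreams.State.RUNNING) {
        System.out.println("REBALANCING -> RUNNING at " + System.currentTimeMillis());
    }
});

----------------------------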
--
-- Guozhang
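Finally, a self-contained sketch of the measurement Nan describes: the producer embeds a wall-clock timestamp immediately before send(), and a plain consumer on the output topic compares it with the current time. The "timestamp|payload" string encoding and the configuration values are assumptions for illustration; in Nan's setup the timestamp would live in a field of NodeState instead.

----------------------------

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LatencyWatcher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "latency-watcher");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("localHistTopic"));
            while (true) {
                // poll(long) on 0.10.x/0.11.x clients; newer clients take a Duration
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // The producer writes "sendTimeMillis|payload" just before send()
                    long sentAt = Long.parseLong(record.value().split("\\|", 2)[0]);
                    long latencyMs = System.currentTimeMillis() - sentAt;
                    if (latencyMs > 100) {
                        System.out.println("end-to-end latency " + latencyMs + " ms");
                    }
                }
            }
        }
    }
}

----------------------------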