I will wait for the expert’s opinion:

Is Transparent Huge Pages (THP) disabled on the broker machine? It’s a
Linux kernel parameter.
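
In case it helps, here is a minimal sketch for checking the current THP mode
from Java. It assumes the usual sysfs path
/sys/kernel/mm/transparent_hugepage/enabled (some distributions use a
different location), and the class and method names are only illustrative.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Illustrative helper, not part of Kafka: reports the active THP mode.
class ThpCheck {
    // Assumed location; this is the common path on mainstream Linux kernels.
    static final Path THP_ENABLED =
        Paths.get("/sys/kernel/mm/transparent_hugepage/enabled");

    // The file lists all modes and brackets the active one,
    // for example "always madvise [never]".
    static String activeMode(String sysfsLine) {
        int open = sysfsLine.indexOf('[');
        int close = sysfsLine.indexOf(']', open + 1);
        if (open < 0 || close < 0) {
            return "unknown";
        }
        return sysfsLine.substring(open + 1, close);
    }

    public static void main(String[] args) throws IOException {
        if (!Files.isReadable(THP_ENABLED)) {
            System.out.println("THP setting not readable at " + THP_ENABLED);
            return;
        }
        String line = new String(Files.readAllBytes(THP_ENABLED)).trim();
        System.out.println("THP mode: " + activeMode(line));
    }
}
```

If it prints "always", THP is enabled system-wide; "never" means it is disabled.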

-Sudhir

> On Aug 23, 2018, at 4:46 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> 
> I think I found where the problem is, but I am still not sure how to solve it or why it happens.
> 
> It is related to disk (maybe flushing?). I set up a single machine with a
> single node, single topic and single partition. The producer publishes 2000
> messages/s, each 10K in size, with a single key.
> 
> When I save the Kafka log to a memory-based partition, I don't see any latency
> over 100ms; it tops out around 70ms.
> When I save to an SSD, I do see latency spikes, sometimes over 1s.
> 
> Adjusting log.flush.interval.messages / log.flush.interval.ms has an impact,
> but only makes things worse... I need suggestions.
> 
> I think log flushing is totally async and done by the OS in the default
> setting. Does Kafka have to wait when flushing data to disk?
> 
> Thanks,
> Nan
> 
> 
> 
>> On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> 
>> Given your application code:
>> 
>> ----------------------------
>> 
>> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
>> 
>>            localDeltaTopic,
>> 
>>            Consumed.with(
>> 
>>                new Serdes.StringSerde(),
>> 
>>                new NodeMutationSerde<>()
>> 
>>            )
>> 
>>        );
>> 
>>  KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
>> 
>>            (mutation) -> NodeState
>> 
>>                .newBuilder()
>> 
>>                .setMeta(
>> 
>>                    mutation.getMetaMutation().getMeta()
>> 
>>                )
>> 
>>                .setValue(
>> 
>>                    mutation.getValueMutation().getValue()
>> 
>>                )
>> 
>>                .build()
>> 
>>        );
>> 
>>  localHistStream.to(
>> 
>>            localHistTopic,
>> 
>>            Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
>> 
>>        );
>> 
>> ----------------------------
>> 
>> which is purely stateless, committing will not touch any state directory at
>> all. Hence committing only involves committing offsets to Kafka.
>> 
>> 
>> Guozhang
>> 
>> 
>>> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>>> 
>>> I was suspecting that too, but I also noticed the spikes are not spaced
>>> around 10s apart. To further prove it, I put the Kafka data directory in a
>>> memory-based directory. It still has such latency spikes. I am going to test
>>> it on a single broker, single partition env. Will report back soon.
>>> 
>>> On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>> 
>>>> Hello Nan,
>>>> 
>>>> Thanks for the detailed information you shared. When Kafka Streams is
>>>> normally running, no rebalances should be triggered unless some of the
>>>> instances (in your case, docker containers) have soft failures.
>>>> 
>>>> I suspect the latency spike is due to the commit interval: Streams will
>>>> try to commit its offsets at a regular pace, which may increase latency. It
>>>> is controlled by the "commit.interval.ms" config value. I saw that in your
>>>> original email you've set it to 10 * 1000 (10 seconds). Is that aligned
>>>> with the frequency at which you observe latency spikes?
>>>> 
>>>> 
>>>> Guozhang
>>>> 
>>>> 
>>>>> On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>>>>> 
>>>>> I did more tests and made the test case simpler.
>>>>> The whole setup is now a single physical machine running 3 docker
>>>>> instances: a1, a2, a3.
>>>>> 
>>>>> Kafka + ZooKeeper are running on all of those docker containers.
>>>>> The producer runs on a1 and sends a single key at 2000 messages/s;
>>>>> each message is 10K in size.
>>>>> 3 consumers (different groups) are running, one on each docker container.
>>>>> All topics are pre-created.
>>>>> At startup, I do see some latency greater than 100ms, which is fine. After
>>>>> that everything is good: latency is low and the consumers don't see anything
>>>>> over 100ms for a while.
>>>>> Then I see a few messages with latency over 100ms, then it goes back to
>>>>> normal, then it happens again... It does seem like a GC problem, but I
>>>>> checked the GC logs and I don't think GC can cause over 100ms. (both are
>>>>> G1 collector)
>>>>> 
>>>>> After the stream is running stably (excluding the startup), the first
>>>>> message over 100ms takes 179ms, and the GC (it has a 30ms pause) should not
>>>>> cause a 179ms end-to-end latency.
>>>>> 
>>>>> FROM APP
>>>>> 
>>>>> 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure) 3184739K->84018K(5947904K), 0.0093730 secs]
>>>>> 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure) 3184690K->84280K(6053888K), 0.0087473 secs]
>>>>> 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure) 3301176K->84342K(6061056K), 0.0127339 secs]
>>>>> 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure) 3301238K->84624K(6143488K), 0.0140844 secs]
>>>>> 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure) 3386000K->89949K(6144000K), 0.0108118 secs]
>>>>> 
>>>>> 
>>>>> 
>>>>> kafka a1
>>>>> 
>>>>> 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause) (young), 0.0214200 secs]
>>>>>    [Parallel Time: 17.2 ms, GC Workers: 8]
>>>>>       [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max: 7982673.8, Diff: 16.3]
>>>>>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5, Sum: 1.5]
>>>>>       [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
>>>>>          [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum: 37]
>>>>>       [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
>>>>>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
>>>>>       [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum: 36.5]
>>>>>       [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum: 2.9]
>>>>>          [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24, Sum: 83]
>>>>>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
>>>>>       [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2, Sum: 56.5]
>>>>>       [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max: 7982674.5, Diff: 0.6]
>>>>>    [Code Root Fixup: 0.0 ms]
>>>>>    [Code Root Purge: 0.0 ms]
>>>>>    [Clear CT: 1.0 ms]
>>>>>    [Other: 3.2 ms]
>>>>>       [Choose CSet: 0.0 ms]
>>>>>       [Ref Proc: 1.9 ms]
>>>>>       [Ref Enq: 0.0 ms]
>>>>>       [Redirty Cards: 0.8 ms]
>>>>>       [Humongous Register: 0.1 ms]
>>>>>       [Humongous Reclaim: 0.0 ms]
>>>>>       [Free CSet: 0.2 ms]
>>>>>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.5M(1024.0M)->217.9M(1024.0M)]
>>>>> [Times: user=0.05 sys=0.00, real=0.03 secs]
>>>>> 
>>>>> 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation Pause) (young), 0.0310004 secs]
>>>>>    [Parallel Time: 24.4 ms, GC Workers: 8]
>>>>>       [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max: 7984444.7, Diff: 18.6]
>>>>>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff: 1.9, Sum: 2.0]
>>>>>       [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8, Sum: 32.9]
>>>>>          [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25, Sum: 43]
>>>>>       [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum: 25.5]
>>>>>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
>>>>>       [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9, Sum: 32.7]
>>>>>       [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6, Sum: 6.8]
>>>>>          [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10, Sum: 43]
>>>>>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
>>>>>       [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff: 19.1, Sum: 100.1]
>>>>>       [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max: 7984449.9, Diff: 0.8]
>>>>>    [Code Root Fixup: 0.0 ms]
>>>>>    [Code Root Purge: 0.0 ms]
>>>>>    [Clear CT: 1.1 ms]
>>>>>    [Other: 5.5 ms]
>>>>>       [Choose CSet: 0.0 ms]
>>>>>       [Ref Proc: 2.2 ms]
>>>>>       [Ref Enq: 0.0 ms]
>>>>>       [Redirty Cards: 2.8 ms]
>>>>>       [Humongous Register: 0.1 ms]
>>>>>       [Humongous Reclaim: 0.0 ms]
>>>>>       [Free CSet: 0.1 ms]
>>>>>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.9M(1024.0M)->218.4M(1024.0M)]
>>>>> [Times: user=0.05 sys=0.00, real=0.03 secs]
>>>>> 
>>>>> 
>>>>> So when Kafka Streams is running, is there any attempt to rebalance, either
>>>>> a broker rebalance or a client rebalance?
>>>>> Is there any kind of test to see what causes the trouble?
>>>>> 
>>>>> Thanks,
>>>>> Nan
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>> 
>>>>>> Okay, so you're measuring end-to-end time from producer -> broker ->
>>>>>> Streams' consumer client. There are multiple phases that can contribute to
>>>>>> the 100ms latency, and I cannot tell if Streams' consumer phase is the
>>>>>> major contributor. For example, if the topic was not created before, then
>>>>>> when the broker first received a produce request it may need to create the
>>>>>> topic, which involves multiple steps including writes to ZK which could
>>>>>> take time.
>>>>>> 
>>>>>> There is some confusion in your description: you mentioned "Kafka
>>>>>> cluster is already up and running", but I think you are referring to "Kafka
>>>>>> Streams application instances are already up and running", right? Since
>>>>>> only the latter has a rebalance process, while the Kafka brokers do not
>>>>>> really have "rebalances" except balancing load by migrating partitions.
>>>>>> 
>>>>>> Guozhang
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>>>>>> 
>>>>>>> Right, so my Kafka cluster has already been up and running for a while,
>>>>>>> and I can see from the log that all broker instances have already changed
>>>>>>> from rebalance to running.
>>>>>>> 
>>>>>>> I did another test.
>>>>>>> On the producer, right before the message gets sent to the broker, I put a
>>>>>>> timestamp in the message. On the consumer side, which is after stream
>>>>>>> processing, I compare this timestamp with the current time. I can see that
>>>>>>> some message processing times are above 100ms on some really powerful
>>>>>>> hardware. From my application GC log, all the GC times are below 1ms; Kafka
>>>>>>> GC only happened once and was below 1ms too.
>>>>>>> 
>>>>>>> Very puzzled. Is there any communication to ZooKeeper that, if it gets no
>>>>>>> response, will cause the broker to pause? I don't think that's the case,
>>>>>>> but at this time I don't know what else can be suspected.
>>>>>>> 
>>>>>>> Nan
>>>>>>> 
>>>>>>> On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hello Nan,
>>>>>>>> 
>>>>>>>> Note that Streams may need some time to rebalance and assign tasks even
>>>>>>>> if you only start with one instance.
>>>>>>>> 
>>>>>>>> I'd suggest you register your state listener in Kafka Streams via
>>>>>>>> KafkaStreams#setStateListener, and your customized StateListener should
>>>>>>>> record when the state transits from REBALANCING to RUNNING, since only
>>>>>>>> after that will the Streams client start to process the first record.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Guozhang
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Thanks, which JMX properties indicate "processing latency spikes" /
>>>>>>>>> "throughput"?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>> 
>>>>>>>>>> I cannot spot any obvious reasons.
>>>>>>>>>> 
>>>>>>>>>> As you consume from the result topic for verification, we should verify
>>>>>>>>>> that the latency spikes originate on write and not on read: you might
>>>>>>>>>> want to have a look into Kafka Streams JMX metrics to see if processing
>>>>>>>>>> latency spikes or throughput drops.
>>>>>>>>>> 
>>>>>>>>>> Also watch for GC pauses in the JVM.
>>>>>>>>>> 
>>>>>>>>>> Hope this helps.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> -Matthias
>>>>>>>>>> 
>>>>>>>>>>> On 8/17/18 12:13 PM, Nan Xu wrote:
>>>>>>>>>>> btw, I am using version 0.10.2.0
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> I am working on a Kafka Streams app and see huge latency variance;
>>>>>>>>>>>> I am wondering what can cause this.
>>>>>>>>>>>> 
>>>>>>>>>>>> The processing is very simple and doesn't have state; linger.ms is
>>>>>>>>>>>> already changed to 5ms. The message size is around 10K bytes and
>>>>>>>>>>>> messages are published at 2000 messages/s; the network is 10G. I use
>>>>>>>>>>>> a regular consumer to watch the localHistTopic topic and just print
>>>>>>>>>>>> out a counter every 2000 messages. Usually I get a count of 2000
>>>>>>>>>>>> every second, matching the publish speed, but sometimes I see it
>>>>>>>>>>>> stall for 3 or more seconds and then print out a few counts, as if
>>>>>>>>>>>> the CPU is paused during that time or messages are being
>>>>>>>>>>>> cached/batched and then processed.
>>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>> 
>>>>>>>>>>>> final Properties streamsConfiguration = new Properties();
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>>>>>>>>>>>>     Serdes.String().getClass().getName());
>>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>>>>>>>>>>>> // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>>>>>>>>>>> streamsConfiguration.put(
>>>>>>>>>>>>     StreamsConfig.PRODUCER_PREFIX + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
>>>>>>>>>>>> streamsConfiguration.put(
>>>>>>>>>>>>     StreamsConfig.PRODUCER_PREFIX + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
>>>>>>>>>>>> streamsConfiguration.put(
>>>>>>>>>>>>     StreamsConfig.PRODUCER_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
>>>>>>>>>>>> streamsConfiguration.put(
>>>>>>>>>>>>     StreamsConfig.PRODUCER_PREFIX + ProducerConfig.LINGER_MS_CONFIG, "5");
>>>>>>>>>>>> streamsConfiguration.put(
>>>>>>>>>>>>     StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
>>>>>>>>>>>>     20 * 1024 * 1024);
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> final StreamsBuilder builder = new StreamsBuilder();
>>>>>>>>>>>> 
>>>>>>>>>>>> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
>>>>>>>>>>>>     localDeltaTopic,
>>>>>>>>>>>>     Consumed.with(
>>>>>>>>>>>>         new Serdes.StringSerde(),
>>>>>>>>>>>>         new NodeMutationSerde<>()
>>>>>>>>>>>>     )
>>>>>>>>>>>> );
>>>>>>>>>>>> 
>>>>>>>>>>>> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
>>>>>>>>>>>>     (mutation) -> NodeState
>>>>>>>>>>>>         .newBuilder()
>>>>>>>>>>>>         .setMeta(
>>>>>>>>>>>>             mutation.getMetaMutation().getMeta()
>>>>>>>>>>>>         )
>>>>>>>>>>>>         .setValue(
>>>>>>>>>>>>             mutation.getValueMutation().getValue()
>>>>>>>>>>>>         )
>>>>>>>>>>>>         .build()
>>>>>>>>>>>> );
>>>>>>>>>>>> 
>>>>>>>>>>>> localHistStream.to(
>>>>>>>>>>>>     localHistTopic,
>>>>>>>>>>>>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
>>>>>>>>>>>> );
>>>>>>>>>>>> 
>>>>>>>>>>>> streams = new KafkaStreams(builder.build(), streamsConfiguration);
>>>>>>>>>>>> streams.cleanUp();
>>>>>>>>>>>> streams.start();
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> -- Guozhang
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> -- Guozhang
>>>> 
>>> 
>> 
>> 
>> 
>> --
>> -- Guozhang
>> 
