Looks really promising, but after the upgrade it still shows the same
result. I will post the program soon. Maybe you can see where the problem
could be.

Nan

On Thu, Aug 23, 2018, 7:34 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Nan,
>
> Kafka does not tie up the processing thread to do disk flushing. However,
> since you are on an older version of Kafka I suspect you're bumping into
> some old issues that have been resolved in later versions. e.g.
>
> https://issues.apache.org/jira/browse/KAFKA-4614
>
> I'd suggest upgrading to the latest version (2.0.0) and trying again to see
> if you observe the same pattern.
>
> Guozhang
>
> On Thu, Aug 23, 2018 at 3:22 PM, Sudhir Babu Pothineni <
> sbpothin...@gmail.com> wrote:
>
> > I will wait for the experts' opinion:
> >
> > Is Transparent Huge Pages (THP) disabled on the broker machine? It's a
> > Linux kernel parameter.
> >
> > -Sudhir
> >
> > > On Aug 23, 2018, at 4:46 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >
> > > > I think I found where the problem is; how to solve it and why it
> > > > happens, I am still not sure.
> > > >
> > > > It is related to disk (maybe flushing?). I did a single-machine,
> > > > single-node, single-topic, single-partition setup. The producer
> > > > publishes 2000 messages/s with a 10K message size and a single key.
> > > >
> > > > When I save the Kafka log to a memory-based partition, I don't see
> > > > latency over 100ms; it tops out around 70ms.
> > > > When I save to an SSD, I do see latency spikes, sometimes over 1s.
> > > >
> > > > Adjusting log.flush.interval.messages / log.flush.interval.ms has an
> > > > impact, but only makes things worse... I need suggestions.
> > > >
> > > > I think log flushing is totally async and done by the OS in the default
> > > > setting. Does Kafka have to wait when flushing data to disk?
> > >
> > > Thanks,
> > > Nan
> > >
> > >
> > >
> > >> On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >>
> > >> Given your application code:
> > >>
> > >> ----------------------------
> > >>
> > >> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> > >>     localDeltaTopic,
> > >>     Consumed.with(
> > >>         new Serdes.StringSerde(),
> > >>         new NodeMutationSerde<>()
> > >>     )
> > >> );
> > >>
> > >> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> > >>     (mutation) -> NodeState
> > >>         .newBuilder()
> > >>         .setMeta(mutation.getMetaMutation().getMeta())
> > >>         .setValue(mutation.getValueMutation().getValue())
> > >>         .build()
> > >> );
> > >>
> > >> localHistStream.to(
> > >>     localHistTopic,
> > >>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
> > >> );
> > >>
> > >> ----------------------------
> > >>
> > >> which is purely stateless, committing will not touch any state directory
> > >> at all. Hence committing only involves committing offsets to Kafka.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >>> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >>>
> > >>> I was suspecting that too, but I also noticed the spikes are not spaced
> > >>> around 10s apart. To prove it further, I put the Kafka data directory in
> > >>> a memory-based directory. It still has such latency spikes. I am going
> > >>> to test it on a single-broker, single-partition env. Will report back
> > >>> soon.
> > >>>
> > >>> On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com>
> > >> wrote:
> > >>>
> > >>>> Hello Nan,
> > >>>>
> > >>>> Thanks for the detailed information you shared. When Kafka Streams is
> > >>>> running normally, no rebalances should be triggered unless some of the
> > >>>> instances (in your case, docker containers) have soft failures.
> > >>>>
> > >>>> I suspect the latency spike is due to the commit interval: Streams will
> > >>>> try to commit its offsets at a regular pace, which may increase latency.
> > >>>> It is controlled by the "commit.interval.ms" config value. I saw that in
> > >>>> your original email you've set it to 10 * 1000 (10 seconds). Is that
> > >>>> aligned with the frequency at which you observe latency spikes?
> > >>>>
> > >>>>
> > >>>> Guozhang
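
One rough way to answer the question above is to take the wall-clock times of the observed spikes and see how many of the gaps between them sit close to a multiple of the 10 s commit interval. The sketch below is only an illustration under assumptions: the class name SpikePeriodicity, the tolerance value and the sample timestamps are all made up and do not come from the thread.

```java
import java.util.Arrays;

/** Hypothetical helper: checks whether spike times recur at a fixed interval. */
final class SpikePeriodicity {

    /**
     * Returns the fraction of gaps between consecutive spikes that fall within
     * toleranceMs of a positive whole multiple of intervalMs.
     */
    static double fractionAligned(long[] spikeTimesMs, long intervalMs, long toleranceMs) {
        if (spikeTimesMs.length < 2) {
            return 0.0; // not enough spikes to form a gap
        }
        long[] sorted = spikeTimesMs.clone();
        Arrays.sort(sorted);
        int aligned = 0;
        for (int i = 1; i < sorted.length; i++) {
            long gap = sorted[i] - sorted[i - 1];
            // Nearest whole number of intervals that this gap could represent.
            long multiples = Math.round((double) gap / intervalMs);
            long deviation = Math.abs(gap - multiples * intervalMs);
            if (multiples >= 1 && deviation <= toleranceMs) {
                aligned++;
            }
        }
        return (double) aligned / (sorted.length - 1);
    }

    public static void main(String[] args) {
        // Made-up spike times (ms), roughly 10 s apart.
        long[] spikes = {1_000, 11_020, 20_990, 31_010};
        System.out.println(fractionAligned(spikes, 10_000, 100));
    }
}
```

A value near 1.0 would point toward the commit interval; a value near 0.0 (spikes at irregular spacing) would suggest looking elsewhere.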
> > >>>>
> > >>>>
> > >>>>> On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com>
> > wrote:
> > >>>>>
> > >>>>> I did more tests and made the test case simpler.
> > >>>>> The whole setup is now a single physical machine running 3 docker
> > >>>>> instances: a1, a2, a3.
> > >>>>>
> > >>>>> Kafka + ZooKeeper are running on all of those docker containers.
> > >>>>> The producer runs on a1 and sends a single key at an update speed of
> > >>>>> 2000 messages/s; each message is 10K in size.
> > >>>>> 3 consumers (different groups) are running, one on each docker.
> > >>>>> All topics are pre-created.
> > >>>>> At startup, I do see some latency greater than 100ms, which is fine,
> > >>>>> and then everything is good: latency is low and the consumers don't
> > >>>>> see anything over 100ms for a while.
> > >>>>> Then I see a few messages with latency over 100ms, then back to normal,
> > >>>>> then it happens again... It does seem like a GC problem, but I checked
> > >>>>> the GC logs and I don't think GC can cause over 100ms. (Both are G1
> > >>>>> collectors.)
> > >>>>>
> > >>>>> After the stream is running stably (excluding the startup), the first
> > >>>>> message over 100ms takes 179ms, and the GC has a 30ms pause, which
> > >>>>> should not cause a 179ms end-to-end latency.
> > >>>>>
> > >>>>> FROM APP
> > >>>>>
> > >>>>> 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure) 3184739K->84018K(5947904K), 0.0093730 secs]
> > >>>>> 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure) 3184690K->84280K(6053888K), 0.0087473 secs]
> > >>>>> 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure) 3301176K->84342K(6061056K), 0.0127339 secs]
> > >>>>> 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure) 3301238K->84624K(6143488K), 0.0140844 secs]
> > >>>>> 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure) 3386000K->89949K(6144000K), 0.0108118 secs]
> > >>>>>
> > >>>>> kafka a1
> > >>>>>
> > >>>>> 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause) (young), 0.0214200 secs]
> > >>>>>    [Parallel Time: 17.2 ms, GC Workers: 8]
> > >>>>>       [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max: 7982673.8, Diff: 16.3]
> > >>>>>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5, Sum: 1.5]
> > >>>>>       [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
> > >>>>>          [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum: 37]
> > >>>>>       [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
> > >>>>>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> > >>>>>       [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum: 36.5]
> > >>>>>       [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum: 2.9]
> > >>>>>          [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24, Sum: 83]
> > >>>>>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> > >>>>>       [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2, Sum: 56.5]
> > >>>>>       [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max: 7982674.5, Diff: 0.6]
> > >>>>>    [Code Root Fixup: 0.0 ms]
> > >>>>>    [Code Root Purge: 0.0 ms]
> > >>>>>    [Clear CT: 1.0 ms]
> > >>>>>    [Other: 3.2 ms]
> > >>>>>       [Choose CSet: 0.0 ms]
> > >>>>>       [Ref Proc: 1.9 ms]
> > >>>>>       [Ref Enq: 0.0 ms]
> > >>>>>       [Redirty Cards: 0.8 ms]
> > >>>>>       [Humongous Register: 0.1 ms]
> > >>>>>       [Humongous Reclaim: 0.0 ms]
> > >>>>>       [Free CSet: 0.2 ms]
> > >>>>>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.5M(1024.0M)->217.9M(1024.0M)]
> > >>>>>  [Times: user=0.05 sys=0.00, real=0.03 secs]
> > >>>>>
> > >>>>> 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation Pause) (young), 0.0310004 secs]
> > >>>>>    [Parallel Time: 24.4 ms, GC Workers: 8]
> > >>>>>       [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max: 7984444.7, Diff: 18.6]
> > >>>>>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff: 1.9, Sum: 2.0]
> > >>>>>       [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8, Sum: 32.9]
> > >>>>>          [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25, Sum: 43]
> > >>>>>       [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum: 25.5]
> > >>>>>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> > >>>>>       [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9, Sum: 32.7]
> > >>>>>       [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6, Sum: 6.8]
> > >>>>>          [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10, Sum: 43]
> > >>>>>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> > >>>>>       [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff: 19.1, Sum: 100.1]
> > >>>>>       [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max: 7984449.9, Diff: 0.8]
> > >>>>>    [Code Root Fixup: 0.0 ms]
> > >>>>>    [Code Root Purge: 0.0 ms]
> > >>>>>    [Clear CT: 1.1 ms]
> > >>>>>    [Other: 5.5 ms]
> > >>>>>       [Choose CSet: 0.0 ms]
> > >>>>>       [Ref Proc: 2.2 ms]
> > >>>>>       [Ref Enq: 0.0 ms]
> > >>>>>       [Redirty Cards: 2.8 ms]
> > >>>>>       [Humongous Register: 0.1 ms]
> > >>>>>       [Humongous Reclaim: 0.0 ms]
> > >>>>>       [Free CSet: 0.1 ms]
> > >>>>>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.9M(1024.0M)->218.4M(1024.0M)]
> > >>>>>  [Times: user=0.05 sys=0.00, real=0.03 secs]
> > >>>>>
> > >>>>>
> > >>>>> So when Kafka Streams is running, is there anything trying to
> > >>>>> rebalance, either a broker rebalance or a client rebalance?
> > >>>>> Is there any kind of test to see what causes the trouble?
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Nan
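
To compare GC pauses against an observed spike such as the 179ms one without reading the logs by eye, the pause durations can be pulled out of the summary lines. This is a minimal sketch, assuming the log lines look like the ones quoted above (ending in ", <seconds> secs]"); the class name GcPauseScan and its methods are hypothetical, and other GC log formats would need a different pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical scanner for GC summary lines of the form "[GC ..., 0.0214200 secs]". */
final class GcPauseScan {

    // Captures the pause in seconds from a "[GC ..." summary line.
    // Detail lines such as "[GC Worker Start (ms): ...]" have no " secs]" and are skipped.
    private static final Pattern PAUSE =
            Pattern.compile("\\[GC .*?, (\\d+\\.\\d+) secs\\]");

    /** Returns the pause of every matching line, converted to milliseconds. */
    static List<Double> pausesMs(List<String> lines) {
        List<Double> result = new ArrayList<>();
        for (String line : lines) {
            Matcher m = PAUSE.matcher(line);
            if (m.find()) {
                result.add(Double.parseDouble(m.group(1)) * 1000.0);
            }
        }
        return result;
    }

    /** Returns the longest pause in milliseconds, or 0.0 if no line matched. */
    static double maxPauseMs(List<String> lines) {
        double max = 0.0;
        for (double pause : pausesMs(lines)) {
            max = Math.max(max, pause);
        }
        return max;
    }
}
```

If maxPauseMs over the window around a spike stays far below the observed end-to-end latency, as it does for the lines quoted here, GC alone is unlikely to explain the spike.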
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com
> >
> > >>>> wrote:
> > >>>>>
> > >>>>>> Okay, so you're measuring end-to-end time from producer -> broker ->
> > >>>>>> Streams' consumer client. There are multiple phases that can
> > >>>>>> contribute to the 100ms latency, and I cannot tell if the Streams
> > >>>>>> consumer phase is the major contributor. For example, if the topic
> > >>>>>> was not created before, then when the broker first receives a produce
> > >>>>>> request it may need to create the topic, which involves multiple
> > >>>>>> steps including writes to ZK, which could take time.
> > >>>>>>
> > >>>>>> There is some confusion in your description: you mentioned "Kafka
> > >>>>>> cluster is already up and running", but I think you are referring to
> > >>>>>> "Kafka Streams application instances are already up and running",
> > >>>>>> right? Only the latter has a rebalance process, while the Kafka
> > >>>>>> brokers do not really have "rebalances" except balancing load by
> > >>>>>> migrating partitions.
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com>
> > >> wrote:
> > >>>>>>
> > >>>>>>> Right, so my Kafka cluster has already been up and running for a
> > >>>>>>> while, and I can see from the log that all broker instances already
> > >>>>>>> changed from rebalance to running.
> > >>>>>>>
> > >>>>>>> I did another test.
> > >>>>>>> In the producer, right before the message gets sent to the broker, I
> > >>>>>>> put a timestamp in the message. On the consumer side, which is after
> > >>>>>>> stream processing, I compare this timestamp with the current time. I
> > >>>>>>> can see some message processing times above 100ms on some really
> > >>>>>>> powerful hardware. From my application GC log, all the GC times are
> > >>>>>>> below 1ms; Kafka GC only happened once and was below 1ms too.
> > >>>>>>>
> > >>>>>>> Very puzzled. Is there any communication with ZooKeeper that, if it
> > >>>>>>> gets no response, will cause the broker to pause? I don't think
> > >>>>>>> that's the case, but at this time I don't know what else to suspect.
> > >>>>>>>
> > >>>>>>> Nan
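
The timestamp comparison described above can be kept as a small consumer-side recorder that counts spikes and reports percentiles instead of single readings. The following is a sketch under assumptions, not the program from this thread: LatencyTracker and its methods are hypothetical names, and it only makes sense when producer and consumer read the same clock (for example on one machine) or closely synchronized clocks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical consumer-side recorder for producer-to-consumer latency. */
final class LatencyTracker {
    private final long spikeThresholdMs;
    private final List<Long> latenciesMs = new ArrayList<>();
    private int spikes;

    LatencyTracker(long spikeThresholdMs) {
        this.spikeThresholdMs = spikeThresholdMs;
    }

    /** Records one message, given the timestamp the producer embedded in it. */
    void record(long producedAtMs, long consumedAtMs) {
        long latency = consumedAtMs - producedAtMs;
        latenciesMs.add(latency);
        if (latency > spikeThresholdMs) {
            spikes++;
        }
    }

    /** Number of recorded messages whose latency exceeded the threshold. */
    int spikeCount() {
        return spikes;
    }

    /** Nearest-rank percentile for p in (0, 100]; returns -1 if nothing was recorded. */
    long percentile(double p) {
        if (latenciesMs.isEmpty()) {
            return -1;
        }
        List<Long> sorted = new ArrayList<>(latenciesMs);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.size());
        return sorted.get(Math.max(rank, 1) - 1);
    }
}
```

In a consumer loop this would be called as tracker.record(timestampFromMessage, System.currentTimeMillis()) for each record, with the threshold set to 100 to match the spikes discussed here.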
> > >>>>>>>
> > >>>>>>> On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <
> > >> wangg...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hello Nan,
> > >>>>>>>>
> > >>>>>>>> Note that Streams may need some time to rebalance and assign tasks
> > >>>>>>>> even if you only start with one instance.
> > >>>>>>>>
> > >>>>>>>> I'd suggest you register your state listener in Kafka Streams via
> > >>>>>>>> KafkaStreams#setStateListener, and your customized StateListener
> > >>>>>>>> should record when the state transits from REBALANCING to RUNNING,
> > >>>>>>>> since only after that will the Streams client start to process the
> > >>>>>>>> first record.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com>
> > >>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Thanks, which JMX properties indicate "processing latency spikes" /
> > >>>>>>>>> "throughput"?
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <
> > >>>>>> matth...@confluent.io
> > >>>>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> I cannot spot any obvious reasons.
> > >>>>>>>>>>
> > >>>>>>>>>> As you consume from the result topic for verification, we should
> > >>>>>>>>>> verify that the latency spikes originate on write and not on read:
> > >>>>>>>>>> you might want to have a look into the Kafka Streams JMX metrics
> > >>>>>>>>>> to see if processing latency spikes or throughput drops.
> > >>>>>>>>>>
> > >>>>>>>>>> Also watch for GC pauses in the JVM.
> > >>>>>>>>>>
> > >>>>>>>>>> Hope this helps.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> -Matthias
> > >>>>>>>>>>
> > >>>>>>>>>>> On 8/17/18 12:13 PM, Nan Xu wrote:
> > >>>>>>>>>>> btw, I am using version 0.10.2.0
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <
> > >>> nanxu1...@gmail.com>
> > >>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> I am working on a Kafka Streams app and see huge latency
> > >>>>>>>>>>>> variance. I am wondering what can cause this?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The processing is very simple and has no state; linger.ms is
> > >>>>>>>>>>>> already changed to 5ms. The message size is around 10K bytes and
> > >>>>>>>>>>>> messages are published at 2000 messages/s; the network is 10G.
> > >>>>>>>>>>>> I use a regular consumer to watch the localHistTopic topic and
> > >>>>>>>>>>>> print out a counter every 2000 messages. Usually I get a count
> > >>>>>>>>>>>> of 2000 every second, matching the publish speed, but sometimes
> > >>>>>>>>>>>> I see it stall for 3 or more seconds and then print out a few
> > >>>>>>>>>>>> counts, as if the CPU is paused during that time or messages are
> > >>>>>>>>>>>> being cached/batched and then processed.
> > >>>>>>>>>>>> Any suggestions?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> final Properties streamsConfiguration = new Properties();
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > >>>>>>>>>>>>     applicationId);
> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > >>>>>>>>>>>>     bootstrapServers);
> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> > >>>>>>>>>>>>     Serdes.String().getClass().getName());
> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> > >>>>>>>>>>>>     10 * 1000);
> > >>>>>>>>>>>> // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >>>>>>>>>>>>     + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >>>>>>>>>>>>     + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >>>>>>>>>>>>     + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >>>>>>>>>>>>     + ProducerConfig.LINGER_MS_CONFIG, "5");
> > >>>>>>>>>>>> streamsConfiguration.put(StreamsConfig.consumerPrefix(
> > >>>>>>>>>>>>     ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> final StreamsBuilder builder = new StreamsBuilder();
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> > >>>>>>>>>>>>     localDeltaTopic,
> > >>>>>>>>>>>>     Consumed.with(
> > >>>>>>>>>>>>         new Serdes.StringSerde(),
> > >>>>>>>>>>>>         new NodeMutationSerde<>()
> > >>>>>>>>>>>>     )
> > >>>>>>>>>>>> );
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> > >>>>>>>>>>>>     (mutation) -> NodeState
> > >>>>>>>>>>>>         .newBuilder()
> > >>>>>>>>>>>>         .setMeta(mutation.getMetaMutation().getMeta())
> > >>>>>>>>>>>>         .setValue(mutation.getValueMutation().getValue())
> > >>>>>>>>>>>>         .build()
> > >>>>>>>>>>>> );
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> localHistStream.to(
> > >>>>>>>>>>>>     localHistTopic,
> > >>>>>>>>>>>>     Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
> > >>>>>>>>>>>> );
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> streams = new KafkaStreams(builder.build(), streamsConfiguration);
> > >>>>>>>>>>>> streams.cleanUp();
> > >>>>>>>>>>>> streams.start();
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> -- Guozhang
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>>>
> > >>>
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> >
>
>
>
> --
> -- Guozhang
>
