Hello Nan,

Thanks for the detailed information you shared. When Kafka Streams is
running normally, no rebalances should be triggered unless some of the
instances (in your case, Docker containers) have soft failures.

I suspect the latency spike is due to the commit interval: Streams tries
to commit its offsets at a regular pace, which may increase latency. This
is controlled by the "commit.interval.ms" config value. I saw in your
original email that you've set it to 10 * 1000 (10 seconds). Does that
align with the frequency at which you observe the latency spikes?
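
One rough way to check that alignment is to record the times at which
latency exceeds 100ms and see whether the gaps between consecutive spikes
sit close to a multiple of the commit interval. The sketch below is only an
illustration: the class name, the method name, the 500ms tolerance and the
spike timestamps are all made up, and it uses nothing from the Kafka
libraries.

```java
import java.util.List;

public class CommitIntervalCheck {

    /**
     * Returns the fraction of gaps between consecutive spikes that fall
     * within toleranceMs of a positive multiple of commitIntervalMs.
     * Spike times are expected in ascending order, in milliseconds.
     */
    public static double alignedFraction(List<Long> spikeTimesMs,
                                         long commitIntervalMs,
                                         long toleranceMs) {
        if (spikeTimesMs.size() < 2) {
            return 0.0; // not enough spikes to measure any gap
        }
        int aligned = 0;
        for (int i = 1; i < spikeTimesMs.size(); i++) {
            long gap = spikeTimesMs.get(i) - spikeTimesMs.get(i - 1);
            // Nearest multiple of the interval, but at least one interval.
            long multiple = Math.max(1, Math.round((double) gap / commitIntervalMs));
            long distance = Math.abs(gap - multiple * commitIntervalMs);
            if (distance <= toleranceMs) {
                aligned++;
            }
        }
        return (double) aligned / (spikeTimesMs.size() - 1);
    }

    public static void main(String[] args) {
        long commitIntervalMs = 10 * 1000; // same value as commit.interval.ms
        // Hypothetical spike timestamps (ms since test start), not real data.
        List<Long> spikes = List.of(12_300L, 22_350L, 32_280L, 52_400L);
        System.out.println(alignedFraction(spikes, commitIntervalMs, 500));
    }
}
```

A fraction close to 1 would point towards the commit interval; a low
fraction would suggest looking elsewhere.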


Guozhang


On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com> wrote:

> I did more tests and simplified the test case.
> The whole setup is now a single physical machine running 3 docker
> instances: a1, a2, a3.
>
> kafka + zookeeper are running on all of those docker containers.
> The producer runs on a1 and sends a single key at 2000 messages/s;
> each message is 10K in size.
> 3 consumers (different groups) are running, one on each docker container.
> All topics are pre-created.
> At startup I do see some latency greater than 100ms, which is fine. After
> that everything is good: latency is low and the consumers don't see
> anything over 100ms for a while.
> Then I see a few messages with latency over 100ms, then back to normal,
> then it happens again. It does seem like a gc problem, but I checked the
> gc logs and I don't think gc can cause over 100ms. (both are G1 collector)
>
> After the stream is running stably (excluding startup), the first message
> over 100ms takes 179ms, and the gc (it has a 30ms pause) should not
> cause a 179ms end-to-end latency.
>
> FROM APP
>
> 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure)
> 3184739K->84018K(5947904K), 0.0093730 secs]
>
> 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure)
> 3184690K->84280K(6053888K), 0.0087473 secs]
>
> 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure)
> 3301176K->84342K(6061056K), 0.0127339 secs]
>
> 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure)
> 3301238K->84624K(6143488K), 0.0140844 secs]
>
> 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure)
> 3386000K->89949K(6144000K), 0.0108118 secs]
>
>
>
> kafka a1
>
> 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause)
> (young), 0.0214200 secs]
>
>    [Parallel Time: 17.2 ms, GC Workers: 8]
>
>       [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max:
> 7982673.8, Diff: 16.3]
>
>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5,
> Sum: 1.5]
>
>       [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
>
>          [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum: 37]
>
>       [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
>
>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0,
> Sum: 0.0]
>
>       [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum:
> 36.5]
>
>       [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum: 2.9]
>
>          [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24, Sum:
> 83]
>
>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum:
> 0.1]
>
>       [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2,
> Sum: 56.5]
>
>       [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max: 7982674.5,
> Diff: 0.6]
>
>    [Code Root Fixup: 0.0 ms]
>
>    [Code Root Purge: 0.0 ms]
>
>    [Clear CT: 1.0 ms]
>
>    [Other: 3.2 ms]
>
>       [Choose CSet: 0.0 ms]
>
>       [Ref Proc: 1.9 ms]
>
>       [Ref Enq: 0.0 ms]
>
>       [Redirty Cards: 0.8 ms]
>
>       [Humongous Register: 0.1 ms]
>
>       [Humongous Reclaim: 0.0 ms]
>
>       [Free CSet: 0.2 ms]
>
>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap:
> 265.5M(1024.0M)->217.9M(1024.0M)]
>
> [Times: user=0.05 sys=0.00, real=0.03 secs]
>
> 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation Pause)
> (young), 0.0310004 secs]
>
>    [Parallel Time: 24.4 ms, GC Workers: 8]
>
>       [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max:
> 7984444.7, Diff: 18.6]
>
>       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff: 1.9,
> Sum: 2.0]
>
>       [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8, Sum:
> 32.9]
>
>          [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25, Sum: 43]
>
>       [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum: 25.5]
>
>       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0,
> Sum: 0.0]
>
>       [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9, Sum:
> 32.7]
>
>       [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6, Sum: 6.8]
>
>          [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10, Sum:
> 43]
>
>       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum:
> 0.1]
>
>       [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff: 19.1,
> Sum: 100.1]
>
>       [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max: 7984449.9,
> Diff: 0.8]
>
>    [Code Root Fixup: 0.0 ms]
>
>    [Code Root Purge: 0.0 ms]
>
>    [Clear CT: 1.1 ms]
>
>    [Other: 5.5 ms]
>
>       [Choose CSet: 0.0 ms]
>
>       [Ref Proc: 2.2 ms]
>
>       [Ref Enq: 0.0 ms]
>
>       [Redirty Cards: 2.8 ms]
>
>       [Humongous Register: 0.1 ms]
>
>       [Humongous Reclaim: 0.0 ms]
>
>       [Free CSet: 0.1 ms]
>
>    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap:
> 265.9M(1024.0M)->218.4M(1024.0M)]
>
> [Times: user=0.05 sys=0.00, real=0.03 secs]
>
>
> So when kafka streams is running, does anything try to rebalance, either
> a broker rebalance or a client rebalance?
> Is there any kind of test to see what causes the trouble?
>
> Thanks,
> Nan
>
>
>
>
>
>
> On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Okay, so you're measuring end-to-end time from producer -> broker ->
> > streams' consumer client, there are multiple phases that can contribute
> to
> > the 100ms latency, and I cannot tell if stream's consumer phase is the
> > major contributor. For example, if the topic was not created before, then
> > when the broker first received a produce request it may need to create
> the
> > topic, which involves multiple steps including writes to ZK which could
> > take time.
> >
> > There is some confusion in your description: you mentioned "Kafka
> > cluster is already up and running", but I think you are referring to
> > "Kafka Streams application instances are already up and running", right?
> > Only the latter has a rebalance process, while the Kafka brokers do not
> > really have "rebalances" except balancing load by migrating partitions.
> >
> > Guozhang
> >
> >
> >
> > On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >
> > > right, so my kafka cluster is already up and running for a while, and I
> > can
> > > see from the log all broker instance already change from rebalance to
> > > running.
> > >
> > > I did a another test.
> > > from producer, right before the message get send to the broker, I put a
> > > timestamp in the message. and from the consumer side which is after
> > stream
> > > processing, I compare this timestamp with current time. I can see some
> > > message processing time is above 100ms on some real powerful hardware.
> > and
> > > from my application gc, all the gc time is below 1ms, kafka gc only
> > happen
> > > once and below 1ms too.
> > >
> > > very puzzled. is there any communication to zookeeper, if not get
> > response,
> > > will cause the broker to pause? I don't think that's the case but at
> this
> > > time don't know what else can be suspected.
> > >
> > > Nan
> > >
> > > On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Hello Nan,
> > > >
> > > > > Note that Streams may need some time to rebalance and assign tasks
> > > > > even if you only start with one instance.
> > > >
> > > > I'd suggest you register your state listener in Kafka Streams via
> > > > KafkaStreams#setStateListener, and your customized StateListener
> should
> > > > record when the state transits from REBALANCING to RUNNING since only
> > > after
> > > > that the streams client will start to process the first record.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > > >
> > > > > thanks, which JMX properties indicate  "processing latency
> spikes"  /
> > > > > "throughput"
> > > > >
> > > > >
> > > > > On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <
> > matth...@confluent.io
> > > >
> > > > > wrote:
> > > > >
> > > > > > I cannot spot any obvious reasons.
> > > > > >
> > > > > > As you consume from the result topic for verification, we should
> > > verify
> > > > > > that the latency spikes original on write and not on read: you
> > might
> > > > > > want to have a look into Kafka Streams JMX metric to see if
> > > processing
> > > > > > latency spikes or throughput drops.
> > > > > >
> > > > > > Also watch for GC pauses in the JVM.
> > > > > >
> > > > > > Hope this helps.
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 8/17/18 12:13 PM, Nan Xu wrote:
> > > > > > > btw, I am using version 0.10.2.0
> > > > > > >
> > > > > > > On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > >> I am working on a kafka stream app, and see huge latency
> > variance,
> > > > > > >> wondering what can cause this?
> > > > > > >>
> > > > > > >> the processing is very simple and don't have state, linger.ms
> > > > already
> > > > > > >> change to 5ms. the message size is around 10K byes and
> published
> > > as
> > > > > 2000
> > > > > > >> messages/s, network is 10G.  using a regular consumer watch
> the
> > > > > > >> localHistTopic  topic and just every 2000 message print out a
> > > > counter,
> > > > > > it
> > > > > > >> usually every second I get a count 2000 as the publish speed,
> > but
> > > > > > sometime
> > > > > > >> I see it stall for 3 or more seconds and then print out a few
> > > count.
> > > > > > like
> > > > > > >> cpu is paused during that time or message being cache/batch
> then
> > > > > > processed.
> > > > > > >> any suggestion?
> > > > > > >>
> > > > > > >>   final Properties streamsConfiguration = new Properties();
> > > > > > >>
> > > > > > >>
> > > >  streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > > > > > >> applicationId);
> > > > > > >>
> > > > > > >>         streamsConfiguration.put(StreamsConfig.CLIENT_ID_
> CONFIG,
> > > > > > clientId);
> > > > > > >>
> > > > > > >>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_
> > > > > SERVERS_CONFIG,
> > > > > > >> bootstrapServers);
> > > > > > >>
> > > > > > >>
> > > > > > >> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_
> > > > > SERDE_CLASS_CONFIG,
> > > > > > >> Serdes.String()
> > > > > > >>
> > > > > > >>             .getClass().getName());
> > > > > > >>
> > > > > > >>
> > > > > >  streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
> MS_CONFIG,
> > > > > > >> 10 * 1000);
> > > > > > >>
> > > > > > >> //
> > > > > > >>
> > > > > >
> > > >
> > streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> > > > > 0);
> > > > > > >>
> > > > > > >>         streamsConfiguration.put(
> StreamsConfig.PRODUCER_PREFIX
> > > > > > >>
> > > > > > >>             + ProducerConfig.BATCH_SIZE_CONFIG,163840);
> > > > > > >>
> > > > > > >>         streamsConfiguration.put(
> StreamsConfig.PRODUCER_PREFIX
> > > > > > >>
> > > > > > >>             + ProducerConfig.BUFFER_MEMORY_CONFIG,
> 335544320);
> > > > > > >>
> > > > > > >>         streamsConfiguration.put(
> StreamsConfig.PRODUCER_PREFIX
> > > > > > >>
> > > > > > >>             + ProducerConfig.MAX_IN_FLIGHT_
> > > REQUESTS_PER_CONNECTION,
> > > > > 30);
> > > > > > >>
> > > > > > >>         streamsConfiguration.put(
> StreamsConfig.PRODUCER_PREFIX
> > > > > > >>
> > > > > > >>             + ProducerConfig.LINGER_MS_CONFIG,"5");
> > > > > > >>
> > > > > > >>         streamsConfiguration.put(
> StreamsConfig.consumerPrefix(
> > > > > > >>             ConsumerConfig.MAX_PARTITION_
> FETCH_BYTES_CONFIG),20
> > *
> > > > > 1024 *
> > > > > > >> 1024);MS_CONFIG, 10 * 1000);
> > > > > > >>
> > > > > > >> //
> > > > > > >>
> > > > > >
> > > >
> > streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> > > > > 0);
> > > > > > >>
> > > > > > >>         streamsConfiguration.put(
> StreamsConfig.PRODUCER_PREFIX
> > > > > > >>
> > > > > > >>             + ProducerConfig.BATCH_SIZE_CONFIG,163840);
> > > > > > >>
> > > > > > >>         streamsConfiguration.put(
> StreamsConfig.PRODUCER_PREFIX
> > > > > > >>
> > > > > > >>             + ProducerConfig.BUFFER_MEMORY_CONFIG,
> 335544320);
> > > > > > >>
> > > > > > >>         streamsConfiguration.put(
> StreamsConfig.PRODUCER_PREFIX
> > > > > > >>
> > > > > > >>             + ProducerConfig.MAX_IN_FLIGHT_
> > > REQUESTS_PER_CONNECTION,
> > > > > 30);
> > > > > > >>
> > > > > > >>         streamsConfiguration.put(
> StreamsConfig.PRODUCER_PREFIX
> > > > > > >>
> > > > > > >>             + ProducerConfig.LINGER_MS_CONFIG,"5");
> > > > > > >>
> > > > > > >>         streamsConfiguration.put(
> StreamsConfig.consumerPrefix(
> > > > > > >>
> > > > > > >>             ConsumerConfig. MAX_PARTITION_FETCH_BYTES_CONFIG
> ,
> > > 20 *
> > > > > > 1024 *
> > > > > > >> 1024);
> > > > > > >>
> > > > > > >>
> > > > > > >>  final StreamsBuilder builder = new StreamsBuilder();
> > > > > > >>
> > > > > > >>  final KStream<String, NodeMutation> localDeltaStream =
> > > > > builder.stream(
> > > > > > >>
> > > > > > >>             localDeltaTopic,
> > > > > > >>
> > > > > > >>             Consumed.with(
> > > > > > >>
> > > > > > >>                 new Serdes.StringSerde(),
> > > > > > >>
> > > > > > >>                 new NodeMutationSerde<>()
> > > > > > >>
> > > > > > >>             )
> > > > > > >>
> > > > > > >>         );
> > > > > > >>
> > > > > > >>   KStream<String, NodeState> localHistStream =
> > > > > > localDeltaStream.mapValues(
> > > > > > >>
> > > > > > >>             (mutation) -> NodeState
> > > > > > >>
> > > > > > >>                 .newBuilder()
> > > > > > >>
> > > > > > >>                 .setMeta(
> > > > > > >>
> > > > > > >>                     mutation.getMetaMutation().getMeta()
> > > > > > >>
> > > > > > >>                 )
> > > > > > >>
> > > > > > >>                 .setValue(
> > > > > > >>
> > > > > > >>                     mutation.getValueMutation().getValue()
> > > > > > >>
> > > > > > >>                 )
> > > > > > >>
> > > > > > >>                 .build()
> > > > > > >>
> > > > > > >>         );
> > > > > > >>
> > > > > > >>   localHistStream.to(
> > > > > > >>
> > > > > > >>             localHistTopic,
> > > > > > >>
> > > > > > >>             Produced.with(new Serdes.StringSerde(), new
> > > > > > NodeStateSerde<>())
> > > > > > >>
> > > > > > >>         );
> > > > > > >>
> > > > > > >>  streams = new KafkaStreams(builder.build(),
> > > streamsConfiguration);
> > > > > > >>
> > > > > > >>         streams.cleanUp();
> > > > > > >>
> > > > > > >> streams.start();
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
