Given your application code:

----------------------------

final KStream<String, NodeMutation> localDeltaStream = builder.stream(
    localDeltaTopic,
    Consumed.with(
        new Serdes.StringSerde(),
        new NodeMutationSerde<>()
    )
);

KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
    (mutation) -> NodeState
        .newBuilder()
        .setMeta(
            mutation.getMetaMutation().getMeta()
        )
        .setValue(
            mutation.getValueMutation().getValue()
        )
        .build()
);

localHistStream.to(
    localHistTopic,
    Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
);

----------------------------

which is purely stateless, committing will not touch any state directory at
all. Hence a commit only involves committing offsets to Kafka.
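
If you want to double-check whether the spikes line up with the commit
interval at all, a minimal sketch along the following lines may help. It is
plain Java with no Kafka dependency; the class name SpikeSpacing, its method
names, the tolerance, and the sample timestamps are all made up for
illustration, and only the 10-second interval comes from your configuration.
It computes the gaps between consecutive spike timestamps and the fraction of
gaps that fall close to a whole multiple of the interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: not part of Kafka Streams.
public class SpikeSpacing {

    /** Gaps in milliseconds between consecutive spike timestamps. */
    public static List<Long> gaps(long[] spikeTimesMs) {
        List<Long> result = new ArrayList<>();
        for (int i = 1; i < spikeTimesMs.length; i++) {
            result.add(spikeTimesMs[i] - spikeTimesMs[i - 1]);
        }
        return result;
    }

    /**
     * Fraction of gaps lying within toleranceMs of a whole multiple
     * (at least 1) of intervalMs. Returns 0.0 when there are no gaps.
     */
    public static double fractionAligned(long[] spikeTimesMs, long intervalMs, long toleranceMs) {
        List<Long> g = gaps(spikeTimesMs);
        if (g.isEmpty()) {
            return 0.0;
        }
        int aligned = 0;
        for (long gap : g) {
            long multiples = Math.round((double) gap / intervalMs);
            if (multiples >= 1 && Math.abs(gap - multiples * intervalMs) <= toleranceMs) {
                aligned++;
            }
        }
        return (double) aligned / g.size();
    }

    public static void main(String[] args) {
        long commitIntervalMs = 10_000L;                      // commit.interval.ms from the original config
        long[] spikes = {1_000L, 11_050L, 20_980L, 24_300L};  // made-up sample timestamps
        System.out.println("gaps (ms): " + gaps(spikes));
        System.out.println("fraction aligned: " + fractionAligned(spikes, commitIntervalMs, 200L));
    }
}
```

A fraction close to 1 would point toward the commit interval, while a low
fraction (as you describe) suggests looking elsewhere.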


Guozhang


On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com> wrote:

> I suspected that too, but I also noticed the spikes are not spaced around
> 10s apart. To test this further, I put the Kafka data directory in a
> memory-based directory, and it still has such latency spikes. I am going to
> test it on a single-broker, single-partition environment and will report
> back soon.
>
> On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Nan,
> >
> > Thanks for the detailed information you shared. When Kafka Streams is
> > normally running, no rebalances should be triggered unless some of the
> > instances (in your case, docker containers) have soft failures.
> >
> > I suspect the latency spike is due to the commit interval: Streams will
> > try to commit its offsets at a regular pace, which may increase latency.
> > It is controlled by the "commit.interval.ms" config value. I saw that in
> > your original email you've set it to 10 * 1000 (10 seconds). Is that
> > aligned with the frequency at which you observe latency spikes?
> >
> >
> > Guozhang
> >
> >
> > On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> >
> > > I did more testing and made the test case simpler.
> > > The whole setup is now a single physical machine running 3 docker
> > > instances: a1, a2, a3.
> > >
> > > Kafka + ZooKeeper are running on all of those docker containers.
> > > The producer runs on a1 and sends a single key at an update rate of 2000
> > > messages/s; each message is 10K in size.
> > > 3 consumers (different groups) are running, one on each docker container.
> > > All topics are pre-created.
> > > At startup I do see some latency greater than 100ms, which is fine, and
> > > then everything is good: latency is low and the consumers don't see
> > > anything over 100ms for a while.
> > > Then I see a few messages with latency over 100ms, then back to normal,
> > > then it happens again... It does seem like a GC problem, but I checked
> > > the GC logs and I don't think GC can cause over 100ms (both are G1
> > > collector).
> > >
> > > After the stream is running stably (excluding the startup), the first
> > > message over 100ms takes 179ms, and the GC (it has a 30ms pause) should
> > > not cause a 179ms end-to-end latency.
> > >
> > > FROM APP
> > >
> > > 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure) 3184739K->84018K(5947904K), 0.0093730 secs]
> > > 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure) 3184690K->84280K(6053888K), 0.0087473 secs]
> > > 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure) 3301176K->84342K(6061056K), 0.0127339 secs]
> > > 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure) 3301238K->84624K(6143488K), 0.0140844 secs]
> > > 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure) 3386000K->89949K(6144000K), 0.0108118 secs]
> > >
> > >
> > >
> > > kafka a1
> > >
> > > 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation Pause) (young), 0.0214200 secs]
> > >    [Parallel Time: 17.2 ms, GC Workers: 8]
> > >       [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max: 7982673.8, Diff: 16.3]
> > >       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff: 1.5, Sum: 1.5]
> > >       [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum: 8.4]
> > >          [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13, Sum: 37]
> > >       [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum: 7.1]
> > >       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> > >       [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5, Sum: 36.5]
> > >       [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9, Sum: 2.9]
> > >          [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24, Sum: 83]
> > >       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> > >       [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff: 16.2, Sum: 56.5]
> > >       [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max: 7982674.5, Diff: 0.6]
> > >    [Code Root Fixup: 0.0 ms]
> > >    [Code Root Purge: 0.0 ms]
> > >    [Clear CT: 1.0 ms]
> > >    [Other: 3.2 ms]
> > >       [Choose CSet: 0.0 ms]
> > >       [Ref Proc: 1.9 ms]
> > >       [Ref Enq: 0.0 ms]
> > >       [Redirty Cards: 0.8 ms]
> > >       [Humongous Register: 0.1 ms]
> > >       [Humongous Reclaim: 0.0 ms]
> > >       [Free CSet: 0.2 ms]
> > >    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.5M(1024.0M)->217.9M(1024.0M)]
> > > [Times: user=0.05 sys=0.00, real=0.03 secs]
> > >
> > > 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation Pause) (young), 0.0310004 secs]
> > >    [Parallel Time: 24.4 ms, GC Workers: 8]
> > >       [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max: 7984444.7, Diff: 18.6]
> > >       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff: 1.9, Sum: 2.0]
> > >       [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8, Sum: 32.9]
> > >          [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25, Sum: 43]
> > >       [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum: 25.5]
> > >       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
> > >       [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9, Sum: 32.7]
> > >       [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6, Sum: 6.8]
> > >          [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10, Sum: 43]
> > >       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
> > >       [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff: 19.1, Sum: 100.1]
> > >       [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max: 7984449.9, Diff: 0.8]
> > >    [Code Root Fixup: 0.0 ms]
> > >    [Code Root Purge: 0.0 ms]
> > >    [Clear CT: 1.1 ms]
> > >    [Other: 5.5 ms]
> > >       [Choose CSet: 0.0 ms]
> > >       [Ref Proc: 2.2 ms]
> > >       [Ref Enq: 0.0 ms]
> > >       [Redirty Cards: 2.8 ms]
> > >       [Humongous Register: 0.1 ms]
> > >       [Humongous Reclaim: 0.0 ms]
> > >       [Free CSet: 0.1 ms]
> > >    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap: 265.9M(1024.0M)->218.4M(1024.0M)]
> > > [Times: user=0.05 sys=0.00, real=0.03 secs]
> > >
> > >
> > > So while Kafka Streams is running, is there anything trying to rebalance,
> > > either a broker rebalance or a client rebalance?
> > > Is there any kind of test to see what causes the trouble?
> > >
> > > Thanks,
> > > Nan
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Okay, so you're measuring end-to-end time from producer -> broker ->
> > > > Streams' consumer client. There are multiple phases that can contribute
> > > > to the 100ms latency, and I cannot tell whether the Streams consumer
> > > > phase is the major contributor. For example, if the topic was not
> > > > created beforehand, then when the broker first receives a produce
> > > > request it may need to create the topic, which involves multiple steps
> > > > including writes to ZK that could take time.
> > > >
> > > > Your description is a bit confusing: you mentioned "Kafka cluster is
> > > > already up and running", but I think you are referring to "Kafka
> > > > Streams application instances are already up and running", right? Only
> > > > the latter has a rebalance process, while the Kafka brokers do not
> > > > really have "rebalances" except balancing load by migrating partitions.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > > On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > > >
> > > > > Right, so my Kafka cluster has already been up and running for a
> > > > > while, and I can see from the log that all broker instances have
> > > > > already changed from rebalance to running.
> > > > >
> > > > > I did another test.
> > > > > On the producer, right before the message gets sent to the broker, I
> > > > > put a timestamp in the message. On the consumer side, which is after
> > > > > stream processing, I compare this timestamp with the current time. I
> > > > > can see that some message processing times are above 100ms on some
> > > > > really powerful hardware. From my application GC, all the GC times are
> > > > > below 1ms; Kafka GC only happened once and was below 1ms too.
> > > > >
> > > > > Very puzzled. Is there any communication to ZooKeeper that, if it gets
> > > > > no response, will cause the broker to pause? I don't think that's the
> > > > > case, but at this time I don't know what else can be suspected.
> > > > >
> > > > > Nan
> > > > >
> > > > > On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hello Nan,
> > > > > >
> > > > > > Note that Streams may need some time to rebalance and assign tasks
> > > > > > even if you only start with one instance.
> > > > > >
> > > > > > I'd suggest you register your state listener in Kafka Streams via
> > > > > > KafkaStreams#setStateListener; your customized StateListener should
> > > > > > record when the state transitions from REBALANCING to RUNNING, since
> > > > > > only after that will the Streams client start to process the first
> > > > > > record.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com>
> > wrote:
> > > > > >
> > > > > > > Thanks. Which JMX properties indicate "processing latency spikes" /
> > > > > > > "throughput"?
> > > > > > >
> > > > > > >
> > > > > > > On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <
> > > > matth...@confluent.io
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > I cannot spot any obvious reasons.
> > > > > > > >
> > > > > > > > As you consume from the result topic for verification, we should
> > > > > > > > verify that the latency spikes originate on write and not on read:
> > > > > > > > you might want to have a look into the Kafka Streams JMX metrics to
> > > > > > > > see if processing latency spikes or throughput drops.
> > > > > > > >
> > > > > > > > Also watch for GC pauses in the JVM.
> > > > > > > >
> > > > > > > > Hope this helps.
> > > > > > > >
> > > > > > > >
> > > > > > > > -Matthias
> > > > > > > >
> > > > > > > > On 8/17/18 12:13 PM, Nan Xu wrote:
> > > > > > > > > btw, I am using version 0.10.2.0
> > > > > > > > >
> > > > > > > > > On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <
> nanxu1...@gmail.com>
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > >> I am working on a Kafka Streams app and see huge latency variance,
> > > > > > > > > >> wondering what can cause this?
> > > > > > > > > >>
> > > > > > > > > >> The processing is very simple and has no state; linger.ms is
> > > > > > > > > >> already changed to 5ms. The message size is around 10K bytes,
> > > > > > > > > >> published at 2000 messages/s; the network is 10G. I am using a
> > > > > > > > > >> regular consumer to watch the localHistTopic topic and print a
> > > > > > > > > >> counter every 2000 messages. Usually I get a count of 2000 every
> > > > > > > > > >> second, matching the publish speed, but sometimes I see it stall
> > > > > > > > > >> for 3 or more seconds and then print out a few counts, as if the
> > > > > > > > > >> CPU is paused during that time or messages are being
> > > > > > > > > >> cached/batched and then processed.
> > > > > > > > > >> Any suggestions?
> > > > > > > > >>
> > > > > > > > >>   final Properties streamsConfiguration = new
> Properties();
> > > > > > > > >>
> > > > > > > > >>
> > > > > >  streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > > > > > > > >> applicationId);
> > > > > > > > >>
> > > > > > > > >>         streamsConfiguration.put(StreamsConfig.CLIENT_ID_
> > > CONFIG,
> > > > > > > > clientId);
> > > > > > > > >>
> > > > > > > > >>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_
> > > > > > > SERVERS_CONFIG,
> > > > > > > > >> bootstrapServers);
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_
> > > > > > > SERDE_CLASS_CONFIG,
> > > > > > > > >> Serdes.String()
> > > > > > > > >>
> > > > > > > > >>             .getClass().getName());
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > >  streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
> > > MS_CONFIG,
> > > > > > > > >> 10 * 1000);
> > > > > > > > >>
> > > > > > > > >> //
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > >
> > streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> > > > > > > 0);
> > > > > > > > >>
> > > > > > > > >>         streamsConfiguration.put(
> > > StreamsConfig.PRODUCER_PREFIX
> > > > > > > > >>
> > > > > > > > >>             + ProducerConfig.BATCH_SIZE_CONFIG,163840);
> > > > > > > > >>
> > > > > > > > >>         streamsConfiguration.put(
> > > StreamsConfig.PRODUCER_PREFIX
> > > > > > > > >>
> > > > > > > > >>             + ProducerConfig.BUFFER_MEMORY_CONFIG,
> > > 335544320);
> > > > > > > > >>
> > > > > > > > >>         streamsConfiguration.put(
> > > StreamsConfig.PRODUCER_PREFIX
> > > > > > > > >>
> > > > > > > > >>             + ProducerConfig.MAX_IN_FLIGHT_
> > > > > REQUESTS_PER_CONNECTION,
> > > > > > > 30);
> > > > > > > > >>
> > > > > > > > >>         streamsConfiguration.put(
> > > StreamsConfig.PRODUCER_PREFIX
> > > > > > > > >>
> > > > > > > > >>             + ProducerConfig.LINGER_MS_CONFIG,"5");
> > > > > > > > >>
> > > > > > > > >>         streamsConfiguration.put(
> > > StreamsConfig.consumerPrefix(
> > > > > > > > >>             ConsumerConfig.MAX_PARTITION_
> > > FETCH_BYTES_CONFIG),20
> > > > *
> > > > > > > 1024 *
> > > > > > > > >> 1024);MS_CONFIG, 10 * 1000);
> > > > > > > > >>
> > > > > > > > >> //
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > >
> > streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> > > > > > > 0);
> > > > > > > > >>
> > > > > > > > >>         streamsConfiguration.put(
> > > StreamsConfig.PRODUCER_PREFIX
> > > > > > > > >>
> > > > > > > > >>             + ProducerConfig.BATCH_SIZE_CONFIG,163840);
> > > > > > > > >>
> > > > > > > > >>         streamsConfiguration.put(
> > > StreamsConfig.PRODUCER_PREFIX
> > > > > > > > >>
> > > > > > > > >>             + ProducerConfig.BUFFER_MEMORY_CONFIG,
> > > 335544320);
> > > > > > > > >>
> > > > > > > > >>         streamsConfiguration.put(
> > > StreamsConfig.PRODUCER_PREFIX
> > > > > > > > >>
> > > > > > > > >>             + ProducerConfig.MAX_IN_FLIGHT_
> > > > > REQUESTS_PER_CONNECTION,
> > > > > > > 30);
> > > > > > > > >>
> > > > > > > > >>         streamsConfiguration.put(
> > > StreamsConfig.PRODUCER_PREFIX
> > > > > > > > >>
> > > > > > > > >>             + ProducerConfig.LINGER_MS_CONFIG,"5");
> > > > > > > > >>
> > > > > > > > >>         streamsConfiguration.put(
> > > StreamsConfig.consumerPrefix(
> > > > > > > > >>
> > > > > > > > >>             ConsumerConfig. MAX_PARTITION_FETCH_BYTES_
> CONFIG
> > > ,
> > > > > 20 *
> > > > > > > > 1024 *
> > > > > > > > >> 1024);
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >>  final StreamsBuilder builder = new StreamsBuilder();
> > > > > > > > >>
> > > > > > > > >>  final KStream<String, NodeMutation> localDeltaStream =
> > > > > > > builder.stream(
> > > > > > > > >>
> > > > > > > > >>             localDeltaTopic,
> > > > > > > > >>
> > > > > > > > >>             Consumed.with(
> > > > > > > > >>
> > > > > > > > >>                 new Serdes.StringSerde(),
> > > > > > > > >>
> > > > > > > > >>                 new NodeMutationSerde<>()
> > > > > > > > >>
> > > > > > > > >>             )
> > > > > > > > >>
> > > > > > > > >>         );
> > > > > > > > >>
> > > > > > > > >>   KStream<String, NodeState> localHistStream =
> > > > > > > > localDeltaStream.mapValues(
> > > > > > > > >>
> > > > > > > > >>             (mutation) -> NodeState
> > > > > > > > >>
> > > > > > > > >>                 .newBuilder()
> > > > > > > > >>
> > > > > > > > >>                 .setMeta(
> > > > > > > > >>
> > > > > > > > >>                     mutation.getMetaMutation().getMeta()
> > > > > > > > >>
> > > > > > > > >>                 )
> > > > > > > > >>
> > > > > > > > >>                 .setValue(
> > > > > > > > >>
> > > > > > > > >>                     mutation.getValueMutation().
> getValue()
> > > > > > > > >>
> > > > > > > > >>                 )
> > > > > > > > >>
> > > > > > > > >>                 .build()
> > > > > > > > >>
> > > > > > > > >>         );
> > > > > > > > >>
> > > > > > > > >>   localHistStream.to(
> > > > > > > > >>
> > > > > > > > >>             localHistTopic,
> > > > > > > > >>
> > > > > > > > >>             Produced.with(new Serdes.StringSerde(), new
> > > > > > > > NodeStateSerde<>())
> > > > > > > > >>
> > > > > > > > >>         );
> > > > > > > > >>
> > > > > > > > >>  streams = new KafkaStreams(builder.build(),
> > > > > streamsConfiguration);
> > > > > > > > >>
> > > > > > > > >>         streams.cleanUp();
> > > > > > > > >>
> > > > > > > > >> streams.start();
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
