Hello Nan,

Note that Streams may need some time to rebalance and assign tasks, even if
you start with only one instance.

I'd suggest you register a state listener in Kafka Streams via
KafkaStreams#setStateListener; your customized StateListener should
record when the state transitions from REBALANCING to RUNNING, since only
after that will the Streams client start to process the first record.
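A minimal sketch of such a listener (the class name, the latch-based wait, and the helper methods are my own illustration, not from this thread) could look like this:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.KafkaStreams;

// Hypothetical example: record the moment the client first transitions
// from REBALANCING to RUNNING, i.e. when it can begin processing records.
public class StartupListener implements KafkaStreams.StateListener {

    private final CountDownLatch running = new CountDownLatch(1);
    private volatile long runningAtMs = -1L;

    @Override
    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
        if (oldState == KafkaStreams.State.REBALANCING
                && newState == KafkaStreams.State.RUNNING) {
            runningAtMs = System.currentTimeMillis();
            running.countDown();
        }
    }

    // Block until the first REBALANCING -> RUNNING transition, or time out.
    public boolean awaitRunning(long timeout, TimeUnit unit) throws InterruptedException {
        return running.await(timeout, unit);
    }

    // -1 until the transition has been observed.
    public long runningAtMs() {
        return runningAtMs;
    }
}
```

You would register it before starting the client (streams.setStateListener(listener); streams.start();) and begin any latency measurement only after awaitRunning(...) returns true.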


Guozhang


On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com> wrote:

> thanks, which JMX metrics indicate "processing latency spikes" /
> "throughput"?
>
>
> On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > I cannot spot any obvious reasons.
> >
> > As you consume from the result topic for verification, we should verify
> > that the latency spikes originate on write and not on read: you might
> > want to have a look at the Kafka Streams JMX metrics to see if processing
> > latency spikes or throughput drops.
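A sketch of inspecting these metrics programmatically, rather than through a JMX console (the class is my own illustration; exact metric names vary across Streams versions, so it filters on "latency" and "rate" rather than assuming specific names):

```java
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

// Hypothetical helper: print the Streams client's latency/throughput
// metrics so spikes or drops can be spotted over time.
public final class MetricsDump {

    private MetricsDump() { }

    // True for metric names worth watching for this problem.
    public static boolean interesting(String metricName) {
        return metricName.contains("latency") || metricName.contains("rate");
    }

    // Dump the matching metrics of a running KafkaStreams instance.
    public static void dump(KafkaStreams streams) {
        for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if (interesting(name.name())) {
                System.out.println(name.group() + "/" + name.name()
                        + " = " + entry.getValue().value());
            }
        }
    }
}
```

Calling dump(streams) periodically (e.g. once a second) while the verification consumer runs would show whether a consumer-side stall coincides with a spike in the Streams client's own processing latency.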
> >
> > Also watch for GC pauses in the JVM.
> >
> > Hope this helps.
> >
> >
> > -Matthias
> >
> > On 8/17/18 12:13 PM, Nan Xu wrote:
> > > btw, I am using version 0.10.2.0
> > >
> > > On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <nanxu1...@gmail.com> wrote:
> > >
> > >> I am working on a Kafka Streams app and see huge latency variance;
> > >> wondering what can cause this?
> > >>
> > >> The processing is very simple and doesn't have state; linger.ms is
> > >> already changed to 5 ms. The message size is around 10K bytes, published
> > >> at 2000 messages/s, and the network is 10G. Using a regular consumer to
> > >> watch the localHistTopic topic and print a counter every 2000 messages,
> > >> I usually get a count of 2000 every second, matching the publish rate,
> > >> but sometimes it stalls for 3 or more seconds and then prints several
> > >> counts at once, as if the CPU were paused during that time or the
> > >> messages were cached/batched and then processed. Any suggestion?
> > >>
> > >>         final Properties streamsConfiguration = new Properties();
> > >>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> > >>         streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
> > >>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> > >>         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> > >>             Serdes.String().getClass().getName());
> > >>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
> > >>         // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > >>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >>             + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
> > >>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >>             + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
> > >>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >>             + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> > >>         streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > >>             + ProducerConfig.LINGER_MS_CONFIG, "5");
> > >>         streamsConfiguration.put(StreamsConfig.consumerPrefix(
> > >>             ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
> > >>
> > >>
> > >>         final StreamsBuilder builder = new StreamsBuilder();
> > >>         final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> > >>             localDeltaTopic,
> > >>             Consumed.with(new Serdes.StringSerde(), new NodeMutationSerde<>()));
> > >>         KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> > >>             (mutation) -> NodeState
> > >>                 .newBuilder()
> > >>                 .setMeta(mutation.getMetaMutation().getMeta())
> > >>                 .setValue(mutation.getValueMutation().getValue())
> > >>                 .build());
> > >>         localHistStream.to(
> > >>             localHistTopic,
> > >>             Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>()));
> > >>         streams = new KafkaStreams(builder.build(), streamsConfiguration);
> > >>         streams.cleanUp();
> > >>         streams.start();
> > >>
> > >>
> > >>
> > >>
> > >>
> > >
> >
> >
>



-- 
-- Guozhang
