I am working on a Kafka Streams app and see huge latency variance.
What could cause this?

The processing is very simple and stateless, and linger.ms is already
set to 5 ms. Messages are around 10 KB each, published at 2000
messages/s, on a 10G network. I use a regular consumer to watch the
localHistTopic topic and print a counter every 2000 messages. Usually I
get a count of 2000 every second, matching the publish rate, but
sometimes it stalls for 3 or more seconds and then prints several counts
at once, as if the CPU were paused during that time or the messages were
being cached/batched and then processed together. Any suggestions?
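
To narrow down where a stall happens, the counting consumer could record
the wall-clock gap between consecutive batches of 2000 messages instead
of only printing a counter. The following is a minimal sketch of that
bookkeeping, not the consumer actually used here. The class name
BatchGapTracker, its methods, and the stall threshold are hypothetical
and chosen only for illustration. It is plain Java, so it could be called
from any poll loop.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: measures how long each batch of N messages took to arrive. */
final class BatchGapTracker {
    private final int batchSize;          // e.g. 2000, to match the publish rate
    private final long stallThresholdMs;  // gaps longer than this are recorded as stalls
    private final List<Long> stallsMs = new ArrayList<>();
    private long seen = 0;
    private long lastBatchEndMs = -1;     // -1 until the first message arrives

    BatchGapTracker(int batchSize, long stallThresholdMs) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.stallThresholdMs = stallThresholdMs;
    }

    /**
     * Call once per consumed record with the current wall-clock time.
     * Returns the elapsed milliseconds when this record completes a batch,
     * or -1 when the batch is still filling.
     */
    long onMessage(long nowMs) {
        if (lastBatchEndMs < 0) {
            lastBatchEndMs = nowMs;       // start the clock at the first message
        }
        seen++;
        if (seen % batchSize != 0) {
            return -1;
        }
        long elapsed = nowMs - lastBatchEndMs;
        lastBatchEndMs = nowMs;
        if (elapsed > stallThresholdMs) {
            stallsMs.add(elapsed);        // remember unusually slow batches
        }
        return elapsed;
    }

    /** Elapsed times (ms) of all batches that exceeded the threshold so far. */
    List<Long> stalls() {
        return stallsMs;
    }
}
```

In a poll loop one would call tracker.onMessage(System.currentTimeMillis())
for each record and log any returned value above the threshold. Comparing
those times with ConsumerRecord.timestamp() and with broker or GC logs may
help show whether the pause is on the producing, broker, or consuming side.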

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
            + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
        streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
            + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
        streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
            + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
        streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
            + ProducerConfig.LINGER_MS_CONFIG, "5");
        streamsConfiguration.put(StreamsConfig.consumerPrefix(
            ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);


        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, NodeMutation> localDeltaStream = builder.stream(
            localDeltaTopic,
            Consumed.with(
                new Serdes.StringSerde(),
                new NodeMutationSerde<>()
            )
        );

        KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
            (mutation) -> NodeState
                .newBuilder()
                .setMeta(
                    mutation.getMetaMutation().getMeta()
                )
                .setValue(
                    mutation.getValueMutation().getValue()
                )
                .build()
        );

        localHistStream.to(
            localHistTopic,
            Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
        );

        streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.cleanUp();
        streams.start();
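
As a sanity check on the numbers above, the following sketch works out
the data rate implied by the message size and publish rate and relates it
to the configured batch.size, buffer.memory, and
max.partition.fetch.bytes. It assumes that 10 KB means 10 * 1024 bytes
and ignores record overhead and compression. The class and method names
are hypothetical. It is arithmetic only and does not by itself explain
the stalls.

```java
/** Hypothetical helper: back-of-the-envelope figures from the settings in this post. */
final class ThroughputMath {
    // Assumption: "around 10 KB" is taken as exactly 10 * 1024 bytes.
    static final long MESSAGE_BYTES = 10L * 1024;
    static final long MESSAGES_PER_SECOND = 2000;

    // Values copied from the configuration above.
    static final long BATCH_SIZE_BYTES = 163840;
    static final long BUFFER_MEMORY_BYTES = 335544320;
    static final long MAX_PARTITION_FETCH_BYTES = 20L * 1024 * 1024;

    /** Payload bytes produced per second. */
    static long bytesPerSecond() {
        return MESSAGE_BYTES * MESSAGES_PER_SECOND;
    }

    /** How many whole messages fit into one full producer batch. */
    static long messagesPerBatch() {
        return BATCH_SIZE_BYTES / MESSAGE_BYTES;
    }

    /** Seconds of traffic that a buffer of the given size can hold. */
    static double secondsOfTraffic(long bufferBytes) {
        return (double) bufferBytes / bytesPerSecond();
    }

    /** Convenience summary of the derived figures. */
    static String summary() {
        return "bytes/s=" + bytesPerSecond()
            + ", messages/batch=" + messagesPerBatch()
            + ", fetch holds s=" + secondsOfTraffic(MAX_PARTITION_FETCH_BYTES)
            + ", producer buffer holds s=" + secondsOfTraffic(BUFFER_MEMORY_BYTES);
    }
}
```

Under these assumptions the stream carries about 20 MB/s, which is a
small fraction of a 10G link, a full producer batch holds 16 messages,
and a single maximum-size partition fetch corresponds to roughly one
second of traffic.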
