Hi Caleb,

Could you share your Kafka Streams configuration (i.e., the StreamsConfig properties you might have set before the test)?
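
For reference, a minimal sketch of the kind of properties in question. It is an illustration only, not the configuration used in the test: the keys are standard Kafka Streams configuration names, written as plain strings so the snippet compiles without the Kafka jars on the classpath. The class name, the application id, the broker address and every value are placeholder assumptions.

```java
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper: collects the Streams settings most likely to affect
// throughput so they can be pasted into a reply. All values are placeholders.
public class StreamsConfigSketch {

    public static Properties exampleStreamsProperties() {
        Properties props = new Properties();
        // Identity and connectivity (assumed values, not taken from the thread).
        props.setProperty("application.id", "throughput-test");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Number of stream threads running the topology.
        props.setProperty("num.stream.threads", "1");
        // Time in milliseconds to block waiting for input in each poll.
        props.setProperty("poll.ms", "100");
        // How often, in milliseconds, processed offsets are committed.
        props.setProperty("commit.interval.ms", "30000");
        // Upper bound on records buffered per partition before fetching pauses.
        props.setProperty("buffered.records.per.partition", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = exampleStreamsProperties();
        // Print in sorted key order so the listing is stable between runs.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(key + "=" + props.getProperty(key));
        }
    }
}
```

Listing the effective values this way, including any left at their defaults, makes it easier to compare the Streams run against the plain consumer run.
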
Thanks
Eno

On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton <cwel...@apache.org> wrote:

> I have a question with respect to the KafkaStreams API.
>
> I noticed during my prototyping work that my KafkaStreams application was
> not able to keep up with the input on the stream, so I dug into it a bit and
> found that it was spending an inordinate amount of time in
> org.apache.kafka.common.network.Selector.select(). Not exactly a smoking
> gun in itself, so I dropped the implementation down to a single processor
> reading off a source.
>
> public class TestProcessor extends AbstractProcessor<String, String> {
>     static long start = -1;
>     static long count = 0;
>
>     @Override
>     public void process(String key, String value) {
>         if (start < 0) {
>             start = System.currentTimeMillis();
>         }
>         count += 1;
>         if (count > 1000000) {
>             long end = System.currentTimeMillis();
>             double time = (end-start)/1000.0;
>             System.out.printf("Processed %d records in %f seconds (%f records/s)\n", count, time, count/time);
>             start = -1;
>             count = 0;
>         }
>     }
> }
>
> ...
>
> TopologyBuilder topologyBuilder = new TopologyBuilder();
> topologyBuilder
>     .addSource("SOURCE", stringDeserializer, stringDeserializer, "input")
>     .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
>
> Which I then ran through the KafkaStreams API, and then repeated with
> the KafkaConsumer API.
>
> Using the KafkaConsumer API:
> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)
>
> Using the KafkaStreams API:
> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)
>
> The profile of the KafkaStreams run consisted of:
>
> 89.2% org.apache.kafka.common.network.Selector.select()
>  7.6% org.apache.kafka.clients.producer.internals.ProduceRequestResult.await()
>  0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()
>
> Is a 5X performance difference between the KafkaConsumer API and the
> KafkaStreams API expected?
>
> Are there specific things I can do to diagnose/tune the system?
>
> Thanks,
> Caleb