Same in both cases:
client.id=Test-Prototype
application.id=test-prototype

group.id=test-consumer-group
bootstrap.servers=broker1:9092,broker2:9092
zookeeper.connect=zk1:2181
replication.factor=2

auto.offset.reset=earliest
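
For reference, here is a minimal sketch of how the settings listed above might be assembled in Java. The class name StreamsProps and the build() helper are hypothetical, and plain string keys are used so the snippet needs nothing beyond the JDK. The same object could then be handed to the KafkaStreams or KafkaConsumer setup; that wiring is omitted here.

```java
import java.util.Properties;

public class StreamsProps {
    // Hypothetical helper: collects the settings from this message into a
    // java.util.Properties object. Keys and values are copied from the list above.
    public static Properties build() {
        Properties props = new Properties();
        props.put("client.id", "Test-Prototype");
        props.put("application.id", "test-prototype");
        props.put("group.id", "test-consumer-group");
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("zookeeper.connect", "zk1:2181");
        props.put("replication.factor", "2");
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Print each key=value pair (Properties does not guarantee order).
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```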


On Friday, September 9, 2016 8:48 AM, Eno Thereska <eno.there...@gmail.com> 
wrote:



Hi Caleb,

Could you share your Kafka Streams configuration (i.e., StreamsConfig
properties you might have set before the test)?

Thanks
Eno


On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton <cwel...@apache.org> wrote:

> I have a question with respect to the KafkaStreams API.
>
> I noticed during my prototyping work that my KafkaStreams application was
> not able to keep up with the input on the stream so I dug into it a bit and
> found that it was spending an inordinate amount of time in
> org.apache.kafka.common.network.Selector.select().  Not exactly a smoking
> gun itself, so I dropped the implementation down to a single processor
> reading off a source.
>
> public class TestProcessor extends AbstractProcessor<String, String> {
>     static long start = -1;
>     static long count = 0;
>
>     @Override
>     public void process(String key, String value) {
>         if (start < 0) {
>             start = System.currentTimeMillis();
>         }
>         count += 1;
>         if (count > 1000000) {
>             long end = System.currentTimeMillis();
>             double time = (end-start)/1000.0;
>             System.out.printf("Processed %d records in %f seconds (%f records/s)\n",
>                     count, time, count/time);
>             start = -1;
>             count = 0;
>         }
>
>     }
>
> }
>
> ...
>
>
> TopologyBuilder topologyBuilder = new TopologyBuilder();
> topologyBuilder
>         .addSource("SOURCE", stringDeserializer, stringDeserializer, "input")
>         .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
>
>
>
> Which I then ran through the KafkaStreams API, and then repeated with
> the KafkaConsumer API.
>
> Using the KafkaConsumer API:
> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)
>
> Using the KafkaStreams API:
> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)
>
>
> The profile on the KafkaStreams consisted of:
>
> 89.2% org.apache.kafka.common.network.Selector.select()
>  7.6% org.apache.kafka.clients.producer.internals.ProduceRequestResult.await()
>  0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()
>
>
> Is a 5X performance difference between the KafkaConsumer API and the
> KafkaStreams API expected?
>
> Are there specific things I can do to diagnose/tune the system?
>
> Thanks,
>   Caleb
>
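
The gap in the quoted figures can be checked with a few lines of arithmetic. The sketch below is only an illustration: the class name ThroughputRatio is hypothetical, and leaving out the first run of each test as warm-up is an assumption, not something stated in the thread.

```java
public class ThroughputRatio {
    // Arithmetic mean of the given values.
    static double mean(double[] xs) {
        double sum = 0;
        for (double x : xs) {
            sum += x;
        }
        return sum / xs.length;
    }

    // Ratio of mean consumer throughput to mean streams throughput.
    public static double ratio(double[] consumer, double[] streams) {
        return mean(consumer) / mean(streams);
    }

    public static void main(String[] args) {
        // Records/s from the last three runs of each test quoted above;
        // skipping the first run as warm-up is an assumption of this sketch.
        double[] consumer = {813670.463792, 904160.036166, 840336.974790};
        double[] streams = {190258.942161, 194514.880373, 195656.622970};
        System.out.printf("KafkaConsumer / KafkaStreams throughput: %.1fx%n",
                ratio(consumer, streams));
    }
}
```

On these numbers the steady-state ratio comes out between 4x and 5x, in line with the "5X" figure in the question.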
