Hi Ara,

For parallelism you should have as many Kafka Streams instances as partitions. The instances can run on different servers (each instance can have one thread). On a single server, you can either start multiple single-threaded instances, or a single instance with multiple threads.
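As a rough illustration (not from the original thread): if the input topic has four partitions, one instance could cover all four tasks with a config sketch like the following. The broker address and thread count are assumptions for the example; `application.id` matches the config quoted later in this thread.

```
# Hypothetical StreamsConfig sketch, assuming an input topic with 4 partitions.
application.id=test-prototype
bootstrap.servers=broker1:9092,broker2:9092
# One instance with 4 threads covers all 4 partitions/tasks.
num.stream.threads=4
```

Equivalently, starting four single-threaded instances (the default is num.stream.threads=1), on the same server or spread across servers, gives the same total parallelism.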
There is some more info at http://docs.confluent.io/3.0.1/streams/architecture.html#parallelism-model and also a blog post at http://www.confluent.io/blog/elastic-scaling-in-kafka-streams/

Thanks
Eno

> On 10 Sep 2016, at 22:01, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
> Hi Eno,
>
> Could you elaborate more on tuning Kafka Streams applications? What are the
> relationships between partitions, num.stream.threads, num.consumer.fetchers,
> and other such parameters? On a single-node setup with x partitions, what's
> the best way to make sure these partitions are consumed and processed by
> Kafka Streams optimally?
>
> Ara.
>
>> On Sep 10, 2016, at 3:18 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>>
>> Hi Caleb,
>>
>> We have a benchmark that we run nightly to keep track of performance. The
>> numbers we have do indicate that consuming through Streams is indeed slower
>> than a pure consumer, but the performance difference is not as large as you
>> are observing. Would it be possible for you to run this benchmark in your
>> environment and report the numbers? The benchmark is included with Kafka
>> Streams and you run it like this:
>>
>> export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh org.apache.kafka.streams.perf.SimpleBenchmark
>>
>> You'll need a Kafka broker to be running. The benchmark will report various
>> numbers, among them the performance of the plain consumer and the
>> performance of Streams reading.
>>
>> Thanks
>> Eno
>>
>>> On 9 Sep 2016, at 18:19, Caleb Welton <cewel...@yahoo.com.INVALID> wrote:
>>>
>>> Same in both cases:
>>> client.id=Test-Prototype
>>> application.id=test-prototype
>>>
>>> group.id=test-consumer-group
>>> bootstrap.servers=broker1:9092,broker2:9092
>>> zookeeper.connect=zk1:2181
>>> replication.factor=2
>>>
>>> auto.offset.reset=earliest
>>>
>>>
>>> On Friday, September 9, 2016 8:48 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>
>>> Hi Caleb,
>>>
>>> Could you share your Kafka Streams configuration (i.e., StreamsConfig
>>> properties you might have set before the test)?
>>>
>>> Thanks
>>> Eno
>>>
>>>
>>> On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton <cwel...@apache.org> wrote:
>>>
>>>> I have a question with respect to the KafkaStreams API.
>>>>
>>>> I noticed during my prototyping work that my KafkaStreams application was
>>>> not able to keep up with the input on the stream, so I dug into it a bit
>>>> and found that it was spending an inordinate amount of time in
>>>> org.apache.kafka.common.network.Selector.select(). Not exactly a smoking
>>>> gun by itself, so I dropped the implementation down to a single processor
>>>> reading off a source.
>>>>
>>>> public class TestProcessor extends AbstractProcessor<String, String> {
>>>>     static long start = -1;
>>>>     static long count = 0;
>>>>
>>>>     @Override
>>>>     public void process(String key, String value) {
>>>>         if (start < 0) {
>>>>             start = System.currentTimeMillis();
>>>>         }
>>>>         count += 1;
>>>>         if (count > 1000000) {
>>>>             long end = System.currentTimeMillis();
>>>>             double time = (end - start) / 1000.0;
>>>>             System.out.printf("Processed %d records in %f seconds (%f records/s)\n",
>>>>                     count, time, count / time);
>>>>             start = -1;
>>>>             count = 0;
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> ...
>>>>
>>>> TopologyBuilder topologyBuilder = new TopologyBuilder();
>>>> topologyBuilder
>>>>     .addSource("SOURCE", stringDeserializer, stringDeserializer, "input")
>>>>     .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
>>>>
>>>>
>>>> Which I then ran through the KafkaStreams API, and then repeated with
>>>> the KafkaConsumer API.
>>>>
>>>> Using the KafkaConsumer API:
>>>> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
>>>> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
>>>> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
>>>> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)
>>>>
>>>> Using the KafkaStreams API:
>>>> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
>>>> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
>>>> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
>>>> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)
>>>>
>>>>
>>>> The profile on the KafkaStreams run consisted of:
>>>>
>>>> 89.2% org.apache.kafka.common.network.Selector.select()
>>>>  7.6% org.apache.kafka.clients.producer.internals.ProduceRequestResult.await()
>>>>  0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()
>>>>
>>>>
>>>> Is a 5X performance difference between the KafkaConsumer and the
>>>> KafkaStreams API expected?
>>>>
>>>> Are there specific things I can do to diagnose/tune the system?
>>>>
>>>> Thanks,
>>>> Caleb
>>
>>
>> ________________________________
>>
>> This message is for the designated recipient only and may contain
>> privileged, proprietary, or otherwise confidential information. If you have
>> received it in error, please notify the sender immediately and delete the
>> original. Any other use of the e-mail by you is prohibited. Thank you in
>> advance for your cooperation.
>>
>> ________________________________