Hi Ara,

For full parallelism you want as many Kafka Streams threads, summed across all 
instances, as there are input partitions. The instances can run on different 
servers (each instance with a single thread). On a single server, you can 
instead either start multiple single-threaded instances, or a single instance 
with multiple threads (num.stream.threads).
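
As a rough sketch (the application id, broker list, and thread count below are 
just placeholders, not values from your setup), the per-instance thread count 
is set via num.stream.threads in the properties you pass to KafkaStreams:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
    // Default is 1 thread per instance; keep the total number of threads
    // across all instances at or below the number of input partitions.
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

    TopologyBuilder builder = new TopologyBuilder();
    // ... addSource()/addProcessor() wiring goes here ...

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

Partitions of the input topics are then spread across all running threads, and 
Streams rebalances them automatically as instances come and go.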

There is some more info at 
http://docs.confluent.io/3.0.1/streams/architecture.html#parallelism-model 
and also a blog post at 
http://www.confluent.io/blog/elastic-scaling-in-kafka-streams/

Thanks
Eno


> On 10 Sep 2016, at 22:01, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
> 
> Hi Eno,
> 
> Could you elaborate more on tuning Kafka Streams applications? What is the 
> relationship between partitions, num.stream.threads, num.consumer.fetchers, 
> and other such parameters? On a single-node setup with x partitions, what’s 
> the best way to make sure these partitions are consumed and processed by 
> Kafka Streams optimally?
> 
> Ara.
> 
>> On Sep 10, 2016, at 3:18 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>> 
>> Hi Caleb,
>> 
>> We have a benchmark that we run nightly to keep track of performance. The 
>> numbers we have do indicate that consuming through Streams is indeed slower 
>> than a pure consumer; however, the performance difference is not as large as 
>> you are observing. Would it be possible for you to run this benchmark in your 
>> environment and report the numbers? The benchmark is included with Kafka 
>> Streams and you run it like this:
>> 
>> export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh org.apache.kafka.streams.perf.SimpleBenchmark
>> 
>> You'll need a Kafka broker to be running. The benchmark reports several 
>> numbers; among them are the throughput of a plain consumer and the throughput 
>> of Streams reading the same data.
>> 
>> Thanks
>> Eno
>> 
>>> On 9 Sep 2016, at 18:19, Caleb Welton <cewel...@yahoo.com.INVALID> wrote:
>>> 
>>> Same in both cases:
>>> client.id=Test-Prototype
>>> application.id=test-prototype
>>> 
>>> group.id=test-consumer-group
>>> bootstrap.servers=broker1:9092,broker2:9092
>>> zookeeper.connect=zk1:2181
>>> replication.factor=2
>>> 
>>> auto.offset.reset=earliest
>>> 
>>> 
>>> On Friday, September 9, 2016 8:48 AM, Eno Thereska <eno.there...@gmail.com> 
>>> wrote:
>>> 
>>> 
>>> 
>>> Hi Caleb,
>>> 
>>> Could you share your Kafka Streams configuration (i.e., StreamsConfig
>>> properties you might have set before the test)?
>>> 
>>> Thanks
>>> Eno
>>> 
>>> 
>>> On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton <cwel...@apache.org> wrote:
>>> 
>>>> I have a question with respect to the KafkaStreams API.
>>>> 
>>>> I noticed during my prototyping work that my KafkaStreams application was
>>>> not able to keep up with the input on the stream so I dug into it a bit and
>>>> found that it was spending an inordinate amount of time in
>>>> org.apache.kafka.common.network.Selector.select().  Not exactly a smoking
>>>> gun by itself, so I dropped the implementation down to a single processor
>>>> reading off a source.
>>>> 
>>>> import org.apache.kafka.streams.processor.AbstractProcessor;
>>>> 
>>>> public class TestProcessor extends AbstractProcessor<String, String> {
>>>>  static long start = -1;
>>>>  static long count = 0;
>>>> 
>>>>  @Override
>>>>  public void process(String key, String value) {
>>>>      if (start < 0) {
>>>>          start = System.currentTimeMillis();
>>>>      }
>>>>      count += 1;
>>>>      if (count > 1000000) {
>>>>          long end = System.currentTimeMillis();
>>>>          double time = (end-start)/1000.0;
>>>>          System.out.printf("Processed %d records in %f seconds (%f records/s)\n",
>>>>                  count, time, count/time);
>>>>          start = -1;
>>>>          count = 0;
>>>>      }
>>>> 
>>>>  }
>>>> 
>>>> }
>>>> 
>>>> ...
>>>> 
>>>> 
>>>> TopologyBuilder topologyBuilder = new TopologyBuilder();
>>>> topologyBuilder
>>>>      .addSource("SOURCE", stringDeserializer, stringDeserializer, "input")
>>>>      .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
>>>> 
>>>> 
>>>> 
>>>> Which I then ran through the KafkaStreams API, and then repeated with
>>>> the KafkaConsumer API.
>>>> 
>>>> Using the KafkaConsumer API:
>>>> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
>>>> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
>>>> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
>>>> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)
>>>> 
>>>> Using the KafkaStreams API:
>>>> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
>>>> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
>>>> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
>>>> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)
>>>> 
>>>> 
>>>> The profile on the KafkaStreams consisted of:
>>>> 
>>>> 89.2% org.apache.kafka.common.network.Selector.select()
>>>> 7.6% org.apache.kafka.clients.producer.internals.ProduceRequestResult.await()
>>>> 0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()
>>>> 
>>>> 
>>>> Is a 5X performance difference between the KafkaConsumer and the
>>>> KafkaStreams APIs expected?
>>>> 
>>>> Are there specific things I can do to diagnose/tune the system?
>>>> 
>>>> Thanks,
>>>> Caleb
>>>> 
>> 
> 
