Re: Dropped messages in kstreams?
Short answer seems to be that my Kafka LogRetentionTime was such that the
metrics I was writing were getting purged from Kafka during the test.
Dropped metrics.
Re: Dropped messages in kstreams?
I have encapsulated the repro into a small self contained project:
https://github.com/cwelton/kstreams-repro

Thanks,
Caleb
Re: Dropped messages in kstreams?
I do have a TimestampExtractor set up, and for the 10 second windows that
are emitted all the values expected in those windows are present, e.g. each
10 second window gets 100 values aggregated into it.

I have no metrics with null keys or values.

I will try to get the entire reproduction case packaged up in a way that I
can more easily share.

On Thu, Jun 15, 2017 at 11:18 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> Another thing to consider: do you have records with null key or value?
> Those records would be dropped and not processed.
>
> -Matthias
>
> On 6/15/17 6:24 AM, Garrett Barton wrote:
> > Is your time usage correct? It sounds like you want event time, not
> > load/process time, which is the default unless you have a
> > TimestampExtractor defined somewhere upstream. Otherwise I could see far
> > fewer events coming out, as streams is just aggregating whatever showed
> > up in that 10 second window.
Re: Dropped messages in kstreams?
Disabling the cache with:

```
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0)
```

Results in:
- Emitting many more intermediate calculations.
- Still losing data.

In my test case it output 342476 intermediate calculations for 3414
distinct windows; 14400 distinct windows were expected.

Regards,
Caleb

On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> This seems to be related to internal KTable caches. You can disable them
> by setting the cache size to zero.
>
> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>
> -Matthias
Re: Dropped messages in kstreams?
Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
problem does not manifest; at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
or higher the problem shows up.

When the number of threads is 1 the speed of data through the first part of
the topology (before the ktable) slows down considerably, but it seems to
slow down to the speed of the output, which may be the key.

That said... changing the number of stream threads should not impact data
correctness. Seems like a bug someplace in kafka.
Dropped messages in kstreams?
I have a topology of
KStream -> KTable -> KStream

```

final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
final KTable<Windowed, MyThing> myTable = metricStream
    .groupByKey(stringSerde, mySerde)
    .reduce(MyThing::merge,
            TimeWindows.of(1).advanceBy(1).until(Duration.ofDays(retentionDays).toMillis()),
            tableTopic);

myTable.toStream()
    .map((key, value) -> { return KeyValue.pair(key.key(), value.finalize(key.window())); })
    .to(stringSerde, mySerde, sinkTopic);

```

Normally, when sent data at 10x a second, I expect ~1 output metric for every 100 metrics it receives, based on the 10 second window width.

When fed data real time at that rate it seems to do just that.

However, when I either reprocess on an input topic with a large amount of data or feed data in significantly faster, I see a very different behavior.

Over the course of 20 seconds I can see 1,440,000 messages being ingested into the ktable, but only 633 emitted from it (expected 14400).

Over the next minute the records output creeps to 1796, but then holds steady and does not keep going up to the expected total of 14400.

A consumer reading from the sinkTopic ends up finding about 1264, which is lower than the 1796 records I would have anticipated from the number of calls into the final kstream map function.

Precise number of emitted records will vary from one run to the next.

Where are the extra metrics going? Is there some commit issue that is causing dropped messages if the ktable producer isn't able to keep up?

Any recommendations on where to focus the investigation of the issue?

Running Kafka 0.10.2.1.

Thanks,
Caleb
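The expected counts above follow from simple arithmetic over the window width; a quick sketch using the rates and totals stated in the message (10 metrics/second, 10 second windows, 1,440,000 messages replayed):

```java
public class ExpectedWindows {
    // Number of distinct windows expected for a given input rate,
    // window width, and total message count.
    static long expectedWindows(long ratePerSecond, long windowSeconds, long totalMessages) {
        long messagesPerWindow = ratePerSecond * windowSeconds; // 10 * 10 = 100
        return totalMessages / messagesPerWindow;
    }

    public static void main(String[] args) {
        System.out.println(expectedWindows(10, 10, 1_440_000)); // 14400
    }
}
```

So 14400 is simply 1,440,000 messages divided by 100 messages per window, which is why anything well below that indicates dropped windows rather than expected aggregation.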
Re: Brokers is down by “java.io.IOException: Too many open files”
You need to up your OS open file limits, something like this should work:

```
# /etc/security/limits.conf
*    -    nofile    65536
```

On Fri, May 12, 2017 at 6:34 PM, Yang Cui wrote:
> Our Kafka cluster is broken down by the problem "java.io.IOException: Too
> many open files" three times in 3 weeks.
>
> We encounter this problem on both the 0.9.0.1 and 0.10.2.1 versions.
>
> The error is like:
>
> java.io.IOException: Too many open files
>         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
>         at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
>         at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
>         at kafka.network.Acceptor.accept(SocketServer.scala:340)
>         at kafka.network.Acceptor.run(SocketServer.scala:283)
>         at java.lang.Thread.run(Thread.java:745)
>
> Is anyone encountering a similar problem?
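Before raising the limit it is worth confirming that the broker is actually near it. A sketch, assuming a Linux host with /proc and that you substitute the broker's real pid for `$BROKER_PID`:

```shell
#!/bin/sh
# Soft limit on open files for the current shell
ulimit -n

# For a running broker process (replace $BROKER_PID with the actual pid):
# grep 'open files' /proc/$BROKER_PID/limits   # effective limit for that process
# ls /proc/$BROKER_PID/fd | wc -l              # descriptors currently open
```

If the open-descriptor count sits close to the limit, raising `nofile` (and restarting the broker so it picks up the new limit) is the fix; if not, the leak is elsewhere.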
Using the vagrant image on aws
Hello,

I'm having trouble getting `vagrant/vagrant-up.sh --aws` to work properly.

The issue I'm having is as follows:
1. The vagrant install and provisioning complete successfully.
2. ssh into the cluster locally works, and running from there works fine.
3. Connecting to the cluster can be made to work if I manually edit my /etc/hosts file, e.g.

```
bash$ kafka-console-producer --topic test --broker-list broker1:9092
```

Works if `broker1` is registered in my local /etc/hosts file, but not otherwise. This results from the fact that the advertised listener in zookeeper is registered as 'broker1' rather than as the external facing ip name of the node in aws.

Is there any way to configure the vagrant installation such that the broker gets advertised in zookeeper with the fqdn of the externally facing address of the aws node, rather than an unqualified 'broker1' that then requires local aliasing in /etc/hosts?

For reference, here is my Vagrantfile.local

```
enable_dns = true
enable_hostmanager = true

# General configuration
num_zookeepers = 1
num_brokers = 3
num_workers = 0

# Items for deployment into aws
ec2_region = "us-west-2"
ec2_instance_type = "m3.medium"
ec2_associate_public_ip = true
ec2_subnet_id = "subnet-11447875"
ec2_security_groups = ["sg-c1dd7eb8"]  # "vagrant-kafka"
ec2_keypair_name = "kafka"
ec2_keypair_file = "/Users/cwelton/.aws/kafka.pem"

# --- Ubuntu AMI
ec2_user = "ubuntu"
ec2_ami = "ami-29ebb519"
```

Thanks,
Caleb
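For context, the broker setting that controls the name registered in ZooKeeper is `advertised.listeners` in server.properties. A hedged sketch of what the per-broker override would look like; the hostname below is a placeholder, not a real address:

```
# server.properties -- advertise the externally resolvable AWS name
# instead of the unqualified 'broker1'
advertised.listeners=PLAINTEXT://ec2-XX-XX-XX-XX.us-west-2.compute.amazonaws.com:9092
```

Whether the stock Vagrant provisioning scripts expose a knob for this, or whether the generated server.properties has to be patched after provisioning, depends on the version of the scripts in use.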
Re: Performance issue with KafkaStreams
Is there a specific way that I need to build kafka for that to work?

```
bash$ export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh org.apache.kafka.streams.perf.SimpleBenchmark
Error: Could not find or load main class org.apache.kafka.streams.perf.SimpleBenchmark

bash$ find . -name SimpleBenchmark.java
./streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java

bash$ find . -name SimpleBenchmark.class

bash$ jar -tf streams/build/libs/kafka-streams-0.10.1.0-SNAPSHOT.jar | grep SimpleBenchmark
```

On Sat, Sep 10, 2016 at 6:18 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> Hi Caleb,
>
> We have a benchmark that we run nightly to keep track of performance. The
> numbers we have do indicate that consuming through streams is indeed slower
> than just a pure consumer, however the performance difference is not as
> large as you are observing. Would it be possible for you to run this
> benchmark in your environment and report the numbers? The benchmark is
> included with Kafka Streams and you run it like this:
>
> export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh org.apache.kafka.streams.perf.SimpleBenchmark
>
> You'll need a Kafka broker to be running. The benchmark will report
> various numbers, but one of them is the performance of the consumer and the
> performance of streams reading.
>
> Thanks
> Eno
Re: Performance issue with KafkaStreams
Same in both cases:

```
client.id=Test-Prototype
application.id=test-prototype
group.id=test-consumer-group
bootstrap.servers=broker1:9092,broker2:9092
zookeeper.connect=zk1:2181
replication.factor=2
auto.offset.reset=earliest
```

On Friday, September 9, 2016 8:48 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> Hi Caleb,
>
> Could you share your Kafka Streams configuration (i.e., StreamsConfig
> properties you might have set before the test)?
>
> Thanks
> Eno
Performance issue with KafkaStreams
I have a question with respect to the KafkaStreams API.

I noticed during my prototyping work that my KafkaStreams application was not able to keep up with the input on the stream, so I dug into it a bit and found that it was spending an inordinate amount of time in org.apache.kafka.common.network.Selector.select(). Not exactly a smoking gun itself, so I dropped the implementation down to a single processor reading off a source.

```
public class TestProcessor extends AbstractProcessor<String, String> {
    static long start = -1;
    static long count = 0;

    @Override
    public void process(String key, String value) {
        if (start < 0) {
            start = System.currentTimeMillis();
        }
        count += 1;
        if (count > 1000000) {
            long end = System.currentTimeMillis();
            double time = (end - start) / 1000.0;
            System.out.printf("Processed %d records in %f seconds (%f records/s)\n",
                              count, time, count / time);
            start = -1;
            count = 0;
        }
    }
}

...

TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder
    .addSource("SOURCE", stringDeserializer, stringDeserializer, "input")
    .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
```

Which I then ran through the KafkaStreams API, and then repeated with the KafkaConsumer API.

Using the KafkaConsumer API:
Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)

Using the KafkaStreams API:
Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)

The profile on the KafkaStreams run consisted of:

89.2% org.apache.kafka.common.network.Selector.select()
 7.6% org.apache.kafka.clients.producer.internals.ProduceRequestResult.await()
 0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()

Is a 5X performance difference between the KafkaConsumer and KafkaStreams APIs expected?

Are there specific things I can do to diagnose/tune the system?

Thanks,
Caleb
Joining Streams with Kafka Streams
Hello,

I'm trying to understand best practices related to joining streams using the Kafka Streams API.

I can configure the topology such that two sources feed into a single processor:

```
topologyBuilder
    .addSource("A", stringDeserializer, itemDeserializer, "a-topic")
    .addSource("B", stringDeserializer, itemDeserializer, "b-topic")
    .addProcessor("hello-join", HelloJoin::new, "A", "B")...
```

And within my processor I can determine which topic a given message came from:

```
public void process(String key, String value) {
    if (context.topic().equals("a-topic")) {
        ...
    } else {
        ...
    }
}
```

This allows for a crude form of cross stream join with the following issues/limitations:

i. A string compare on topic name to decide which stream a message came from. Having actual access to the TopicPartition could lead to more efficient validation. Priority low, as this is just a small performance hit, but it is a per message performance hit, so it would be nice to eliminate.

ii. This requires "a-topic" and "b-topic" to have the same message format, which for general join handling is a pretty big limitation. What would be the recommended way to handle the case of different message formats, e.g. needing different deserializers for different input topics?

E.g. how would I define my Processor if the topology was:

```
topologyBuilder
    .addSource("A", stringDeserializer, itemADeserializer, "a-topic")
    .addSource("B", stringDeserializer, itemBDeserializer, "b-topic")
    .addProcessor("hello-join", HelloJoin::new, "A", "B")...
```

where itemADeserializer and itemBDeserializer return different classes?

Thanks,
Caleb
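One way to sidestep limitation (ii) is to consume both topics with a pass-through deserializer and pick the real deserializer per topic inside the processor. A standalone sketch of that dispatch pattern, written without the Kafka classes so it runs on its own (the topic names match the question; the per-topic decode functions here are hypothetical stand-ins for itemADeserializer/itemBDeserializer):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Dispatches raw per-topic payloads to topic-specific deserializers,
// mimicking a Processor that consults context.topic() for each record.
public class TopicDispatcher {
    private final Map<String, Function<String, Object>> deserializers = new HashMap<>();

    public void register(String topic, Function<String, Object> deserializer) {
        deserializers.put(topic, deserializer);
    }

    // Stand-in for process(key, value): one map lookup replaces
    // per-record string comparisons against each topic name.
    public Object process(String topic, String rawValue) {
        Function<String, Object> d = deserializers.get(topic);
        if (d == null) {
            throw new IllegalArgumentException("no deserializer for topic " + topic);
        }
        return d.apply(rawValue);
    }

    public static void main(String[] args) {
        TopicDispatcher dispatcher = new TopicDispatcher();
        // Hypothetical decoders standing in for ItemA/ItemB deserialization.
        dispatcher.register("a-topic", raw -> "ItemA:" + raw);
        dispatcher.register("b-topic", raw -> raw.length());

        System.out.println(dispatcher.process("a-topic", "x"));   // ItemA:x
        System.out.println(dispatcher.process("b-topic", "xyz")); // 3
    }
}
```

In a real topology the two `addSource` calls would use a byte-array (or string) value deserializer, and the processor would apply the looked-up deserializer before the join logic; the map lookup also addresses limitation (i), since it avoids a chain of topic-name comparisons per record.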