Re: Dropped messages in kstreams?

2017-06-15 Thread Caleb Welton
Short answer seems to be that my Kafka log retention time was such that the
metrics I was writing were getting purged from Kafka during the test, hence
the dropped metrics.
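
For anyone who hits the same thing: the knobs in play are the broker-level
log.retention.* settings and the per-topic retention.ms override. A sketch of
raising retention with the 0.10.x tooling (the topic name here is a
placeholder for your actual ingest topic):

```
# Raise retention to 7 days (retention.ms is in milliseconds).
bin/kafka-configs.sh --zookeeper zk1:2181 --alter \
    --entity-type topics --entity-name metrics-ingest \
    --add-config retention.ms=604800000
```

The gotcha when replaying historic data is that records older than the
retention window can be purged while the test is still running, which looks
exactly like dropped messages downstream.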

On Thu, Jun 15, 2017 at 1:32 PM, Caleb Welton <ca...@autonomic.ai> wrote:

> I have encapsulated the repro into a small self-contained project:
> https://github.com/cwelton/kstreams-repro
>
> Thanks,
>   Caleb
>
>
> On Thu, Jun 15, 2017 at 11:30 AM, Caleb Welton <ca...@autonomic.ai> wrote:
>
>> I do have a TimestampExtractor set up, and for the 10 second windows that
>> are emitted, all the values expected in those windows are present; e.g.
>> each 10 second window gets 100 values aggregated into it.
>>
>> I have no metrics with null keys or values.
>>
>> I will try to get the entire reproduction case packaged up in a way that
>> I can more easily share.
>>
>>
>> On Thu, Jun 15, 2017 at 11:18 AM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Another thing to consider: do you have records with null key or value?
>>> Those records would be dropped and not processed.
>>>
>>> -Matthias
>>>
>>> On 6/15/17 6:24 AM, Garrett Barton wrote:
>>> > Is your time usage correct?  It sounds like you want event time, not
>>> > load/process time, which is the default unless you have a
>>> > TimestampExtractor defined somewhere upstream.  Otherwise I could see
>>> > far fewer events coming out, as streams is just aggregating whatever
>>> > showed up in that 10 second window.
>>> >
>>> > On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai>
>>> wrote:
>>> >
>>> >> Disabling the cache with:
>>> >>
>>> >> ```
>>> >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>>> >> 0)
>>> >> ```
>>> >>
>>> >> Results in:
>>> >> - Emitting many more intermediate calculations.
>>> >> - Still losing data.
>>> >>
>>> >> In my test case it output 342476 intermediate calculations across 3414
>>> >> distinct windows, where 14400 distinct windows were expected.
>>> >>
>>> >> Regards,
>>> >>   Caleb
>>> >>
>>> >> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io>
>>> >> wrote:
>>> >>
>>> >>> This seems to be related to internal KTable caches. You can disable
>>> them
>>> >>> by setting cache size to zero.
>>> >>>
>>> >>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>>> >>>
>>> >>> -Matthias
>>> >>>
>>> >>>
>>> >>>
>>> >>> On 6/14/17 4:08 PM, Caleb Welton wrote:
>>> >>>> Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then
>>> >>>> the problem does not manifest; at
>>> >>>> `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2` or higher the problem
>>> >>>> shows up.
>>> >>>>
>>> >>>> When the number of threads is 1, the speed of data through the
>>> >>>> first part of the topology (before the ktable) slows down
>>> >>>> considerably, but it seems to slow down to the speed of the output,
>>> >>>> which may be the key.
>>> >>>>
>>> >>>> That said, changing the number of stream threads should not impact
>>> >>>> data correctness.  Seems like a bug someplace in Kafka.
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
>>> >>> wrote:
>>> >>>>
>>> >>>>> I have a topology of
>>> >>>>> KStream -> KTable -> KStream
>>> >>>>>
>>> >>>>> ```
>>> >>>>>
>>> >>>>> final KStreamBuilder builder = new KStreamBuilder();
>>> >>>>> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
>>> >>>>> final KTable<Windowed, MyThing> myT

Re: Dropped messages in kstreams?

2017-06-15 Thread Caleb Welton
I have encapsulated the repro into a small self-contained project:
https://github.com/cwelton/kstreams-repro

Thanks,
  Caleb


On Thu, Jun 15, 2017 at 11:30 AM, Caleb Welton <ca...@autonomic.ai> wrote:

> I do have a TimestampExtractor set up, and for the 10 second windows that
> are emitted, all the values expected in those windows are present; e.g. each
> 10 second window gets 100 values aggregated into it.
>
> I have no metrics with null keys or values.
>
> I will try to get the entire reproduction case packaged up in a way that I
> can more easily share.
>
>
> On Thu, Jun 15, 2017 at 11:18 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Another thing to consider: do you have records with null key or value?
>> Those records would be dropped and not processed.
>>
>> -Matthias
>>
>> On 6/15/17 6:24 AM, Garrett Barton wrote:
>> > Is your time usage correct?  It sounds like you want event time, not
>> > load/process time, which is the default unless you have a
>> > TimestampExtractor defined somewhere upstream.  Otherwise I could see
>> > far fewer events coming out, as streams is just aggregating whatever
>> > showed up in that 10 second window.
>> >
>> > On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai>
>> wrote:
>> >
>> >> Disabling the cache with:
>> >>
>> >> ```
>> >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>> >> 0)
>> >> ```
>> >>
>> >> Results in:
>> >> - Emitting many more intermediate calculations.
>> >> - Still losing data.
>> >>
>> >> In my test case it output 342476 intermediate calculations across 3414
>> >> distinct windows, where 14400 distinct windows were expected.
>> >>
>> >> Regards,
>> >>   Caleb
>> >>
>> >> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io>
>> >> wrote:
>> >>
>> >>> This seems to be related to internal KTable caches. You can disable
>> them
>> >>> by setting cache size to zero.
>> >>>
>> >>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>> >>>
>> >>> -Matthias
>> >>>
>> >>>
>> >>>
>> >>> On 6/14/17 4:08 PM, Caleb Welton wrote:
>> >>>> Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then
>> >>>> the problem does not manifest; at
>> >>>> `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2` or higher the problem
>> >>>> shows up.
>> >>>>
>> >>>> When the number of threads is 1, the speed of data through the
>> >>>> first part of the topology (before the ktable) slows down
>> >>>> considerably, but it seems to slow down to the speed of the output,
>> >>>> which may be the key.
>> >>>>
>> >>>> That said, changing the number of stream threads should not impact
>> >>>> data correctness.  Seems like a bug someplace in Kafka.
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
>> >>> wrote:
>> >>>>
>> >>>>> I have a topology of
>> >>>>> KStream -> KTable -> KStream
>> >>>>>
>> >>>>> ```
>> >>>>>
>> >>>>> final KStreamBuilder builder = new KStreamBuilder();
>> >>>>> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
>> >>>>> final KTable<Windowed, MyThing> myTable = metricStream
>> >>>>>     .groupByKey(stringSerde, mySerde)
>> >>>>>     .reduce(MyThing::merge,
>> >>>>>         TimeWindows.of(10000).advanceBy(10000).until(Duration.ofDays(retentionDays).toMillis()),
>> >>>>>         tableTopic);
>> >>>>>
>> >>>>> myTable.toStream()
>> >>>>>     .map((key, value) -> { return (KeyValue.pair(key.key(), value.finalize(key.window()))); })
>> >>>>>     .to(stringSerde, my

Re: Dropped messages in kstreams?

2017-06-15 Thread Caleb Welton
I do have a TimestampExtractor set up, and for the 10 second windows that are
emitted, all the values expected in those windows are present; e.g. each 10
second window gets 100 values aggregated into it.

I have no metrics with null keys or values.

I will try to get the entire reproduction case packaged up in a way that I
can more easily share.
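
For context, the extractor is the usual event-time pattern, roughly the
following under the 0.10.2 API (the Metric cast and getTimestamp() accessor
are stand-ins for however the event time is carried in the payload):

```
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class MetricTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        // Prefer the event time embedded in the payload; fall back to the
        // record's own timestamp otherwise.
        if (record.value() instanceof Metric) {
            return ((Metric) record.value()).getTimestamp(); // stand-in accessor
        }
        return record.timestamp();
    }
}
```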


On Thu, Jun 15, 2017 at 11:18 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Another thing to consider: do you have records with null key or value?
> Those records would be dropped and not processed.
>
> -Matthias
>
> On 6/15/17 6:24 AM, Garrett Barton wrote:
> > Is your time usage correct?  It sounds like you want event time, not
> > load/process time, which is the default unless you have a
> > TimestampExtractor defined somewhere upstream.  Otherwise I could see
> > far fewer events coming out, as streams is just aggregating whatever
> > showed up in that 10 second window.
> >
> > On Wed, Jun 14, 2017 at 8:43 PM, Caleb Welton <ca...@autonomic.ai>
> wrote:
> >
> >> Disabling the cache with:
> >>
> >> ```
> >> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> >> 0)
> >> ```
> >>
> >> Results in:
> >> - Emitting many more intermediate calculations.
> >> - Still losing data.
> >>
> >> In my test case it output 342476 intermediate calculations across 3414
> >> distinct windows, where 14400 distinct windows were expected.
> >>
> >> Regards,
> >>   Caleb
> >>
> >> On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> This seems to be related to internal KTable caches. You can disable
> them
> >>> by setting cache size to zero.
> >>>
> >>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 6/14/17 4:08 PM, Caleb Welton wrote:
> >>>> Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then
> >>>> the problem does not manifest; at
> >>>> `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2` or higher the problem
> >>>> shows up.
> >>>>
> >>>> When the number of threads is 1, the speed of data through the
> >>>> first part of the topology (before the ktable) slows down
> >>>> considerably, but it seems to slow down to the speed of the output,
> >>>> which may be the key.
> >>>>
> >>>> That said, changing the number of stream threads should not impact
> >>>> data correctness.  Seems like a bug someplace in Kafka.
> >>>>
> >>>>
> >>>>
> >>>> On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
> >>> wrote:
> >>>>
> >>>>> I have a topology of
> >>>>> KStream -> KTable -> KStream
> >>>>>
> >>>>> ```
> >>>>>
> >>>>> final KStreamBuilder builder = new KStreamBuilder();
> >>>>> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
> >>>>> final KTable<Windowed, MyThing> myTable = metricStream
> >>>>>     .groupByKey(stringSerde, mySerde)
> >>>>>     .reduce(MyThing::merge,
> >>>>>         TimeWindows.of(10000).advanceBy(10000).until(Duration.ofDays(retentionDays).toMillis()),
> >>>>>         tableTopic);
> >>>>>
> >>>>> myTable.toStream()
> >>>>>     .map((key, value) -> { return (KeyValue.pair(key.key(), value.finalize(key.window()))); })
> >>>>>     .to(stringSerde, mySerde, sinkTopic);
> >>>>>
> >>>>> ```
> >>>>>
> >>>>>
> >>>>> Normally, when sending data at 10x a second, I expect ~1 output metric
> >>>>> for every 100 metrics it receives, based on the 10 second window width.
> >>>>>
> >>>>> When fed data real time at that rate it seems to do just that.
> >>>>>
> >>>>> However, when I either reprocess an input topic with a large amount
> >>>>> of data or feed data in significantly faster, I see a very different
> >>>>> behavior.
> >>>>>
> >>>>> Over the course of 20 seconds I can see 1,440,000 messages being
> >>>>> ingested into the ktable, but only 633 emitted from it (expected 14400).
> >>>>>
> >>>>> Over the next minute the record output creeps to 1796, but then holds
> >>>>> steady and does not keep going up to the expected total of 14400.
> >>>>>
> >>>>> A consumer reading from the sinkTopic ends up finding about 1264,
> >>>>> which is lower than the 1796 records I would have anticipated from
> >>>>> the number of calls into the final kstream map function.
> >>>>>
> >>>>> Precise number of emitted records will vary from one run to the next.
> >>>>>
> >>>>> Where are the extra metrics going?  Is there some commit issue that
> >>>>> is causing dropped messages if the ktable producer isn't able to
> >>>>> keep up?
> >>>>>
> >>>>> Any recommendations on where to focus the investigation of the issue?
> >>>>>
> >>>>> Running Kafka 0.10.2.1.
> >>>>>
> >>>>> Thanks,
> >>>>>   Caleb
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>


Re: Dropped messages in kstreams?

2017-06-14 Thread Caleb Welton
Disabling the cache with:

```
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0)
```

Results in:
- Emitting many more intermediate calculations.
- Still losing data.

In my test case it output 342476 intermediate calculations across 3414
distinct windows, where 14400 distinct windows were expected.
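
For reference, the two settings that interact here (values illustrative):

```
// Forward every aggregation update downstream instead of caching.
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// The commit interval also bounds how long results sit in the cache
// before being flushed, so it affects emission frequency as well.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
```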

Regards,
  Caleb

On Wed, Jun 14, 2017 at 5:13 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> This seems to be related to internal KTable caches. You can disable them
> by setting cache size to zero.
>
> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>
> -Matthias
>
>
>
> On 6/14/17 4:08 PM, Caleb Welton wrote:
> > Update: if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
> > problem does not manifest; at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
> > or higher the problem shows up.
> >
> > When the number of threads is 1, the speed of data through the first part
> > of the topology (before the ktable) slows down considerably, but it seems
> > to slow down to the speed of the output, which may be the key.
> >
> > That said, changing the number of stream threads should not impact data
> > correctness.  Seems like a bug someplace in Kafka.
> >
> >
> >
> > On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai>
> wrote:
> >
> >> I have a topology of
> >> KStream -> KTable -> KStream
> >>
> >> ```
> >>
> >> final KStreamBuilder builder = new KStreamBuilder();
> >> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
> >> final KTable<Windowed, MyThing> myTable = metricStream
> >>     .groupByKey(stringSerde, mySerde)
> >>     .reduce(MyThing::merge,
> >>         TimeWindows.of(10000).advanceBy(10000).until(Duration.ofDays(retentionDays).toMillis()),
> >>         tableTopic);
> >>
> >> myTable.toStream()
> >>     .map((key, value) -> { return (KeyValue.pair(key.key(), value.finalize(key.window()))); })
> >>     .to(stringSerde, mySerde, sinkTopic);
> >>
> >> ```
> >>
> >>
> >> Normally, when sending data at 10x a second, I expect ~1 output metric
> >> for every 100 metrics it receives, based on the 10 second window width.
> >>
> >> When fed data real time at that rate it seems to do just that.
> >>
> >> However, when I either reprocess an input topic with a large amount of
> >> data or feed data in significantly faster, I see a very different
> >> behavior.
> >>
> >> Over the course of 20 seconds I can see 1,440,000 messages being
> >> ingested into the ktable, but only 633 emitted from it (expected 14400).
> >>
> >> Over the next minute the record output creeps to 1796, but then holds
> >> steady and does not keep going up to the expected total of 14400.
> >>
> >> A consumer reading from the sinkTopic ends up finding about 1264, which
> >> is lower than the 1796 records I would have anticipated from the number
> >> of calls into the final kstream map function.
> >>
> >> Precise number of emitted records will vary from one run to the next.
> >>
> >> Where are the extra metrics going?  Is there some commit issue that is
> >> causing dropped messages if the ktable producer isn't able to keep up?
> >>
> >> Any recommendations on where to focus the investigation of the issue?
> >>
> >> Running Kafka 0.10.2.1.
> >>
> >> Thanks,
> >>   Caleb
> >>
> >
>
>


Re: Dropped messages in kstreams?

2017-06-14 Thread Caleb Welton
Update, if I set `StreamsConfig.NUM_STREAM_THREADS_CONFIG=1` then the
problem does not manifest, at `StreamsConfig.NUM_STREAM_THREADS_CONFIG=2`
or higher the problem shows up.

When the number of threads is 1, the speed of data through the first part of
the topology (before the ktable) slows down considerably, but it seems to
slow down to the speed of the output, which may be the key.

That said, changing the number of stream threads should not impact data
correctness.  Seems like a bug someplace in Kafka.
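
For anyone reproducing this, the only delta between the two runs is this
setting (shown for the single-threaded case):

```
// 1 thread: the problem does not manifest; 2 or more: windows go missing.
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
```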



On Wed, Jun 14, 2017 at 2:53 PM, Caleb Welton <ca...@autonomic.ai> wrote:

> I have a topology of
> KStream -> KTable -> KStream
>
> ```
>
> final KStreamBuilder builder = new KStreamBuilder();
> final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
> final KTable<Windowed, MyThing> myTable = metricStream
>     .groupByKey(stringSerde, mySerde)
>     .reduce(MyThing::merge,
>         TimeWindows.of(10000).advanceBy(10000).until(Duration.ofDays(retentionDays).toMillis()),
>         tableTopic);
>
> myTable.toStream()
>     .map((key, value) -> { return (KeyValue.pair(key.key(), value.finalize(key.window()))); })
>     .to(stringSerde, mySerde, sinkTopic);
>
> ```
>
>
> Normally, when sending data at 10x a second, I expect ~1 output metric for
> every 100 metrics it receives, based on the 10 second window width.
>
> When fed data real time at that rate it seems to do just that.
>
> However, when I either reprocess an input topic with a large amount of
> data or feed data in significantly faster, I see a very different behavior.
>
> Over the course of 20 seconds I can see 1,440,000 messages being ingested
> into the ktable, but only 633 emitted from it (expected 14400).
>
> Over the next minute the record output creeps to 1796, but then holds
> steady and does not keep going up to the expected total of 14400.
>
> A consumer reading from the sinkTopic ends up finding about 1264, which is
> lower than the 1796 records I would have anticipated from the number of
> calls into the final kstream map function.
>
> Precise number of emitted records will vary from one run to the next.
>
> Where are the extra metrics going?  Is there some commit issue that is
> causing dropped messages if the ktable producer isn't able to keep up?
>
> Any recommendations on where to focus the investigation of the issue?
>
> Running Kafka 0.10.2.1.
>
> Thanks,
>   Caleb
>


Dropped messages in kstreams?

2017-06-14 Thread Caleb Welton
I have a topology of
KStream -> KTable -> KStream

```

final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, Metric> metricStream = builder.stream(ingestTopic);
final KTable<Windowed, MyThing> myTable = metricStream
    .groupByKey(stringSerde, mySerde)
    .reduce(MyThing::merge,
        TimeWindows.of(10000).advanceBy(10000).until(Duration.ofDays(retentionDays).toMillis()),
        tableTopic);

myTable.toStream()
    .map((key, value) -> { return (KeyValue.pair(key.key(), value.finalize(key.window()))); })
    .to(stringSerde, mySerde, sinkTopic);

```


Normally, when sending data at 10x a second, I expect ~1 output metric for
every 100 metrics it receives, based on the 10 second window width.

When fed data real time at that rate it seems to do just that.

However, when I either reprocess an input topic with a large amount of
data or feed data in significantly faster, I see a very different behavior.

Over the course of 20 seconds I can see 1,440,000 messages being ingested
into the ktable, but only 633 emitted from it (expected 14400).

Over the next minute the record output creeps to 1796, but then holds
steady and does not keep going up to the expected total of 14400.

A consumer reading from the sinkTopic ends up finding about 1264, which is
lower than the 1796 records I would have anticipated from the number of
calls into the final kstream map function.

Precise number of emitted records will vary from one run to the next.

Where are the extra metrics going?  Is there some commit issue that is
causing dropped messages if the ktable producer isn't able to keep up?

Any recommendations on where to focus the investigation of the issue?

Running Kafka 0.10.2.1.

Thanks,
  Caleb


Re: Brokers is down by “java.io.IOException: Too many open files”

2017-05-12 Thread Caleb Welton
You need to raise your OS open file limits; something like this should work:

# /etc/security/limits.conf
* - nofile 65536
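
To verify the new limit actually took effect for the running broker,
something like the following (assumes the broker runs the standard
kafka.Kafka main class and pgrep matches a single process):

# Limit the broker process is actually running with:
cat /proc/$(pgrep -f kafka.Kafka)/limits | grep 'open files'

# File descriptors currently in use by the broker:
ls /proc/$(pgrep -f kafka.Kafka)/fd | wc -l

Note that the limits.conf change only applies to sessions started after the
edit, so the broker has to be restarted from a fresh login for it to stick.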




On Fri, May 12, 2017 at 6:34 PM, Yang Cui  wrote:

> Our Kafka cluster has been brought down by the problem “java.io.IOException:
> Too many open files” three times in 3 weeks.
>
> We have encountered this problem on both the 0.9.0.1 and 0.10.2.1 versions.
>
> The error is like:
>
> java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:340)
> at kafka.network.Acceptor.run(SocketServer.scala:283)
> at java.lang.Thread.run(Thread.java:745)
>
> Has anyone encountered a similar problem?
>
>
>


Using the vagrant image on aws

2016-09-28 Thread Caleb Welton
Hello,

I'm having trouble getting `vagrant/vagrant-up.sh --aws` to work properly.

The issue I'm having is as follows:

1. The vagrant install and provisioning complete successfully.
2. Sshing into the cluster works, and running clients from there works fine.
3. Connecting to the cluster from outside can be made to work if I manually
edit my /etc/hosts file.

e.g.

bash$ kafka-console-producer --topic test --broker-list broker1:9092

Works if `broker1` is registered in my local /etc/hosts file, but not
otherwise.

This results from the fact that the advertised listener in zookeeper is
registered as 'broker1' rather than as the externally facing address of the
node in AWS.

Is there any way to configure the vagrant installation such that the broker
gets advertised in zookeeper with the FQDN of the externally facing address
of the AWS node, rather than an unqualified 'broker1' that then requires
local aliasing in /etc/hosts?
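
The broker property that controls what gets registered in zookeeper is
advertised.listeners, so what I'm after is provisioning that emits something
like the following per broker (hostname illustrative):

```
# server.properties
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://ec2-52-10-1-2.us-west-2.compute.amazonaws.com:9092
```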

For reference, here is my Vagrantfile.local


```
enable_dns = true
enable_hostmanager = true

# General configuration
num_zookeepers = 1
num_brokers = 3
num_workers = 0

# Items for deployment into aws
ec2_region = "us-west-2"
ec2_instance_type  = "m3.medium"
ec2_associate_public_ip = true
ec2_subnet_id  = "subnet-11447875"
ec2_security_groups = ["sg-c1dd7eb8"]# "vagrant-kafka"
ec2_keypair_name  = "kafka"
ec2_keypair_file = "/Users/cwelton/.aws/kafka.pem"

# --- Ubuntu AMI 
ec2_user  = "ubuntu"
ec2_ami   = "ami-29ebb519"
```


Thanks,
  Caleb


Re: Performance issue with KafkaStreams

2016-09-16 Thread Caleb Welton
Is there a specific way that I need to build Kafka for that to work?

bash$  export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh
org.apache.kafka.streams.perf.SimpleBenchmark
Error: Could not find or load main class
org.apache.kafka.streams.perf.SimpleBenchmark

bash$ find . -name SimpleBenchmark.java
./streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java

bash$  find . -name SimpleBenchmark.class

bash$ jar -tf streams/build/libs/kafka-streams-0.10.1.0-SNAPSHOT.jar | grep SimpleBenchmark
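
My guess is the test jars are simply not produced by a default build; if so,
something along these lines should create them (task name is my assumption
from the Kafka gradle build and may differ by version):

bash$ ./gradlew testJar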



On Sat, Sep 10, 2016 at 6:18 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi Caleb,
>
> We have a benchmark that we run nightly to keep track of performance. The
> numbers we have do indicate that consuming through streams is indeed slower
> than just a pure consumer; however, the performance difference is not as
> large as you are observing. Would it be possible for you to run this
> benchmark in your environment and report the numbers? The benchmark is
> included with Kafka Streams, and you run it like this:
>
> export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh
> org.apache.kafka.streams.perf.SimpleBenchmark
>
> You'll need a Kafka broker to be running. The benchmark will report
> various numbers, but one of them is the performance of the consumer and the
> performance of streams reading.
>
> Thanks
> Eno
>
> > On 9 Sep 2016, at 18:19, Caleb Welton <cewel...@yahoo.com.INVALID>
> wrote:
> >
> > Same in both cases:
> > client.id=Test-Prototype
> > application.id=test-prototype
> >
> > group.id=test-consumer-group
> > bootstrap.servers=broker1:9092,broker2:9092
> > zookeeper.connect=zk1:2181
> > replication.factor=2
> >
> > auto.offset.reset=earliest
> >
> >
> > On Friday, September 9, 2016 8:48 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> >
> >
> > Hi Caleb,
> >
> > Could you share your Kafka Streams configuration (i.e., StreamsConfig
> > properties you might have set before the test)?
> >
> > Thanks
> > Eno
> >
> >
> > On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton <cwel...@apache.org>
> wrote:
> >
> >> I have a question with respect to the KafkaStreams API.
> >>
> >> I noticed during my prototyping work that my KafkaStreams application
> was
> >> not able to keep up with the input on the stream so I dug into it a bit
> and
> >> found that it was spending an inordinate amount of time in
> >> org.apache.kafka.common.network.Selector.select().  Not exactly a
> >> smoking gun itself, so I dropped the implementation down to a single
> >> processor reading off a source.
> >>
> >> public class TestProcessor extends AbstractProcessor<String, String> {
> >>     static long start = -1;
> >>     static long count = 0;
> >>
> >>     @Override
> >>     public void process(String key, String value) {
> >>         if (start < 0) {
> >>             start = System.currentTimeMillis();
> >>         }
> >>         count += 1;
> >>         if (count > 1000000) {
> >>             long end = System.currentTimeMillis();
> >>             double time = (end - start) / 1000.0;
> >>             System.out.printf("Processed %d records in %f seconds (%f records/s)\n",
> >>                 count, time, count / time);
> >>             start = -1;
> >>             count = 0;
> >>         }
> >>     }
> >> }
> >>
> >> ...
> >>
> >>
> >> TopologyBuilder topologyBuilder = new TopologyBuilder();
> >> topologyBuilder
> >>     .addSource("SOURCE", stringDeserializer, stringDeserializer, "input")
> >>     .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
> >>
> >>
> >>
> >> Which I then ran through the KafkaStreams API, and then repeated with
> >> the KafkaConsumer API.
> >>
> >> Using the KafkaConsumer API:
> >> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
> >> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
> >> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
> >> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)
> >>
> >> Using the KafkaStreams API:
> >> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
> >> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
> >> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
> >> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)
> >>
> >>
> >> The profile on the KafkaStreams consisted of:
> >>
> >> 89.2% org.apache.kafka.common.network.Selector.select()
> >> 7.6% org.apache.kafka.clients.producer.internals.ProduceRequestResult.await()
> >> 0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()
> >>
> >>
> >> Is a 5X performance difference between the Kafka Consumer and the
> >> KafkaStreams API expected?
> >>
> >> Are there specific things I can do to diagnose/tune the system?
> >>
> >> Thanks,
> >>  Caleb
> >>
>
>


Re: Performance issue with KafkaStreams

2016-09-09 Thread Caleb Welton
Same in both cases:
client.id=Test-Prototype
application.id=test-prototype

group.id=test-consumer-group
bootstrap.servers=broker1:9092,broker2:9092
zookeeper.connect=zk1:2181
replication.factor=2

auto.offset.reset=earliest


On Friday, September 9, 2016 8:48 AM, Eno Thereska <eno.there...@gmail.com> 
wrote:



Hi Caleb,

Could you share your Kafka Streams configuration (i.e., StreamsConfig
properties you might have set before the test)?

Thanks
Eno


On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton <cwel...@apache.org> wrote:

> I have a question with respect to the KafkaStreams API.
>
> I noticed during my prototyping work that my KafkaStreams application was
> not able to keep up with the input on the stream so I dug into it a bit and
> found that it was spending an inordinate amount of time in
> org.apache.kafka.common.network.Selector.select().  Not exactly a smoking
> gun itself, so I dropped the implementation down to a single processor
> reading off a source.
>
> public class TestProcessor extends AbstractProcessor<String, String> {
>     static long start = -1;
>     static long count = 0;
>
>     @Override
>     public void process(String key, String value) {
>         if (start < 0) {
>             start = System.currentTimeMillis();
>         }
>         count += 1;
>         if (count > 1000000) {
>             long end = System.currentTimeMillis();
>             double time = (end - start) / 1000.0;
>             System.out.printf("Processed %d records in %f seconds (%f records/s)\n",
>                 count, time, count / time);
>             start = -1;
>             count = 0;
>         }
>     }
> }
>
> ...
>
>
> TopologyBuilder topologyBuilder = new TopologyBuilder();
> topologyBuilder
>     .addSource("SOURCE", stringDeserializer, stringDeserializer, "input")
>     .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
>
>
>
> Which I then ran through the KafkaStreams API, and then repeated with
> the KafkaConsumer API.
>
> Using the KafkaConsumer API:
> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)
>
> Using the KafkaStreams API:
> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)
>
>
> The profile on the KafkaStreams consisted of:
>
> 89.2% org.apache.kafka.common.network.Selector.select()
>  7.6% org.apache.kafka.clients.producer.internals.ProduceRequestResult.await()
>  0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()
>
>
> Is a 5X performance difference between the Kafka Consumer and the
> KafkaStreams API expected?
>
> Are there specific things I can do to diagnose/tune the system?
>
> Thanks,
>   Caleb
>


Performance issue with KafkaStreams

2016-09-07 Thread Caleb Welton
I have a question with respect to the KafkaStreams API.

I noticed during my prototyping work that my KafkaStreams application was
not able to keep up with the input on the stream so I dug into it a bit and
found that it was spending an inordinate amount of time in
org.apache.kafka.common.network.Selector.select().  Not exactly a smoking
gun itself, so I dropped the implementation down to a single processor
reading off a source.

public class TestProcessor extends AbstractProcessor<String, String> {
    static long start = -1;
    static long count = 0;

    @Override
    public void process(String key, String value) {
        if (start < 0) {
            start = System.currentTimeMillis();
        }
        count += 1;
        if (count > 1000000) {
            long end = System.currentTimeMillis();
            double time = (end - start) / 1000.0;
            System.out.printf("Processed %d records in %f seconds (%f records/s)\n",
                count, time, count / time);
            start = -1;
            count = 0;
        }
    }
}

...


TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder
    .addSource("SOURCE", stringDeserializer, stringDeserializer, "input")
    .addProcessor("PROCESS", TestProcessor::new, "SOURCE");



Which I then ran through the KafkaStreams API, and then repeated with
the KafkaConsumer API.
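
The streams driver is nothing special; roughly this, where props holds the
usual StreamsConfig properties (application.id, bootstrap.servers, etc.):

KafkaStreams streams = new KafkaStreams(topologyBuilder, new StreamsConfig(props));
streams.start();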

Using the KafkaConsumer API:
Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)

Using the KafkaStreams API:
Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)


The profile on the KafkaStreams consisted of:

89.2% org.apache.kafka.common.network.Selector.select()
 7.6% org.apache.kafka.clients.producer.internals.ProduceRequestResult.await()
 0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()


Is a 5X performance difference between the Kafka Consumer and the
KafkaStreams API expected?

Are there specific things I can do to diagnose/tune the system?

Thanks,
  Caleb


Joining Streams with Kafka Streams

2016-08-25 Thread Caleb Welton
Hello,

I'm trying to understand best practices related to joining streams using
the Kafka Streams API.

I can configure the topology such that two sources feed into a single
processor:

topologyBuilder
    .addSource("A", stringDeserializer, itemDeserializer, "a-topic")
    .addSource("B", stringDeserializer, itemDeserializer, "b-topic")
    .addProcessor("hello-join", HelloJoin::new, "A", "B")...

And within my processor I can determine which topic a given message came
from:

public void process(String key, String value) {
    if (context().topic().equals("a-topic")) {
        ...
    } else {
        ...
    }
}

This allows for a crude form of cross stream join with the following
issues/limitations:

  i.  A string compare on the topic name to decide which stream a message
came from.  Having actual access to the TopicPartition could lead to more
efficient validation.  Priority low, as this is just a small per-message
performance hit, but it would be nice to eliminate.

  ii. This requires "a-topic" and "b-topic" to have the same message
format, which for general join handling is a pretty big limitation.  What
would be the recommended way to handle the case of different message
formats, e.g. needing different deserializers for different input topics?

E.g. how would I define my Processor if the topology was:

topologyBuilder
    .addSource("A", stringDeserializer, itemADeserializer, "a-topic")
    .addSource("B", stringDeserializer, itemBDeserializer, "b-topic")
    .addProcessor("hello-join", HelloJoin::new, "A", "B")...

where itemADeserializer and itemBDeserializer return different classes?
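
One shape I'm considering for (ii): give each source its own adapter
processor that normalizes its payload type, and have only the adapters feed
the join processor. A sketch under that assumption (JoinInput, ItemA, ItemB,
and the fromA/fromB factories are hypothetical types of mine):

// Each adapter sees exactly one payload type, so the downstream join
// processor only ever receives JoinInput.
public class AdaptA extends AbstractProcessor<String, ItemA> {
    @Override
    public void process(String key, ItemA value) {
        context().forward(key, JoinInput.fromA(value));
    }
}

public class AdaptB extends AbstractProcessor<String, ItemB> {
    @Override
    public void process(String key, ItemB value) {
        context().forward(key, JoinInput.fromB(value));
    }
}

topologyBuilder
    .addSource("A", stringDeserializer, itemADeserializer, "a-topic")
    .addSource("B", stringDeserializer, itemBDeserializer, "b-topic")
    .addProcessor("adapt-a", AdaptA::new, "A")
    .addProcessor("adapt-b", AdaptB::new, "B")
    .addProcessor("hello-join", HelloJoin::new, "adapt-a", "adapt-b");

This would also sidestep the string compare from (i), since each adapter
knows statically which stream it is attached to.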

Thanks,
  Caleb