Re: RAID10 for kafka cluster

2017-03-07 Thread Gerrit Jansen van Vuuren
In my opinion, for production I would run with RAID 10. It's true that Kafka
has durability and can tolerate broker shutdowns, but there are exceptions and
hiccups. If you want to avoid tons of partition movement between brokers and
connection errors on the clients (which, depending on how loaded your cluster
is, may or may not cause issues) due to disk failures, RAID 10 is the way to
go; you'll have a much more stable cluster all round.
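
For what it's worth, the difference shows up in the broker's data-directory
configuration; the mount points below are made-up examples, only the log.dirs
property itself is real:

# server.properties on a single RAID 10 volume: a failed disk is absorbed by the array
log.dirs=/mnt/raid10/kafka-logs

# server.properties on JBOD: a failed disk can take the broker down and trigger
# replica movement and client reconnects to the surviving brokers
log.dirs=/mnt/disk1/kafka-logs,/mnt/disk2/kafka-logs,/mnt/disk3/kafka-logs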

For development it depends; I run a Vagrant instance locally on my laptop
for development :/




On Tue, Mar 7, 2017 at 9:52 AM, Mich Talebzadeh 
wrote:

> is RAID 10 (mirrored and striped) essential for kafka cluster with three
> nodes in DEV as I have been told that disk failures can shutdown the whole
> kafka clusters? this is a non-prod environment
>
> thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: Scaling up kafka consumers

2017-02-24 Thread Gerrit Jansen van Vuuren
The kafka-fast connector handles this differently from the standard kafka
client (where each partition is consumed by at most one consumer in a group),
by breaking offsets into consumable ranges, which allows one partition to be
read by multiple consumers, where each consumer uniquely receives a different
offset range.

See:
https://github.com/gerritjvv/kafka-fast
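
For reference, a minimal sketch of what that looks like with this client's
Java API (the calls mirror the project's own examples; host names, ports and
the Redis group name are placeholders):

// Consumer, KafkaConf, BrokerConf, RedisConf and Message come from the kafka-fast Java API
Consumer c1 = Consumer.connect(new KafkaConf(),
    new BrokerConf[]{ new BrokerConf("broker1", 9092) },
    new RedisConf("redis1", 6379, "my-group"), "my-topic");
Consumer c2 = Consumer.connect(new KafkaConf(),
    new BrokerConf[]{ new BrokerConf("broker1", 9092) },
    new RedisConf("redis1", 6379, "my-group"), "my-topic");

Message m1 = c1.readMsg(); // an offset range (work unit) claimed by c1
Message m2 = c2.readMsg(); // a different range, even if it is the same partition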

That said, there are two usage scenarios: lots of topics (10 or more), where
you'd want 1, 2 or 4 partitions each, or fewer topics, where you want more
partitions. If yours is the latter, you'd want more than a single partition;
4-6 are better numbers (the numbers are from my experience with 5 nodes).



On 24 Feb 2017 17:30, "Jakub Stransky"  wrote:

> Hello everyone,
>
> I was reading/checking kafka documentation regarding point-2-point and
> publish subscribe communications patterns in kafka and I am wondering how
> to scale up consumer side in point to point scenario when consuming from
> single kafka topic.
>
> Let say I have a single topic with single partition and I have one node
> where the kafka consumer is running. If I want to scale up my service I add
> another node - which has the same configuration as the first one (topic,
> partition and consumer group id). Those two nodes start competing for
> messages from kafka topic.
>
> What I am not sure in this scenario and is actually subject of my question
> is "*Whether they do get each node unique messages or there is still
> possibility that some messages will be consumed by both nodes etc*".
> Because I can see scenarios that both nodes are started at the same time -
> they gets the same topic offset from zookeeper and started consuming
> messages from that offset. OR am I thinking in a wrong direction?
>
> Thanks
> Jakub
>


Re: Kafka SASL_PLAINTEXT and authentication/authorization backend failure

2017-01-19 Thread Gerrit Jansen van Vuuren
Hi,

I refer to the broker behaviour. For the most part, without SASL the brokers
do respond, but as soon as you put SASL into the mix things hang whenever
something goes wrong, i.e. the broker doesn't provide a response on the socket
and the client sits reading a response that it never gets until it times out.

e.g.

kafka 0.9's SASL protocol is [kerberos specific interchange]
kafka 0.10's is [len][kafka protocol][kerberos specific interchange]

The kerberos-specific interchange parts are where things can hang (meaning
the client or broker is waiting for information from the other side that never
arrives), and if you write [len][kafka protocol] to a SASL-configured 0.9
kafka broker, or vice versa, the client eventually times out, not on retrying
but on waiting for a response from the broker.
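
On the client side the only real defence is to give every read a deadline, so
a broker that goes silent after the SASL exchange surfaces as an error instead
of an indefinite hang. A minimal sketch with plain JDK sockets (host, port and
timeouts are made up; the actual handshake bytes are elided):

import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class SaslProbe {
    public static void main(String[] args) throws IOException {
        try (Socket sock = new Socket()) {
            sock.connect(new InetSocketAddress("broker1", 9092), 10_000); // connect timeout
            sock.setSoTimeout(30_000); // read timeout so a silent broker fails fast
            // (writing the actual SASL/handshake request is elided in this sketch)
            DataInputStream in = new DataInputStream(sock.getInputStream());
            int len = in.readInt();   // 0.10-style length-prefixed response frame
            byte[] frame = new byte[len];
            in.readFully(frame);      // handshake/auth response body
            System.out.println("got a " + len + " byte response");
        } catch (SocketTimeoutException e) {
            // the broker accepted the connection but never answered: treat it as a failure
            System.err.println("no response within the timeout: " + e.getMessage());
        }
    }
}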

I would file a bug but don't think this behaviour can be helped here.

Regards,
Gerrit




On Fri, Jan 20, 2017 at 12:05 AM, Ismael Juma  wrote:

> Hi Gerrit,
>
> I think it's important to distinguish broker and client behaviour. The
> clients can hang because they keep retrying when they get certain errors.
> When it comes to the broker, it should give you errors as a general rule.
> If you are aware of certain scenarios where it should give an error and it
> doesn't, then please file a bug with steps to reproduce.
>
> Ismael
>
> On Thu, Jan 19, 2017 at 6:48 PM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > Hi,
> >
> > I've added kerberos support for https://github.com/gerritjvv/kafka-fast
> > and
> > have seen that the kafka brokers do not send any response if the SASL
> > authentication is not correct or accepted, thus causing the client to
> hang
> > while waiting for a response from kafka.
> >
> > Some things that might help to debug:
> >
> >- kafka 0.9's SASL auth is in-compatible with 0.10 and not using the
> >correct version will cause the kafka client to hang.
> >-  use -Dsun.security.krb5.debug=true and
> > -Djava.security.debug=gssloginconfig,configfile,
> configparser,logincontext
> > to see debug info about what's going on.
> >
> >
> > Some reading material can be found at:
> > https://github.com/gerritjvv/kafka-fast/blob/master/kafka-
> clj/Kerberos.md
> >
> > and if you want to see or need for testing a vagrant env with kerberos +
> > kafka configured see
> > https://github.com/gerritjvv/kafka-fast/blob/master/kafka-
> > clj/doc/vagrant.md
> >
> >
> >
> >
> > On Thu, Jan 19, 2017 at 7:37 PM, Christian  wrote:
> >
> > > I have successfully gotten SASL_PLAINTEXT configured on Kafka cluster.
> We
> > > implemented our own LoginModule and Server with the following caveat
> > that I
> > > am guessing I am doing something wrong.
> > >
> > > The LoginModule's login method acquires a session id from an internal
> > > security system and populates the subject with the relevant
> information.
> > In
> > > the server evaluateResponse we then validate that session.  On success,
> > > everything is great. However, when the evaulateResponse returns with a
> > > failure (throws an exception), the producer client just hangs when
> > sending
> > > a message until the configured timeout occurs. Interestingly enough, we
> > see
> > > the evaulateResponse method is getting called about every second until
> > the
> > > the producer client finally times out.
> > >
> > > We get this same behavior when using the PlainLoginModule provided with
> > > Kafka after changing the password on the client side to something
> > different
> > > from the server side.
> > >
> > > Is this expected behavior?
> > >
> > > Thanks,
> > > Christian
> > >
> >
>


Re: Kafka SASL_PLAINTEXT and authentication/authorization backend failure

2017-01-19 Thread Gerrit Jansen van Vuuren
If you mean authorization with Kafka (not with Kerberos), then yes, it seems
that Kafka stops responding when it doesn't get exactly what it expects :/
no errors, just timeouts.

On Thu, Jan 19, 2017 at 8:59 PM, Christian  wrote:

> Thanks for the response Gerrit! It seems like authorization has the same
> behavior. Have you experienced that as well?
>
> On Thu, Jan 19, 2017 at 11:48 AM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > Hi,
> >
> > I've added kerberos support for https://github.com/gerritjvv/kafka-fast
> > and
> > have seen that the kafka brokers do not send any response if the SASL
> > authentication is not correct or accepted, thus causing the client to
> hang
> > while waiting for a response from kafka.
> >
> > Some things that might help to debug:
> >
> >- kafka 0.9's SASL auth is in-compatible with 0.10 and not using the
> >correct version will cause the kafka client to hang.
> >-  use -Dsun.security.krb5.debug=true and
> > -Djava.security.debug=gssloginconfig,configfile,
> configparser,logincontext
> > to see debug info about what's going on.
> >
> >
> > Some reading material can be found at:
> > https://github.com/gerritjvv/kafka-fast/blob/master/kafka-
> clj/Kerberos.md
> >
> > and if you want to see or need for testing a vagrant env with kerberos +
> > kafka configured see
> > https://github.com/gerritjvv/kafka-fast/blob/master/kafka-
> > clj/doc/vagrant.md
> >
> >
> >
> >
> > On Thu, Jan 19, 2017 at 7:37 PM, Christian  wrote:
> >
> > > I have successfully gotten SASL_PLAINTEXT configured on Kafka cluster.
> We
> > > implemented our own LoginModule and Server with the following caveat
> > that I
> > > am guessing I am doing something wrong.
> > >
> > > The LoginModule's login method acquires a session id from an internal
> > > security system and populates the subject with the relevant
> information.
> > In
> > > the server evaluateResponse we then validate that session.  On success,
> > > everything is great. However, when the evaulateResponse returns with a
> > > failure (throws an exception), the producer client just hangs when
> > sending
> > > a message until the configured timeout occurs. Interestingly enough, we
> > see
> > > the evaulateResponse method is getting called about every second until
> > the
> > > the producer client finally times out.
> > >
> > > We get this same behavior when using the PlainLoginModule provided with
> > > Kafka after changing the password on the client side to something
> > different
> > > from the server side.
> > >
> > > Is this expected behavior?
> > >
> > > Thanks,
> > > Christian
> > >
> >
>


Re: Kafka SASL_PLAINTEXT and authentication/authorization backend failure

2017-01-19 Thread Gerrit Jansen van Vuuren
Hi,

I've added kerberos support for https://github.com/gerritjvv/kafka-fast and
have seen that the kafka brokers do not send any response if the SASL
authentication is not correct or accepted, thus causing the client to hang
while waiting for a response from kafka.

Some things that might help to debug:

   - kafka 0.9's SASL auth is incompatible with 0.10's, and not using the
   correct version will cause the kafka client to hang.
   - use -Dsun.security.krb5.debug=true and
   -Djava.security.debug=gssloginconfig,configfile,configparser,logincontext
   to see debug info about what's going on (see the example below).
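
For example, when the brokers or the stock CLI tools are started through the
standard shell wrappers, the flags can be passed via KAFKA_OPTS (a sketch;
adjust to however you launch your JVMs):

export KAFKA_OPTS="-Dsun.security.krb5.debug=true -Djava.security.debug=gssloginconfig,configfile,configparser,logincontext"
bin/kafka-server-start.sh config/server.properties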


Some reading material can be found at:
https://github.com/gerritjvv/kafka-fast/blob/master/kafka-clj/Kerberos.md

and if you want to see or need for testing a vagrant env with kerberos +
kafka configured see
https://github.com/gerritjvv/kafka-fast/blob/master/kafka-clj/doc/vagrant.md




On Thu, Jan 19, 2017 at 7:37 PM, Christian  wrote:

> I have successfully gotten SASL_PLAINTEXT configured on Kafka cluster. We
> implemented our own LoginModule and Server with the following caveat that I
> am guessing I am doing something wrong.
>
> The LoginModule's login method acquires a session id from an internal
> security system and populates the subject with the relevant information. In
> the server evaluateResponse we then validate that session.  On success,
> everything is great. However, when the evaulateResponse returns with a
> failure (throws an exception), the producer client just hangs when sending
> a message until the configured timeout occurs. Interestingly enough, we see
> the evaulateResponse method is getting called about every second until the
> the producer client finally times out.
>
> We get this same behavior when using the PlainLoginModule provided with
> Kafka after changing the password on the client side to something different
> from the server side.
>
> Is this expected behavior?
>
> Thanks,
> Christian
>


Re: Serious problem

2017-01-06 Thread Gerrit Jansen van Vuuren
Kafka has a maximum message size limit (enforced on both the producer and the
brokers); this is a protection measure that avoids sending monster messages to kafka.

You have two options:
 1. Increase the limit: max.request.size on the producer (which is what the
error below is complaining about) and message.max.bytes on the brokers; the
default is around 1 MB. Making it 5 or even 10 MB is not an issue normally;
Java applications can happily handle this size.
 2. Break up your records into smaller sizes.

If you haven't enabled compression, enable compression in your client
configuration.
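
For reference, a sketch of the relevant settings (these are the standard Kafka
producer/broker property names; the values are only illustrative, and the
Flume sink may expose the producer ones under its own prefix):

# producer side
max.request.size=5242880      # allow requests up to ~5 MB
compression.type=gzip         # or snappy

# broker side (server.properties) must accept the larger messages too
message.max.bytes=5242880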

You should also probably inspect this particular message that goes over
2 MB; if it's web data, sometimes bogus hrefs with long query strings that
make no sense can be dropped in a pre-processor before sending to kafka. If
it's real custom data, you should try to rethink your data model and spend
some time on finding the right data format that provides the best compression
and speed (e.g. GPB, Avro, JSON + GZIP); CSV is almost never what you want.

Hope this helps.

Regards,
 Gerrit


On Fri, Jan 6, 2017 at 8:12 AM, zhilong wang 
wrote:

> flume-kafka-sink send message to kafka-0.9
>
>
>
>
> 06 Jan 2017 13:13:36,595 ERROR
> [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.SinkRunner$PollingRunner.run:158)  - Unable to deliver
> event. Exception follows
> org.apache.flume.EventDeliveryException: Failed to publish events
> at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
> at
> org.apache.flume.sink.DefaultSinkProcessor.process(
> DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is
> 2742363 bytes when serialized which is larger than th
> e maximum request size you have configured with the max.request.size
> configuration.
> at
> org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.(
> KafkaProducer.java:686)
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(
> KafkaProducer.java:449)
> at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
> ... 3 more
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The
> message is 2742363 bytes when serialized which is larger than the maximum
> request size you have configure
> d with the max.request.size configuration.
>


Re: Setup Kerberos for Kafka on Ubuntu Linux

2016-12-30 Thread Gerrit Jansen van Vuuren
also, before tinkering with Kerberos, I'd advise reading the first 4
chapters of "Kerberos: The Definitive Guide"
https://www.amazon.com/Kerberos-Definitive-Guide-Jason-Garman/dp/0596004036/ref=sr_1_1?ie=UTF8&qid=1483116842&sr=8-1&keywords=kerberos

It helped me and after a week of reading I could get started working on
using kerberos without getting senselessly frustrated all the time.



On Fri, Dec 30, 2016 at 5:49 PM, Gerrit Jansen van Vuuren <
gerrit...@gmail.com> wrote:

> make sure kafka1 is the FQN and that the server kafka1 can resolve
> properly from you're kerberos server, EXAMPLE.COM should be a realm that
> is configured in krb5.conf and kdc.conf, with the adequate domain mappings
> for kafka1 to this realm.
>
> Kerberos is a pain and there are tons of stuff that can go wrong :)
>
> As part of adding kerberos to the kafka-clj connector I've setup a Vagrant
> build that creates kerberos and kafka kerberised instances, this may help
> you getting started if all you're looking for is testing kafka with
> kerberos.
>
> https://github.com/gerritjvv/kafka-fast/blob/kerberos/kafka-
> clj/doc/vagrant.md
>
> https://github.com/gerritjvv/kafka-fast/blob/kerberos/kafka-
> clj/Vagrantfile
> See https://github.com/gerritjvv/kafka-fast/blob/kerberos/kafka-
> clj/vagrant/scripts/kerberos.sh (installs kerberos and adds principles)
>
> note: still under development :)
>
>
> On Fri, Dec 30, 2016 at 5:19 PM, Raghav  wrote:
>
>> Hi
>>
>> I have never dabbled Kafka with security settings. I was trying to follow
>> this blog to get it working:
>> http://kafka.apache.org/documentation.html#security_sasl
>>
>> But I can't seem to add principals for Kafka in Kerberos server, and I am
>> getting the following error:
>>
>> dp@kdc:~$ sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/
>> kaf...@example.com'
>> [sudo] password for dp:
>> Authenticating as principal root/ad...@example.com with password.
>> WARNING: no policy specified for kafka/kaf...@example.com; defaulting to
>> no
>> policy
>> add_principal: No such entry in the database while creating "kafka/
>> kaf...@example.com".
>> dp@kdc:~$ ^C
>>
>> Can anyone share more insight into how they got a simple Kerberos server
>> to
>> work with Kafka, and able to send one message end to end. I greatly
>> appreciate your help.
>>
>> Many thanks.
>>
>> --
>> Raghav
>>
>
>


Re: Setup Kerberos for Kafka on Ubuntu Linux

2016-12-30 Thread Gerrit Jansen van Vuuren
Make sure kafka1 is the FQDN and that the server kafka1 resolves properly
from your kerberos server. EXAMPLE.COM should be a realm that is configured
in krb5.conf and kdc.conf, with the adequate domain mappings for kafka1 to
this realm (a sketch follows below).
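
A minimal krb5.conf sketch of that mapping (the KDC and domain names are
made-up examples):

[libdefaults]
    default_realm = EXAMPLE.COM

[realms]
    EXAMPLE.COM = {
        kdc = kdc.example.com
        admin_server = kdc.example.com
    }

[domain_realm]
    .example.com = EXAMPLE.COM
    example.com = EXAMPLE.COM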

Kerberos is a pain and there are tons of stuff that can go wrong :)

As part of adding kerberos to the kafka-clj connector I've setup a Vagrant
build that creates kerberos and kafka kerberised instances, this may help
you getting started if all you're looking for is testing kafka with
kerberos.

https://github.com/gerritjvv/kafka-fast/blob/kerberos/kafka-clj/doc/vagrant.md

https://github.com/gerritjvv/kafka-fast/blob/kerberos/kafka-clj/Vagrantfile
See https://github.com/gerritjvv/kafka-fast/blob/kerberos/kafka-clj/vagrant/scripts/kerberos.sh
(installs kerberos and adds principals)

note: still under development :)


On Fri, Dec 30, 2016 at 5:19 PM, Raghav  wrote:

> Hi
>
> I have never dabbled Kafka with security settings. I was trying to follow
> this blog to get it working:
> http://kafka.apache.org/documentation.html#security_sasl
>
> But I can't seem to add principals for Kafka in Kerberos server, and I am
> getting the following error:
>
> dp@kdc:~$ sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/
> kaf...@example.com'
> [sudo] password for dp:
> Authenticating as principal root/ad...@example.com with password.
> WARNING: no policy specified for kafka/kaf...@example.com; defaulting to
> no
> policy
> add_principal: No such entry in the database while creating "kafka/
> kaf...@example.com".
> dp@kdc:~$ ^C
>
> Can anyone share more insight into how they got a simple Kerberos server to
> work with Kafka, and able to send one message end to end. I greatly
> appreciate your help.
>
> Many thanks.
>
> --
> Raghav
>


Re: rebalancing - how to speed it up?

2016-12-11 Thread Gerrit Jansen van Vuuren
I don't know about speeding up rebalancing, and an hour seems to suggest
something is wrong with zookeeper or your whole setup, maybe. If it
becomes an unsolvable issue for you, you could try
https://github.com/gerritjvv/kafka-fast which uses a different model and
doesn't need balancing or rebalancing.

Disclosure: "I'm the library author".



On 11 Dec 2016 11:56 a.m., "Jon Yeargers"  wrote:

Is there some way to 'help it along'? It's taking an hour or more from when
I start my app to actually seeing anything consumed.

Plenty of CPU (and IOWait) during this time so I know it's doing
_something_...


Re: Question regarding dynamic subscriber environment

2016-11-02 Thread Gerrit Jansen van Vuuren
Yes. You open a connection, and the fetch threads will look at a shared
variable for the topics to fetch; it's this shared variable that is updated
when you add and remove topics. The connection itself is not closed. There
is no relation between a connection and the topics being consumed, other
than that the connection is used to fetch data for a particular topic. For
each broker used, a pool of connections is kept, and when a topic is fetched
a connection for the required broker is selected from the pool, then returned
to the pool after use.

See: https://github.com/gerritjvv/kafka-fast#java-1

Consumer consumer = Consumer.connect(
    new KafkaConf(),
    new BrokerConf[]{ new BrokerConf("192.168.4.40", 9092) },
    new RedisConf("192.168.4.10", 6379, "test-group"),
    "my-topic");
Message msg = consumer.readMsg();

// Add topics
consumer.addTopics("topic1", "topic2");
// Remove topics
consumer.removeTopics("topic1", "topic2");



On Wed, Nov 2, 2016 at 4:37 PM, Janagan Sivagnanasundaram <
janagan1...@gmail.com> wrote:

> Does this really address the respective problem? Ultimate task is that, the
> connection between broker and subscriber should not be terminated.
> Subscriber is free to change his topic interests without closing the
> connection.
>
> On Wed, Nov 2, 2016 at 12:43 PM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > Hi,
> >
> > Have a look at the kafka client lib
> > https://github.com/gerritjvv/kafka-fast#java-1, it already provides this
> > functionality.
> >
> >
> > On Wed, Nov 2, 2016 at 2:34 AM, Janagan Sivagnanasundaram <
> > janagan1...@gmail.com> wrote:
> >
> > > Kafka's current nature is does not support to dynamic subscriber
> > > environment where the topic interest of the subscriber is keep change
> > > overtime unless subscriber terminate the particular connection and
> > > reconnect.
> > >
> > > What I have planned to implement or design is to enable the dynamic
> > > subscriber environment where the subscriber can change his topic
> > interests
> > > without terminating the connection with it's broker. Following are some
> > > ideas that we can use to do such thing.
> > >
> > > 1) Modifying the current protocol by adding an extra attribute
> > > 2) Create a control connection channel from subscriber to broker
> > >
> > > Is there easy/efficient way to do such thing?
> > >
> >
>


Re: Question regarding dynamic subscriber environment

2016-11-02 Thread Gerrit Jansen van Vuuren
Hi,

Have a look at the kafka client lib
https://github.com/gerritjvv/kafka-fast#java-1, it already provides this
functionality.


On Wed, Nov 2, 2016 at 2:34 AM, Janagan Sivagnanasundaram <
janagan1...@gmail.com> wrote:

> Kafka's current nature is does not support to dynamic subscriber
> environment where the topic interest of the subscriber is keep change
> overtime unless subscriber terminate the particular connection and
> reconnect.
>
> What I have planned to implement or design is to enable the dynamic
> subscriber environment where the subscriber can change his topic interests
> without terminating the connection with it's broker. Following are some
> ideas that we can use to do such thing.
>
> 1) Modifying the current protocol by adding an extra attribute
> 2) Create a control connection channel from subscriber to broker
>
> Is there easy/efficient way to do such thing?
>


Re: Topic with many partitions and many consumers, recommendations wanted.

2016-09-30 Thread Gerrit Jansen van Vuuren
take a look at kafka client https://github.com/gerritjvv/kafka-fast, it
uses a different approach where you can have more than several consumers
per topic+partition (i.e no relation between topic partitions and
consumers). It uses redis but only for offsets and work distribution, not
for the messages itself.

On Fri, Sep 30, 2016 at 4:07 PM, craig w  wrote:

> I have a scenario where, for a given topic I'll have 500 consumers (1
> consumer per instance of an app). I've setup the topic so it has 500
> partitions, thus ensuring each consumer will eventually get work (the data
> produced into kafka  use the default partitioning strategy).
>
> note: These consumer app instances are run in containers via Marathon
> (using Mesos).
>
> Several times a day the consumer apps can be intentionally restarted (to
> upgrade the app, etc). When a rolling restart occurs, Kafka begins its
> rebalancing process. This process can take 10 minutes or so as the rolling
> restart itself takes a few minutes. As a result, what I've seen is that a
> consumer will have its partitions reassigned, consume a new message, start
> working on it, and then a reassignment occurs again. The work being
> performed when a message is received is effectively lost since messages
> being processed take 30s - 2 hours to process, and a re-assignment occurs.
>
> One suggestion from someone was to create a separate "app" in marathon for
> each instance, therefore I'd have 500 apps in marathon, and assign each one
> a specific partition number instead of letting Kafka assign partitions
> automatically to the consumers. This is problematic because I need to be
> able to increase/decrease the number of instances of the app based on
> demand coming into the system.
>
> To work around this, we have a custom component that consumes kafka topics
> and puts messages into redis lists (one per kafka topic). Then our
> consumers are doing a BLPOP (blocking pop operation) to ensure the message
> is only processed once, but also helps avoid rebalancing in kafka when the
> consumer apps are restarted.
>
> I'm considering using a different queueing system such as ActiveMQ,
> RabbitMQ...to avoid this kafka to redis scenario. Is kafka the right fit?
> Is there a better approach to doing this with kafka?
>
> Thanks in advance,
> Craig
>


Re: G1 tuning

2015-10-14 Thread Gerrit Jansen van Vuuren
Hi,

I've seen pauses using G1 in other applications and have found that
-XX:+UseParallelGC -XX:+UseParallelOldGC works best if you're having GC
issues in general on the JVM.
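
If you want to try that on a broker started through the stock shell wrappers,
one way (a sketch; KAFKA_JVM_PERFORMANCE_OPTS normally carries the GC flags,
so overriding it replaces the defaults) is:

export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseParallelGC -XX:+UseParallelOldGC"
bin/kafka-server-start.sh config/server.properties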


Regards,
 Gerrit

On Wed, Oct 14, 2015 at 4:28 PM, Cory Kolbeck  wrote:

> Hi folks,
>
> I'm a bit new to the operational side of G1, but pretty familiar with its
> basic concept. We recently set up a Kafka cluster to support a new product,
> and are seeing some suboptimal GC performance. We're using the parameters
> suggested in the docs, except for having switched to java 1.8_40 in order
> to get better memory debugging. Even though the cluster is handling only
> 2-3k messages per second per node, we see periodic 11-18 second
> stop-the-world pauses on a roughly hourly cadence. I've turned on
> additional GC logging, and see no humongous allocations, it all seems to be
> buffers making it into the tenured gen. They appear to be collectable, as
> the collection triggered by dumping the heap collects them all. Ideas for
> additional diagnosis or tuning very welcome.
>
> --Cory
>


Re: producer hung after some time

2015-09-29 Thread Gerrit Jansen van Vuuren
also check your GC; caused by using async without backpressure, with the
latest jdk and G1 I've found that many times a JVM app can become
unresponsive without throwing an OOM.

running jstat -gcutil <pid> 250 would tell you, e.g. if S0 or S1 stay at
100.00 then you've got a GC problem, and need to revert back to using CMS.


Re: automatically consume from all topics

2015-09-10 Thread Gerrit Jansen van Vuuren
Hi,

I'm not sure about the high level consumer but I maintain a kafka consumer
that can add and remove topics dynamically.

https://github.com/gerritjvv/kafka-fast
see
https://github.com/gerritjvv/kafka-fast/blob/master/kafka-clj/java/kakfa_clj/core/Consumer.java
if you're using java/scala



On Thu, Sep 10, 2015 at 3:48 PM, Helleren, Erik 
wrote:

> So, there are several ways to do this.  Lets assume the goal is to add
> more topics to the application at runtime.  And that this app is currently
> written to be distributed via the magic of consumer groups.  Sadly, I
> don't think the High level consumer is well designed for this particular
> use case.  The app would have to poll using something like the topic list
> script (bin/kafka-topics.sh --list ...), close the existing high level
> consumer on a change, and start a new one.  And then do this on all the
> nodes of your application (Should be easier than doing it on just one
> actually).  This would result in a huge latency spike and a problem when
> it comes to migrating the state involved in your example expectation.
>
> The next option still requires polling, but it needs a custom FT and
> distribution scheme.  There might need a leader so some things only happen
> once.  Just use the simple consumer API, and have one thread per
> partition.  The leader would have to tell a follower something like "Start
> listening to topic X, partition Y", which is risky and difficult to do.
>
> The simplest option, assuming that each topic is independent when it comes
> to expectations, is don't go with a cluster.  Just have a script/watcher
> app that does the polling and then, when it detects new topics, for each
> new topic: start a new instance of your app on a new box that listens to
> that single topic.  It might take a few seconds to startup, but its easy
> to code, easy to maintain, and easy to understand.  Which makes for a more
> resilient application.
> -Erik
>
>
>
> From:  Joris Peeters 
> Reply-To:  "users@kafka.apache.org" 
> Date:  Thursday, September 10, 2015 at 6:09 AM
> To:  "users@kafka.apache.org" 
> Subject:  automatically consume from all topics
>
>
> Hello,
>
> Is there a simple way to set up a consumer that automatically picks up all
> the topics for all the partitions, dynamically extending its range as new
> topics get created?
>
> The underlying idea is that we want to have a few over-arching consumers
> (I'm aware that's not great for the scalability, but that's not such a
> concern at present), to
> -
> Gather various statistics, metrics, system pressure, ... and dispatch to the
> appropriate  monitoring systems,
> -
> Apply some end-to-end business-logic testing, to continuously assert
> certain expectations (e.g. "if this-sort-of message arrived, then we
> expect that-sort-of-message to be received within this time" etc).
>
>
> I'm sure I can piece something together that does this, but perhaps it
> comes out of the box. (Couldn't find it, though).
> We're using the Java client and Kafka 8.2.1.
>
> Joris Peeters
> Developer
>
> Research and Data Technology
> T:
> +44 (0) 20 8576 5800
>
> Winton
> Grove House
> 27 Hammersmith Grove
> London W6 0NE
>
> wintoncapital.com 
>
>
>


Re: kafka java api (written in 100% clojure)

2014-10-13 Thread Gerrit Jansen van Vuuren
Hi Gwen,

For other reasons :), I've used zookeeper (from Java) in the past to store
similar information and have found it doesnt work good enough for this use
case in particular. Zk is great for config and locks but not for data that
changes fast and can grow beyond a certain limit. In particular my worst
experience with zk has been sync data becoming too big to flush to disks
and snapshots taking too long if ever to be read by quorum nodes after a
leader election, sometimes the only real option was to delete all data from
disk and start a new fresh zookeeper cluster.

I've also moved away from the consumer = topic/partition constraint by
using a list/queue in redis (agreeably something I can also do in zk) but
redis again just feels like a better fit for it, especially with things
like brpoppush.


On Tue, Oct 14, 2014 at 12:09 AM, Gwen Shapira 
wrote:

> Out of curiosity: did you choose Redis because ZooKeeper is not well
> supported in Clojure? Or were there other reasons?
>
> On Mon, Oct 13, 2014 at 2:04 PM, Gerrit Jansen van Vuuren
>  wrote:
> > Hi Steven,
> >
> > Redis:
> >
> >   I've had a discussion on redis today, and one architecture that does
> come
> > up is using a master slave, then if the master fails the have the
> > application start writing to the slave. Writing to a slave is possible in
> > redis, albeit you cannot fail back to the master because writes to a
> slave
> > will not be automatically replicated to the master.
> >
> >   Any suggestions are welcome.
> >
> > Java:
> >
> >   I like Java, have been with it a long time, did groovy, went back to
> > Java, tried scala, went back to Java, then tried clojure and got sold on
> it
> > (mainly because it fits my way of thinking). OtherJVMLang -> Java interop
> > is always better than Java -> OtherJVMLang interop. Clojure interop to
> Java
> > is really great, but Java to Clojure you need to use things like:
> >   RT.var("kafka-clj.consumer.node", "read-msg!").invoke(connector,
> > timeoutMillis))
> >
> >
> >  I've refactored the Java API to be more "Java like" (plus made Consumer
> > Iterable), and made a new release "2.3.9", see the updated examples, also
> > have a look at
> >
> https://github.com/gerritjvv/kafka-fast/blob/master/kafka-clj/doc/vagrant.md
> > .
> >
> > My idea for this library is that from Java/Groovy etc you do not need to
> > know about Clojure behind the scenes (barring the stacktraces), you just
> > get Java, and obviously if your using Clojure you just get Clojure ;).
> >
> > Cheers,
> >  Gerrit
> >
> >
> > On Mon, Oct 13, 2014 at 6:52 PM, Steven Schlansker <
> > sschlans...@opentable.com> wrote:
> >
> >> Couple of mostly-uninformed comments inline,
> >>
> >>
> >> On Oct 13, 2014, at 2:00 AM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com>
> >> wrote:
> >>
> >> > Hi Daniel,
> >> >
> >> > At the moment redis is a spof in the architecture, but you can setup
> >> > replication and I'm seriously looking into using redis cluster to
> >> eliminate
> >> > this.
> >> >   Some docs that point to this are:
> >> >   http://redis.io/topics/cluster-tutorial
> >> >   http://redis.io/topics/sentinel
> >>
> >> There's some evidence that redis clusters are *not* good for managing
> state
> >> in the way that you are using it:
> >>
> >> http://aphyr.com/posts/283-call-me-maybe-redis
> >>
> >> > If you can’t tolerate data loss, Redis Sentinel (and by extension
> Redis
> >> Cluster) is not safe for use as:
> >> >
> >> >   • A lock service
> >> >   • A queue
> >> >   • A database
> >>
> >>
> >> >>
> >> >>> On 13/10/2014, at 10:22 am, Gerrit Jansen van Vuuren <
> >> >> gerrit...@gmail.com> wrote:
> >> >>>
> >> >>> Hi,
> >> >>>
> >> >>> Just thought I'll put this out for the kafka community to see (if
> >> anyone
> >> >>> finds it useful great!!).
> >> >>>
> >> >>> Kafka-fast is 100% pure clojure implementation for kafka, but not
> just
> >> >>> meant for clojure because it has a Java API wrapper that can be used
> >> from
> >> >>> Java, Groovy, JRuby or Scala.
> >>
> >> One thing that frustrates me with the K

Re: kafka java api (written in 100% clojure)

2014-10-13 Thread Gerrit Jansen van Vuuren
Hi Steven,

Redis:

  I've had a discussion on redis today, and one architecture that does come
up is using a master/slave pair: if the master fails, have the application
start writing to the slave. Writing to a slave is possible in redis, albeit
you cannot fail back to the master because writes to a slave will not be
automatically replicated to the master.

  Any suggestions are welcome.

Java:

  I like Java, have been with it a long time, did groovy, went back to
Java, tried scala, went back to Java, then tried clojure and got sold on it
(mainly because it fits my way of thinking). OtherJVMLang -> Java interop
is always better than Java -> OtherJVMLang interop. Clojure interop to Java
is really great, but from Java to Clojure you need to use things like:
  RT.var("kafka-clj.consumer.node", "read-msg!").invoke(connector, timeoutMillis)


 I've refactored the Java API to be more "Java like" (plus made Consumer
Iterable), and made a new release "2.3.9", see the updated examples, also
have a look at
https://github.com/gerritjvv/kafka-fast/blob/master/kafka-clj/doc/vagrant.md
.

My idea for this library is that from Java/Groovy etc you do not need to
know about Clojure behind the scenes (barring the stacktraces), you just
get Java, and obviously if you're using Clojure you just get Clojure ;).

Cheers,
 Gerrit


On Mon, Oct 13, 2014 at 6:52 PM, Steven Schlansker <
sschlans...@opentable.com> wrote:

> Couple of mostly-uninformed comments inline,
>
>
> On Oct 13, 2014, at 2:00 AM, Gerrit Jansen van Vuuren 
> wrote:
>
> > Hi Daniel,
> >
> > At the moment redis is a spof in the architecture, but you can setup
> > replication and I'm seriously looking into using redis cluster to
> eliminate
> > this.
> >   Some docs that point to this are:
> >   http://redis.io/topics/cluster-tutorial
> >   http://redis.io/topics/sentinel
>
> There's some evidence that redis clusters are *not* good for managing state
> in the way that you are using it:
>
> http://aphyr.com/posts/283-call-me-maybe-redis
>
> > If you can’t tolerate data loss, Redis Sentinel (and by extension Redis
> Cluster) is not safe for use as:
> >
> >   • A lock service
> >   • A queue
> >   • A database
>
>
> >>
> >>> On 13/10/2014, at 10:22 am, Gerrit Jansen van Vuuren <
> >> gerrit...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> Just thought I'll put this out for the kafka community to see (if
> anyone
> >>> finds it useful great!!).
> >>>
> >>> Kafka-fast is 100% pure clojure implementation for kafka, but not just
> >>> meant for clojure because it has a Java API wrapper that can be used
> from
> >>> Java, Groovy, JRuby or Scala.
>
> One thing that frustrates me with the Kafka library is that despite it
> claiming
> that the Scala code is interoperable with Java, it really isn't.  You end
> up
> having to work around the Scala compiler 'magic' in increasingly bizarre
> ways,
> e.g. default arguments:
>
> kafka = new KafkaServer(createConfig(), KafkaServer.init$default$2());
>
> which is both magical and fragile.  I don't know whether Clojure is the
> same way,
> just want to point out that if you don't take particular care of us old
> fart Java
> nuts, you'll lose us quickly :)
>
> Another example, from your docs:
>
> Object connector = Producer.createConnector(new BrokerConf("192.168.4.40",
> 9092));
> Producer.sendMsg(connector, "my-topic", "Hi".getBytes("UTF-8"));
>
> This is downright bizarre to me, I would instead expect:
>
> Producer connector = Producer.createConnector(...)
> connector.sendMsg("my-topic", bytes)
>
> which is IMO shorter, cleaner, and easier for testing (especially mocking).
>
>
> Hope some of my ravings are helpful,
> Steven
>
>


Re: kafka java api (written in 100% clojure)

2014-10-13 Thread Gerrit Jansen van Vuuren
Hi Daniel,

At the moment redis is a spof in the architecture, but you can setup
replication and I'm seriously looking into using redis cluster to eliminate
this.
   Some docs that point to this are:
   http://redis.io/topics/cluster-tutorial
   http://redis.io/topics/sentinel


Consumer:

Consumption is split into logical work units, by default 10K offsets each.

If redis fails, the messages currently read will all be consumed, while the
redis connection threads go mad trying to reconnect.
No data loss should occur (albeit I'm still setting up scenarios in which I
will test this).
The consumer threads read work assignments from a redis list using brpoplpush
(see http://redis.io/commands/BRPOPLPUSH), which also pushes the current work
unit onto a "working" queue. If any errors are encountered while consuming,
the current work unit is not marked done; on recovery (startup) these
"working" queues are scanned and any work units found are placed back onto
the primary queue, ready for consumption (sketched below).
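
A rough sketch of that queue pattern in plain Java with the Jedis client; it
illustrates the mechanism described above rather than the library's actual
code, and the key names and work-unit format are made up:

import redis.clients.jedis.Jedis;

public class WorkUnitLoop {
    public static void main(String[] args) {
        try (Jedis redis = new Jedis("redis1", 6379)) {
            String queue = "my-group:work-units";            // primary queue of offset ranges
            String working = "my-group:working:consumer-1";  // this consumer's in-flight queue

            // recovery on startup: anything left in the working queue was never finished
            for (String unit : redis.lrange(working, 0, -1)) {
                redis.rpush(queue, unit);
                redis.lrem(working, 1, unit);
            }

            while (true) {
                // atomically move one work unit (e.g. "topic:partition:offset:count")
                // from the primary queue onto this consumer's working queue
                String unit = redis.brpoplpush(queue, working, 30);
                if (unit == null) continue;    // timed out waiting for work
                process(unit);                 // fetch and handle that offset range from kafka
                redis.lrem(working, 1, unit);  // mark the unit done only after success
            }
        }
    }

    static void process(String unit) { /* fetch the messages for this offset range */ }
}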

If an error message from kafka is received, e.g. error code 6
(NotLeaderForPartition), the consumer will try to recreate the metadata for
the work unit; while this is done the work unit resides on a "working" queue.

Another idea I'm going to play with is disabling consumption per
machine/topic to allow a machine to completely drain its messages in a
normal operational way,
so that safe shutdown can be scheduled for any machine without having to
rely on multiple threads synching etc.

Producer:

All sends are async and a broker is randomly selected.
The producer has a retry cache (implemented using http://www.mapdb.org/); on
any failure during sending (network or connection errors) the message is
saved and retried.
If acks==1 and a failure response from kafka is received, the message is
retried N times.

The low-level producer api allows you to by-pass all this logic and send to
any producer you want, the message send is still async but you can
wait/block on a response from the server
and then react as required.

Auditing:

There is an event channel (clojure.core.async) that receives work-units
consumed with a status field for errors.
This can be used for auditing consumers; I normally send this data to a
riemann instance, or just to disk.


Redis is fast and stable and I've found that even running it on a mixed
service box (e.g along side mysql etc) works fine, but this doesn't mean
that the physical hardware underneath can't fail.
In my case with 2 years of production use of redis I've only had one outage
(due to hardware failure).


Hope this answers some of your questions.
Any ideas of improving this is welcome and feel free to contribute and/or
create issue tickets with possible features ;)

Regards,


On Mon, Oct 13, 2014 at 9:58 AM, Daniel Compton 
wrote:

> Hi Gerrit
>
> Thanks for your contribution, I'm sure everyone here appreciates it,
> especially Clojure developers like myself. I do have one question: what are
> the guarantees you offer to users of your library under failures,
> particularly when Redis fails?
>
> --
> Daniel
>
> > On 13/10/2014, at 10:22 am, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
> >
> > Hi,
> >
> > Just thought I'll put this out for the kafka community to see (if anyone
> > finds it useful great!!).
> >
> > Kafka-fast is 100% pure clojure implementation for kafka, but not just
> > meant for clojure because it has a Java API wrapper that can be used from
> > Java, Groovy, JRuby or Scala.
> >
> > This library does not wrap scala instead it directly communicates with
> the
> > kafka cluster.
> >
> > It also uses redis for offsets instead of zookeeper and removes the one
> > consumer per partition limit that all other kafka libraries have, by
> diving
> > offsets into logical work units and running consumption via a queue(list)
> > in redis.
> >
> > https://github.com/gerritjvv/kafka-fast
> >
> > Any constructive critique is more than welcome :)
>


kafka java api (written in 100% clojure)

2014-10-12 Thread Gerrit Jansen van Vuuren
Hi,

Just thought I'll put this out for the kafka community to see (if anyone
finds it useful great!!).

Kafka-fast is a 100% pure clojure implementation for kafka, but it is not
just meant for clojure, because it has a Java API wrapper that can be used
from Java, Groovy, JRuby or Scala.

This library does not wrap the scala client; instead it communicates directly
with the kafka cluster.

It also uses redis for offsets instead of zookeeper, and removes the
one-consumer-per-partition limit that all other kafka libraries have, by
dividing offsets into logical work units and running consumption via a
queue (list) in redis.

https://github.com/gerritjvv/kafka-fast

Any constructive critique is more than welcome :)


Re: cannot replicate topics kafka inconsistent state

2014-06-17 Thread Gerrit Jansen van Vuuren
The network is 10gig and so far has not given any issues; I think it's
extremely unlikely that it could be the network (all ports are open and all
communication happens on an internal LAN).

I'm running consumers and producers on the nodes where the brokers are
running and they are consuming and producing data at high volumes between
the nodes.
While doing the test I was not running any producers or consumers other
than the test kafka-console-producer and kafka-console-consumer.




On Tue, Jun 17, 2014 at 4:28 PM, Jun Rao  wrote:

> Is your network stable?
>
> Thanks,
>
> Jun
>
>
> On Tue, Jun 17, 2014 at 1:48 AM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > Hi,
> >
> > I've installed kafka 2.8.1,
> > created a topic using:
> >
> > /opt/kafka/bin/kafka-topics.sh --create --topic "test" --zookeeper
> > "localhost:2381" --partitions 2 --replication-factor 2
> >
> > Then opened a console producer and a console consumer.
> > I type a few lines on the producer and then the two kafka brokers that
> > should have the two replicas start throwing errors to the logs, the only
> > way to get kafka back to normal again is by deleting all of the topic
> data
> > in kafka and in zookeeper and restarting.
> >
> > The errors are:
> > broker1:
> >
> > 2014-06-17/01:40:32.137/PDT ERROR [kafka-processor-9092-5]:
> > kafka.network.Processor - Closing socket for /10.101.4.218 because of
> > error^C
> >
> > kafka.common.KafkaException: This operation cannot be completed on a
> > complete request.
> >
> > at
> kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
> >
> > at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
> >
> > at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
> >
> > at kafka.network.Processor.write(SocketServer.scala:375)
> >
> > at kafka.network.Processor.run(SocketServer.scala:247)
> >
> > at java.lang.Thread.run(Thread.java:744)
> >
> >
> > broker2
> >
> > 2014-06-17/01:40:29.127/PDT WARN  [ReplicaFetcherThread-0-215]:
> > kafka.consumer.SimpleConsumer - Reconnect due to socket error: null
> >
> > 2014-06-17/01:40:29.127/PDT ERROR [ReplicaFetcherThread-0-215]:
> > kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-0-215], Error
> in
> > fetch Name: FetchRequest; Version: 0; CorrelationId: 545271; ClientId:
> > ReplicaFetcherThread-0-215; ReplicaId: 218; MaxWait: 1000 ms; MinBytes: 1
> > bytes; RequestInfo: [test,1] -> PartitionFetchInfo(1,2147483647)
> >
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> >
> > at kafka.utils.Utils$.read(Utils.scala:376)
> >
> > at
> >
> >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >
> > at
> >
> >
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >
> > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> >
> > at
> >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> >
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >
> > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> >
> > at
> >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> >
> > at
> > kafka.server.AbstractFetcherThread.doWork(AbstractFe

cannot replicate topics kafka inconsistent state

2014-06-17 Thread Gerrit Jansen van Vuuren
Hi,

I've installed kafka 2.8.1,
created a topic using:

/opt/kafka/bin/kafka-topics.sh --create --topic "test" --zookeeper
"localhost:2381" --partitions 2 --replication-factor 2

Then opened a console producer and a console consumer.
I type a few lines on the producer and then the two kafka brokers that
should have the two replicas start throwing errors to the logs, the only
way to get kafka back to normal again is by deleting all of the topic data
in kafka and in zookeeper and restarting.

The errors are:
broker1:

2014-06-17/01:40:32.137/PDT ERROR [kafka-processor-9092-5]:
kafka.network.Processor - Closing socket for /10.101.4.218 because of
error^C

kafka.common.KafkaException: This operation cannot be completed on a
complete request.

at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)

at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)

at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)

at kafka.network.Processor.write(SocketServer.scala:375)

at kafka.network.Processor.run(SocketServer.scala:247)

at java.lang.Thread.run(Thread.java:744)


broker2

2014-06-17/01:40:29.127/PDT WARN  [ReplicaFetcherThread-0-215]:
kafka.consumer.SimpleConsumer - Reconnect due to socket error: null

2014-06-17/01:40:29.127/PDT ERROR [ReplicaFetcherThread-0-215]:
kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-0-215], Error in
fetch Name: FetchRequest; Version: 0; CorrelationId: 545271; ClientId:
ReplicaFetcherThread-0-215; ReplicaId: 218; MaxWait: 1000 ms; MinBytes: 1
bytes; RequestInfo: [test,1] -> PartitionFetchInfo(1,2147483647)

java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.

at kafka.utils.Utils$.read(Utils.scala:376)

at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)

at kafka.network.Receive$class.readCompletely(Transmission.scala:56)

at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)

at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)

at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)

at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)

at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)

at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)

at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)

at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)

at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)

at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)

at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)

at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)

at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)

at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)

at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)

 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)


The same error on each is repeated in an endless loop.


My config server config is:



num.network.threads=24

num.io.threads=24

socket.send.buffer.bytes=10485760

socket.receive.buffer.bytes=1048576

socket.request.max.bytes=524288000

replica.lag.max.messages=500

replica.fetch.max.bytes=2097152

replica.fetch.wait.max.ms=1000

log.dir=/data

num.partitions=12

log.flush.interval.messages=20

log.flush.interval.ms=2000

log.retention.hours=168

log.retention.mins=10080

log.retention.hours=168

log.retention.mins=10080

log.retention.bytes=219902322

replica.fetch.max.bytes=2147483647

log.segment.bytes=209715200

log.cleanup.interval.mins=10

default.replication.factor=2

zookeeper.connect=localhost:2381

zookeeper.connection.timeout.ms=100

--


Am I missing some configuration properties?


Regards,

 Gerrit


Re: kafka.common.LeaderNotAvailableException

2014-01-29 Thread Gerrit Jansen van Vuuren
I've found the response to my own question:
http://mail-archives.apache.org/mod_mbox/kafka-users/201308.mbox/%3c44d1e1522419a14482f89ff4ce322ede25025...@brn1wnexmbx01.vcorp.ad.vrsn.com%3E



On Wed, Jan 29, 2014 at 1:17 PM, Gerrit Jansen van Vuuren <
gerrit...@gmail.com> wrote:

> Hi,
>
> I'm testing kafka 0.8.0 failover.
>
> I have 5 brokers 1,2,3,4,5. I shutdown 5 (with controlled shutdown
> activated).
> broker 4 is my bootstrap broker.
>
> My config has: default.replication.factor=2, num.partitions=8.
>
> When I look at the kafka server.log on broker 4 I get the below error,
> which only goes away when I restart broker 5.
>
>
> [2014-01-29 04:12:15,348] ERROR [KafkaApi-4] Error while fetching metadata
> for partition [data,4] (kafka.server.KafkaApis)
> kafka.common.LeaderNotAvailableException: Leader not available for
> partition [data,4]
>  at
> kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:468)
> at
> kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:456)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>  at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> at scala.collection.immutable.List.map(List.scala:45)
>  at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:456)
> at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:452)
>  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>  at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:123)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
>  at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>  at scala.collection.immutable.HashSet.map(HashSet.scala:32)
> at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:452)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>  at java.lang.Thread.run(Thread.java:724)
>
>
> any ideas?
>


Re: custom kafka consumer - strangeness

2014-01-29 Thread Gerrit Jansen van Vuuren
Hi,

I've finally fixed this by closing the connection on timeout and creating a
new connection on the next send.

Thanks,
 Gerrit


On Tue, Jan 14, 2014 at 10:20 AM, Gerrit Jansen van Vuuren <
gerrit...@gmail.com> wrote:

> Hi,
>
> thanks I will do this.
>
>
>
> On Tue, Jan 14, 2014 at 9:51 AM, Joe Stein  wrote:
>
>> I Gerrit, do you have a ticket already for this issue?  Is it possible to
>> attach code that reproduces it?  Would be great if you can run it against
>> a
>> Kafka VM you can grab one from this project for 0.8.0
>> https://github.com/stealthly/scala-kafka to launch a Kafka VM and add
>> whatever you need to it to reproduce the issue or from
>> https://issues.apache.org/jira/browse/KAFKA-1173 for 0.8.1.  I think if
>> you
>> can reproduce it in an environment comfortably that is in a controlled
>> isolation that would be helpful for folks to reproduce and work towards
>> resolution At least if it is a bug we can get a detailed capture of
>> what the bug is in the JIRA ticket and start discussing how to fix it.
>>
>> /***
>>  Joe Stein
>>  Founder, Principal Consultant
>>  Big Data Open Source Security LLC
>>  http://www.stealth.ly
>>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> /
>>
>>
>> On Tue, Jan 14, 2014 at 3:38 AM, Gerrit Jansen van Vuuren <
>> gerrit...@gmail.com> wrote:
>>
>> > Yes, I'm using my own client following:
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>> >
>> > Everything works except for this weirdness.
>> >
>> >
>> > On Tue, Jan 14, 2014 at 5:50 AM, Jun Rao  wrote:
>> >
>> > > So, you implemented your own consumer client using netty?
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Mon, Jan 13, 2014 at 8:42 AM, Gerrit Jansen van Vuuren <
>> > > gerrit...@gmail.com> wrote:
>> > >
>> > > > I'm using netty and async write, read.
>> > > > For read I used a timeout such that if I do not see anything on the
>> > read
>> > > > channel, my read function times out and returns null.
>> > > > I do not see any error on the socket, and the same socket is used
>> > > > throughout all of the fetches.
>> > > >
>> > > > I'm using the console producer and messages are "11", "22", "abc",
>> > ""
>> > > > etc.
>> > > >
>> > > > I can reliably reproduce it every time.
>> > > >
>> > > > Its weird yes, no compression is used, the timeout happens for the
>> same
>> > > > scenario every time.
>> > > >
>> > > >
>> > > >
>> > > > On Mon, Jan 13, 2014 at 4:44 PM, Jun Rao  wrote:
>> > > >
>> > > > > I can't seen to find the log trace for the timed out fetch request
>> > > (every
>> > > > > fetch request seems to have a corresponding completed entry). For
>> the
>> > > > timed
>> > > > > out fetch request, is it that the broker never completed the
>> request
>> > or
>> > > > is
>> > > > > it that it just took longer than the socket timeout to finish
>> > > processing
>> > > > > the request? Do you use large messages in your test?
>> > > > >
>> > > > > If you haven't enabled compression, it's weird that you will
>> re-get
>> > 240
>> > > > and
>> > > > > 241 with an offset of 242 in the fetch request. Is that easily
>> > > > > reproducible?
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > >
>> > > > > On Mon, Jan 13, 2014 at 1:26 AM, Gerrit Jansen van Vuuren <
>> > > > > gerrit...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi,
>> > > > > >
>> > > > > > the offset in g is 240, and in i 242, the last message read was
>> at
>> > > > offset
>> > > > > > 239.
>> > > > > >
>> > >

kafka.common.LeaderNotAvailableException

2014-01-29 Thread Gerrit Jansen van Vuuren
Hi,

I'm testing kafka 0.8.0 failover.

I have 5 brokers 1,2,3,4,5. I shutdown 5 (with controlled shutdown
activated).
broker 4 is my bootstrap broker.

My config has: default.replication.factor=2, num.partitions=8.

When I look at the kafka server.log on broker 4 I get the below error,
which only goes away when I restart broker 5.


[2014-01-29 04:12:15,348] ERROR [KafkaApi-4] Error while fetching metadata
for partition [data,4] (kafka.server.KafkaApis)
kafka.common.LeaderNotAvailableException: Leader not available for
partition [data,4]
at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:468)
at kafka.server.KafkaApis$$anonfun$17$$anonfun$20.apply(KafkaApis.scala:456)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
at scala.collection.immutable.List.foreach(List.scala:45)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.immutable.List.map(List.scala:45)
at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:456)
at kafka.server.KafkaApis$$anonfun$17.apply(KafkaApis.scala:452)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:123)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.immutable.HashSet.map(HashSet.scala:32)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:452)
at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:724)


any ideas?


Re: custom kafka consumer - strangeness

2014-01-14 Thread Gerrit Jansen van Vuuren
Hi,

thanks I will do this.



On Tue, Jan 14, 2014 at 9:51 AM, Joe Stein  wrote:

> I Gerrit, do you have a ticket already for this issue?  Is it possible to
> attach code that reproduces it?  Would be great if you can run it against a
> Kafka VM you can grab one from this project for 0.8.0
> https://github.com/stealthly/scala-kafka to launch a Kafka VM and add
> whatever you need to it to reproduce the issue or from
> https://issues.apache.org/jira/browse/KAFKA-1173 for 0.8.1.  I think if
> you
> can reproduce it in an environment comfortably that is in a controlled
> isolation that would be helpful for folks to reproduce and work towards
> resolution At least if it is a bug we can get a detailed capture of
> what the bug is in the JIRA ticket and start discussing how to fix it.
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ************/
>
>
> On Tue, Jan 14, 2014 at 3:38 AM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > Yes, I'm using my own client following:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >
> > Everything works except for this weirdness.
> >
> >
> > On Tue, Jan 14, 2014 at 5:50 AM, Jun Rao  wrote:
> >
> > > So, you implemented your own consumer client using netty?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Jan 13, 2014 at 8:42 AM, Gerrit Jansen van Vuuren <
> > > gerrit...@gmail.com> wrote:
> > >
> > > > I'm using netty and async write, read.
> > > > For read I used a timeout such that if I do not see anything on the
> > read
> > > > channel, my read function times out and returns null.
> > > > I do not see any error on the socket, and the same socket is used
> > > > throughout all of the fetches.
> > > >
> > > > I'm using the console producer and messages are "11", "22", "abc",
> > ""
> > > > etc.
> > > >
> > > > I can reliably reproduce it every time.
> > > >
> > > > Its weird yes, no compression is used, the timeout happens for the
> same
> > > > scenario every time.
> > > >
> > > >
> > > >
> > > > On Mon, Jan 13, 2014 at 4:44 PM, Jun Rao  wrote:
> > > >
> > > > > I can't seen to find the log trace for the timed out fetch request
> > > (every
> > > > > fetch request seems to have a corresponding completed entry). For
> the
> > > > timed
> > > > > out fetch request, is it that the broker never completed the
> request
> > or
> > > > is
> > > > > it that it just took longer than the socket timeout to finish
> > > processing
> > > > > the request? Do you use large messages in your test?
> > > > >
> > > > > If you haven't enabled compression, it's weird that you will re-get
> > 240
> > > > and
> > > > > 241 with an offset of 242 in the fetch request. Is that easily
> > > > > reproducible?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Mon, Jan 13, 2014 at 1:26 AM, Gerrit Jansen van Vuuren <
> > > > > gerrit...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > the offset in g is 240, and in i 242, the last message read was
> at
> > > > offset
> > > > > > 239.
> > > > > >
> > > > > > After reading from 0 - 239, I make another request for 240, this
> > > > request
> > > > > > timesout and never returns.
> > > > > > I then manually add 2 entries via the console producer, all the
> > time
> > > > > while
> > > > > > making a request for 240 every 10 seconds, all subsequent
> requests
> > > for
> > > > > > offset 240 returns empty messages, till the responses are
> written.
> > > > Then I
> > > > > > get the 2 messages at offsets 240,241 and an end of response.
> Then
> > I
> > >

Re: custom kafka consumer - strangeness

2014-01-14 Thread Gerrit Jansen van Vuuren
Yes, I'm using my own client following:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

Everything works except for this weirdness.


On Tue, Jan 14, 2014 at 5:50 AM, Jun Rao  wrote:

> So, you implemented your own consumer client using netty?
>
> Thanks,
>
> Jun
>
>
> On Mon, Jan 13, 2014 at 8:42 AM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > I'm using netty and async write, read.
> > For read I used a timeout such that if I do not see anything on the read
> > channel, my read function times out and returns null.
> > I do not see any error on the socket, and the same socket is used
> > throughout all of the fetches.
> >
> > I'm using the console producer and messages are "11", "22", "abc", ""
> > etc.
> >
> > I can reliably reproduce it every time.
> >
> > Its weird yes, no compression is used, the timeout happens for the same
> > scenario every time.
> >
> >
> >
> > On Mon, Jan 13, 2014 at 4:44 PM, Jun Rao  wrote:
> >
> > > I can't seen to find the log trace for the timed out fetch request
> (every
> > > fetch request seems to have a corresponding completed entry). For the
> > timed
> > > out fetch request, is it that the broker never completed the request or
> > is
> > > it that it just took longer than the socket timeout to finish
> processing
> > > the request? Do you use large messages in your test?
> > >
> > > If you haven't enabled compression, it's weird that you will re-get 240
> > and
> > > 241 with an offset of 242 in the fetch request. Is that easily
> > > reproducible?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Jan 13, 2014 at 1:26 AM, Gerrit Jansen van Vuuren <
> > > gerrit...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > the offset in g is 240, and in i 242, the last message read was at
> > offset
> > > > 239.
> > > >
> > > > After reading from 0 - 239, I make another request for 240, this
> > request
> > > > timesout and never returns.
> > > > I then manually add 2 entries via the console producer, all the time
> > > while
> > > > making a request for 240 every 10 seconds, all subsequent requests
> for
> > > > offset 240 returns empty messages, till the responses are written.
> > Then I
> > > > get the 2 messages at offsets 240,241 and an end of response. Then I
> > > make a
> > > > request for offset 242, and get the messages at offsets 240,241
> again.
> > > >
> > > > I've attached a portion of the kafka-request.log set to trace.
> > > >
> > > > The correlation ids are:
> > > > 1389604489 - first request at offset 0
> > > > 1389604511  - timeout at offset 240
> > > > 1389604563  - got data request at offset 240
> > > > 1389604573  - got duplicates request at offset 242
> > > >
> > > > Regards,
> > > >  Gerrit
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Jan 13, 2014 at 5:10 AM, Jun Rao  wrote:
> > > >
> > > >> What's the offset used in the fetch request in steps g and i that
> both
> > > >> returned offsets 10 and 11?
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >>
> > > >> On Sat, Jan 11, 2014 at 3:19 AM, Gerrit Jansen van Vuuren <
> > > >> gerrit...@gmail.com> wrote:
> > > >>
> > > >> > Hi,
> > > >> >
> > > >> >
> > > >> > No the offsets are not the same. I've printed out the values to
> see
> > > >> this,
> > > >> > and its not the case.
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Fri, Jan 10, 2014 at 5:02 PM, Jun Rao 
> wrote:
> > > >> >
> > > >> > > Are the offset used in the 2 fetch requests the same? If so, you
> > > will
> > > >> get
> > > >> > > the same messages twice. You consumer is responsible for
> advancing
> > > the
> > > >> > > offsets after consumption.
> > > >> > >
> > >

Re: custom kafka consumer - strangeness

2014-01-13 Thread Gerrit Jansen van Vuuren
I'm using netty with async writes and reads.
For read I used a timeout such that if I do not see anything on the read
channel, my read function times out and returns null.
I do not see any error on the socket, and the same socket is used
throughout all of the fetches.

I'm using the console producer and messages are "11", "22", "abc", ""
etc.

I can reliably reproduce it every time.

It's weird, yes: no compression is used, and the timeout happens for the same
scenario every time.



On Mon, Jan 13, 2014 at 4:44 PM, Jun Rao  wrote:

> I can't seen to find the log trace for the timed out fetch request (every
> fetch request seems to have a corresponding completed entry). For the timed
> out fetch request, is it that the broker never completed the request or is
> it that it just took longer than the socket timeout to finish processing
> the request? Do you use large messages in your test?
>
> If you haven't enabled compression, it's weird that you will re-get 240 and
> 241 with an offset of 242 in the fetch request. Is that easily
> reproducible?
>
> Thanks,
>
> Jun
>
>
> On Mon, Jan 13, 2014 at 1:26 AM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > Hi,
> >
> > the offset in g is 240, and in i 242, the last message read was at offset
> > 239.
> >
> > After reading from 0 - 239, I make another request for 240, this request
> > timesout and never returns.
> > I then manually add 2 entries via the console producer, all the time
> while
> > making a request for 240 every 10 seconds, all subsequent requests for
> > offset 240 returns empty messages, till the responses are written. Then I
> > get the 2 messages at offsets 240,241 and an end of response. Then I
> make a
> > request for offset 242, and get the messages at offsets 240,241 again.
> >
> > I've attached a portion of the kafka-request.log set to trace.
> >
> > The correlation ids are:
> > 1389604489 - first request at offset 0
> > 1389604511  - timeout at offset 240
> > 1389604563  - got data request at offset 240
> > 1389604573  - got duplicates request at offset 242
> >
> > Regards,
> >  Gerrit
> >
> >
> >
> >
> > On Mon, Jan 13, 2014 at 5:10 AM, Jun Rao  wrote:
> >
> >> What's the offset used in the fetch request in steps g and i that both
> >> returned offsets 10 and 11?
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Sat, Jan 11, 2014 at 3:19 AM, Gerrit Jansen van Vuuren <
> >> gerrit...@gmail.com> wrote:
> >>
> >> > Hi,
> >> >
> >> >
> >> > No the offsets are not the same. I've printed out the values to see
> >> this,
> >> > and its not the case.
> >> >
> >> >
> >> >
> >> > On Fri, Jan 10, 2014 at 5:02 PM, Jun Rao  wrote:
> >> >
> >> > > Are the offset used in the 2 fetch requests the same? If so, you
> will
> >> get
> >> > > the same messages twice. You consumer is responsible for advancing
> the
> >> > > offsets after consumption.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > > On Thu, Jan 9, 2014 at 1:00 PM, Gerrit Jansen van Vuuren <
> >> > > gerrit...@gmail.com> wrote:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > I'm writing a custom consumer for kafka 0.8.
> >> > > > Everything works except for the following:
> >> > > >
> >> > > > a. connect, send fetch, read all results
> >> > > > b. send fetch
> >> > > > c. send fetch
> >> > > > d. send fetch
> >> > > > e. via the console publisher, publish 2 messages
> >> > > > f. send fetch :corr-id 1
> >> > > > g. read 2 messages published :offsets [10 11] :corr-id 1
> >> > > > h. send fetch :corr-id 2
> >> > > > i. read 2 messages published :offsets [10 11] :corr-id 2
> >> > > > j.  send fetch ...
> >> > > >
> >> > > > The problem is I get the messages sent twice as a response to two
> >> > > separate
> >> > > > fetch requests. The correlation id is distinct so it cannot be
> that
> >> I
> >> > > read
> >> > > > the response twice. The offsets of the 2 messages are are the same
> >> so
> >> > > they
> >> > > > are duplicates, and its not the producer sending the messages
> twice.
> >> > > >
> >> > > > Note: the same connection is kept open the whole time, and I send
> >> > > > block,receive then send again, after the first 2 messages are
> read,
> >> the
> >> > > > offsets are incremented and the next fetch will ask kafka to give
> it
> >> > > > messages from the new offsets.
> >> > > >
> >> > > > any ideas of why kafka would be sending the messages again on the
> >> > second
> >> > > > fetch request?
> >> > > >
> >> > > > Regards,
> >> > > >  Gerrit
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>


Re: custom kafka consumer - strangeness

2014-01-11 Thread Gerrit Jansen van Vuuren
I'm also seeing the following.

I consume the data in the queue.
Then, after 10 seconds, I send another fetch request (with the incremented
offset) and never receive a response from the broker; my code eventually
times out (after 30 seconds).

The broker writes Expiring fetch request Name: FetchRequest; Version: 0;
CorrelationId: 1389443537; ClientId: 1; ReplicaId: -1; MaxWait: 1000 ms;
MinBytes: 1 bytes; RequestInfo: [ping,0] ->
PartitionFetchInfo(187,1048576).

This corresponds with the timed out fetch request.






On Sat, Jan 11, 2014 at 12:19 PM, Gerrit Jansen van Vuuren <
gerrit...@gmail.com> wrote:

> Hi,
>
>
> No the offsets are not the same. I've printed out the values to see this,
> and its not the case.
>
>
>
> On Fri, Jan 10, 2014 at 5:02 PM, Jun Rao  wrote:
>
>> Are the offset used in the 2 fetch requests the same? If so, you will get
>> the same messages twice. You consumer is responsible for advancing the
>> offsets after consumption.
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Thu, Jan 9, 2014 at 1:00 PM, Gerrit Jansen van Vuuren <
>> gerrit...@gmail.com> wrote:
>>
>> > Hi,
>> >
>> > I'm writing a custom consumer for kafka 0.8.
>> > Everything works except for the following:
>> >
>> > a. connect, send fetch, read all results
>> > b. send fetch
>> > c. send fetch
>> > d. send fetch
>> > e. via the console publisher, publish 2 messages
>> > f. send fetch :corr-id 1
>> > g. read 2 messages published :offsets [10 11] :corr-id 1
>> > h. send fetch :corr-id 2
>> > i. read 2 messages published :offsets [10 11] :corr-id 2
>> > j.  send fetch ...
>> >
>> > The problem is I get the messages sent twice as a response to two
>> separate
>> > fetch requests. The correlation id is distinct so it cannot be that I
>> read
>> > the response twice. The offsets of the 2 messages are are the same so
>> they
>> > are duplicates, and its not the producer sending the messages twice.
>> >
>> > Note: the same connection is kept open the whole time, and I send
>> > block,receive then send again, after the first 2 messages are read, the
>> > offsets are incremented and the next fetch will ask kafka to give it
>> > messages from the new offsets.
>> >
>> > any ideas of why kafka would be sending the messages again on the second
>> > fetch request?
>> >
>> > Regards,
>> >  Gerrit
>> >
>>
>
>


Re: custom kafka consumer - strangeness

2014-01-11 Thread Gerrit Jansen van Vuuren
Hi,


No, the offsets are not the same. I've printed out the values to verify this,
and it's not the case.



On Fri, Jan 10, 2014 at 5:02 PM, Jun Rao  wrote:

> Are the offset used in the 2 fetch requests the same? If so, you will get
> the same messages twice. You consumer is responsible for advancing the
> offsets after consumption.
>
> Thanks,
>
> Jun
>
>
> On Thu, Jan 9, 2014 at 1:00 PM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > Hi,
> >
> > I'm writing a custom consumer for kafka 0.8.
> > Everything works except for the following:
> >
> > a. connect, send fetch, read all results
> > b. send fetch
> > c. send fetch
> > d. send fetch
> > e. via the console publisher, publish 2 messages
> > f. send fetch :corr-id 1
> > g. read 2 messages published :offsets [10 11] :corr-id 1
> > h. send fetch :corr-id 2
> > i. read 2 messages published :offsets [10 11] :corr-id 2
> > j.  send fetch ...
> >
> > The problem is I get the messages sent twice as a response to two
> separate
> > fetch requests. The correlation id is distinct so it cannot be that I
> read
> > the response twice. The offsets of the 2 messages are are the same so
> they
> > are duplicates, and its not the producer sending the messages twice.
> >
> > Note: the same connection is kept open the whole time, and I send
> > block,receive then send again, after the first 2 messages are read, the
> > offsets are incremented and the next fetch will ask kafka to give it
> > messages from the new offsets.
> >
> > any ideas of why kafka would be sending the messages again on the second
> > fetch request?
> >
> > Regards,
> >  Gerrit
> >
>


Re: Velocity on local machine

2014-01-10 Thread Gerrit Jansen van Vuuren
Have you tried using more producers?
The Kafka broker is performant, but the client producer's performance is
not what it should be.

You can also have a look at tuning the number of the Kafka broker's network
and I/O threads.
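
For example, the relevant knobs in the broker's server.properties are (the
values here are only illustrative, not recommendations):

# number of threads handling network requests
num.network.threads=4
# number of threads doing disk I/O
num.io.threads=8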


Regards,
 Gerrit


On Fri, Jan 10, 2014 at 1:06 PM, Klaus Schaefers <
klaus.schaef...@ligatus.com> wrote:

> Hi,
>
> I have close to 2k messages per second. My machine is just a (BG 4-core i5
> but I would expect more messages. I ran Kafka in the default settings.
>
>
> On Fri, Jan 10, 2014 at 12:31 PM, Magnus Edenhill  >wrote:
>
> > What performance numbers did you see?
> >
> > For reference you can check the following tests that were also run on the
> > same machine as the broker:
> >
> >
> https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#performance-numbers
> >
> > Do they correspond to your numbers?
> >
> > Consumer thruput is not included in that document but typically peaks at
> > around 3 million msgs/s when run on the same host and with hot disk
> caches.
> >
> > Regards,
> > Magnus
> >
> >
> > 2014/1/10 Klaus Schaefers 
> >
> > > Hi,
> > >
> > > I am currently benchmarking Kafka against ActiveMQ and I got some
> results
> > > the surprised my quite a lot. ActiveMQ managed to deliver 4x more
> > messages
> > > when running locally. But from all what I was reading I am a little bit
> > > surprised. Honestly I expected Kafka to outperform ActiveMQ. Some I am
> > left
> > > with the feeling that I configured some wrong or did any kind of other
> > > mistake.
> > >
> > > My setup looks like this:
> > >
> > > - One local broker
> > > - 10 Topic / Queues
> > > - 1 producer that dispatches messages randomly to the topics
> > > - 1 consumer per topic
> > >
> > > I basically used the example on the Kafka web page.
> > >
> > > Also I encountered some issues when increasing the number of topics,
> lets
> > > say 100. In this case the consumer cannot connect the Zookeeper...
> > >
> > > Does anybody has an idea how to improve the performance?
> > >
> > >
> > > Thx,
> > >
> > > Klaus
> > >
> > >
> > > --
> > >
> > > --
> > >
> > > Klaus Schaefers
> > > Senior Optimization Manager
> > >
> > > Ligatus GmbH
> > > Hohenstaufenring 30-32
> > > D-50674 Köln
> > >
> > > Tel.:  +49 (0) 221 / 56939 -784
> > > Fax:  +49 (0) 221 / 56 939 - 599
> > > E-Mail: klaus.schaef...@ligatus.com
> > > Web: www.ligatus.de
> > >
> > > HRB Köln 56003
> > > Geschäftsführung:
> > > Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
> > > Dipl.-Wirtschaftsingenieur Arne Wolter
> > >
> >
>
>
>
> --
>
> --
>
> Klaus Schaefers
> Senior Optimization Manager
>
> Ligatus GmbH
> Hohenstaufenring 30-32
> D-50674 Köln
>
> Tel.:  +49 (0) 221 / 56939 -784
> Fax:  +49 (0) 221 / 56 939 - 599
> E-Mail: klaus.schaef...@ligatus.com
> Web: www.ligatus.de
>
> HRB Köln 56003
> Geschäftsführung:
> Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
> Dipl.-Wirtschaftsingenieur Arne Wolter
>


Re: custom kafka consumer - strangeness

2014-01-09 Thread Gerrit Jansen van Vuuren
Thanks, I will definitely put this in.
Does the console producer send compressed messages by default? I haven't
specified compression for it, so I assumed it would send plain text.




On Thu, Jan 9, 2014 at 10:14 PM, Chris Curtin wrote:

> If you look at the example simple consumer:
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>
> You'll see:
>
>   if (currentOffset < readOffset) {
> System.out.println("Found an old offset: " + currentOffset + "
> Expecting: " + readOffset);
> continue;
> }
>
> and a comment in the 'Reading the Data' part:
>
> Also note that we are explicitly checking that the offset being read is not
> less than the offset that we requested. This is needed since if Kafka is
> compressing the messages, the fetch request will return an entire
> compressed block even if the requested offset isn't the beginning of the
> compressed block. Thus a message we saw previously may be returned again.
>
> This is probably what is happening to you
>
> Chris
>
>
> On Thu, Jan 9, 2014 at 4:00 PM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > Hi,
> >
> > I'm writing a custom consumer for kafka 0.8.
> > Everything works except for the following:
> >
> > a. connect, send fetch, read all results
> > b. send fetch
> > c. send fetch
> > d. send fetch
> > e. via the console publisher, publish 2 messages
> > f. send fetch :corr-id 1
> > g. read 2 messages published :offsets [10 11] :corr-id 1
> > h. send fetch :corr-id 2
> > i. read 2 messages published :offsets [10 11] :corr-id 2
> > j.  send fetch ...
> >
> > The problem is I get the messages sent twice as a response to two
> separate
> > fetch requests. The correlation id is distinct so it cannot be that I
> read
> > the response twice. The offsets of the 2 messages are are the same so
> they
> > are duplicates, and its not the producer sending the messages twice.
> >
> > Note: the same connection is kept open the whole time, and I send
> > block,receive then send again, after the first 2 messages are read, the
> > offsets are incremented and the next fetch will ask kafka to give it
> > messages from the new offsets.
> >
> > any ideas of why kafka would be sending the messages again on the second
> > fetch request?
> >
> > Regards,
> >  Gerrit
> >
>


custom kafka consumer - strangeness

2014-01-09 Thread Gerrit Jansen van Vuuren
Hi,

I'm writing a custom consumer for kafka 0.8.
Everything works except for the following:

a. connect, send fetch, read all results
b. send fetch
c. send fetch
d. send fetch
e. via the console publisher, publish 2 messages
f. send fetch :corr-id 1
g. read 2 messages published :offsets [10 11] :corr-id 1
h. send fetch :corr-id 2
i. read 2 messages published :offsets [10 11] :corr-id 2
j.  send fetch ...

The problem is I get the messages sent twice as a response to two separate
fetch requests. The correlation id is distinct, so it cannot be that I read
the response twice. The offsets of the 2 messages are the same, so they
are duplicates, and it's not the producer sending the messages twice.

Note: the same connection is kept open the whole time, and I send, block,
receive, then send again. After the first 2 messages are read, the
offsets are incremented and the next fetch will ask Kafka to give it
messages from the new offsets.

any ideas of why kafka would be sending the messages again on the second
fetch request?

Regards,
 Gerrit


Re: java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-01-02 Thread Gerrit Jansen van Vuuren
No, I can't :( as I upped it because some of the messages can be big. The
question still remains that 600 MB is far from the 2 GB int limit, so is there
any reason why a 600 MB max size would cause the fetch buffer to overflow?
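
For reference, a quick check of the arithmetic Jun describes in the quoted
reply below, using the partition counts and fetch size from this thread (a
standalone sketch, not Kafka code), shows that 600 MB per partition still
overflows a signed 32-bit int once a few partitions are fetched in one request:

public class FetchSizeCheck {
    public static void main(String[] args) {
        long fetchSize = 600L * 1024 * 1024; // 600 MB per partition
        for (int partitions : new int[]{1, 4, 8, 12}) {
            long total = partitions * fetchSize;
            System.out.printf("%2d partitions x 600 MB = %,d bytes -> overflows int: %b%n",
                    partitions, total, total > Integer.MAX_VALUE);
        }
    }
}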



On Thu, Jan 2, 2014 at 5:19 PM, Jun Rao  wrote:

> Could you reduce the max message size? Do you really expect to have a
> single message of 600MB? After that, you can reduce the fetch size.
>
> Thanks,
>
> Jun
>
>
> On Thu, Jan 2, 2014 at 8:06 AM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > There is a particular topic that has allot of data in each message, there
> > is nothing I can do about it.
> > Because I have so much data I try to split the data over 8-12 partitions,
> > if I reduce the partitions I won't have enough consumers to consume the
> > data in time.
> >
> >
> > On Thu, Jan 2, 2014 at 4:50 PM, Jun Rao  wrote:
> >
> > > 600mb for fetch size is considerably larger than the default size. Is
> > there
> > > a particular reason for this? Also, how many partitions do you have?
> You
> > > may have to reduce the fetch size further if there are multiple
> > partitions.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Jan 2, 2014 at 2:42 AM, Gerrit Jansen van Vuuren <
> > > gerrit...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I just double checked my configuration and the broker has
> > > message.max.bytes
> > > > set to 1 gig, the consumers have the same setting for max fetch size.
> > > I've
> > > > lowered this to 600 mb and still see the same error :(,
> > > >
> > > > at the moment kafka is un-usable for me, the the only other
> alternative
> > > is
> > > > writing my own client (as i'm doing with the producer), what a pain!
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Jan 1, 2014 at 6:45 PM, Jun Rao  wrote:
> > > >
> > > > > In our wire protocol, we expect the first 4 bytes for a response to
> > be
> > > > its
> > > > > size. If the actual size is larger than 2GB, what's stored in the
> > > those 4
> > > > > bytes is the overflowed value. This could cause some of the buffer
> > size
> > > > to
> > > > > be smaller than it should be later on. If #partitions * fetch_size
> >  is
> > > > > larger than 2GB in a single fetch request, you could hit this
> > problem.
> > > > You
> > > > > can try reducing the fetch size. Ideally, the sender should catch
> > this
> > > > and
> > > > > throw an exception, which we don't do currently.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Wed, Jan 1, 2014 at 9:27 AM, Gerrit Jansen van Vuuren <
> > > > > gerrit...@gmail.com> wrote:
> > > > >
> > > > > > Mm... Could be Im not sure if in a single request though. I am
> > moving
> > > > > allot
> > > > > > of data. Any pointer at were in the code the overflow might
> start?
> > > > > > On 1 Jan 2014 18:13, "Jun Rao"  wrote:
> > > > > >
> > > > > > > Are you fetching more than 2GB of data in a single fetch
> response
> > > > > (across
> > > > > > > all partitions)? Currently, we don't handle integer overflow
> > > > properly.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jan 1, 2014 at 4:24 AM, Gerrit Jansen van Vuuren <
> > > > > > > gerrit...@gmail.com> wrote:
> > > > > > >
> > > > > > > > While consuming from the topics I get an
> > IlegalArgumentException
> > > > and
> > > > > > all
> > > > > > > > consumption stops, the error keeps on throwing.
> > > > > > > >
> > > > > > > > I've tracked it down to FectchResponse.scala line 33
> > > > > > > >
> > > > > > > > The error happens when the FetchResponsePartitionData
> object's
> > > > > readFrom
> > > > > > > > method calls:
> > > > > > > > messageSetBuffer.limit(messageSetSize)
> > > > > > > >
> > > > > > > > I put in some debug code the the messageSetSize is 671758648,
> > > while
> > > > > the
> > > > > > > > buffer.capacity() gives 155733313, for some reason the buffer
> > is
> > > > > > smaller
> > > > > > > > than the required message size.
> > > > > > > >
> > > > > > > > I don't know the consumer code enough to debug this. It
> doesn't
> > > > > matter
> > > > > > if
> > > > > > > > compression is used or not.
> > > > > > > >
> > > > > > > > I've created a jira ticket for this:
> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1196
> > > > > > > >
> > > > > > > > this is a real pain for me because I'm unable to consume from
> > > kafka
> > > > > at
> > > > > > > all
> > > > > > > > :(
> > > > > > > >
> > > > > > > >
> > > > > > > > Any ideas on possible config? or code changes I could try to
> > fix?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > >  Gerrit
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-01-02 Thread Gerrit Jansen van Vuuren
There is a particular topic that has a lot of data in each message; there
is nothing I can do about it.
Because I have so much data I try to split it over 8-12 partitions;
if I reduce the partitions I won't have enough consumers to consume the
data in time.


On Thu, Jan 2, 2014 at 4:50 PM, Jun Rao  wrote:

> 600mb for fetch size is considerably larger than the default size. Is there
> a particular reason for this? Also, how many partitions do you have? You
> may have to reduce the fetch size further if there are multiple partitions.
>
> Thanks,
>
> Jun
>
>
> On Thu, Jan 2, 2014 at 2:42 AM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > Hi,
> >
> > I just double checked my configuration and the broker has
> message.max.bytes
> > set to 1 gig, the consumers have the same setting for max fetch size.
> I've
> > lowered this to 600 mb and still see the same error :(,
> >
> > at the moment kafka is un-usable for me, the the only other alternative
> is
> > writing my own client (as i'm doing with the producer), what a pain!
> >
> >
> >
> >
> >
> > On Wed, Jan 1, 2014 at 6:45 PM, Jun Rao  wrote:
> >
> > > In our wire protocol, we expect the first 4 bytes for a response to be
> > its
> > > size. If the actual size is larger than 2GB, what's stored in the
> those 4
> > > bytes is the overflowed value. This could cause some of the buffer size
> > to
> > > be smaller than it should be later on. If #partitions * fetch_size  is
> > > larger than 2GB in a single fetch request, you could hit this problem.
> > You
> > > can try reducing the fetch size. Ideally, the sender should catch this
> > and
> > > throw an exception, which we don't do currently.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Jan 1, 2014 at 9:27 AM, Gerrit Jansen van Vuuren <
> > > gerrit...@gmail.com> wrote:
> > >
> > > > Mm... Could be Im not sure if in a single request though. I am moving
> > > allot
> > > > of data. Any pointer at were in the code the overflow might start?
> > > > On 1 Jan 2014 18:13, "Jun Rao"  wrote:
> > > >
> > > > > Are you fetching more than 2GB of data in a single fetch response
> > > (across
> > > > > all partitions)? Currently, we don't handle integer overflow
> > properly.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Wed, Jan 1, 2014 at 4:24 AM, Gerrit Jansen van Vuuren <
> > > > > gerrit...@gmail.com> wrote:
> > > > >
> > > > > > While consuming from the topics I get an IlegalArgumentException
> > and
> > > > all
> > > > > > consumption stops, the error keeps on throwing.
> > > > > >
> > > > > > I've tracked it down to FectchResponse.scala line 33
> > > > > >
> > > > > > The error happens when the FetchResponsePartitionData object's
> > > readFrom
> > > > > > method calls:
> > > > > > messageSetBuffer.limit(messageSetSize)
> > > > > >
> > > > > > I put in some debug code the the messageSetSize is 671758648,
> while
> > > the
> > > > > > buffer.capacity() gives 155733313, for some reason the buffer is
> > > > smaller
> > > > > > than the required message size.
> > > > > >
> > > > > > I don't know the consumer code enough to debug this. It doesn't
> > > matter
> > > > if
> > > > > > compression is used or not.
> > > > > >
> > > > > > I've created a jira ticket for this:
> > > > > > https://issues.apache.org/jira/browse/KAFKA-1196
> > > > > >
> > > > > > this is a real pain for me because I'm unable to consume from
> kafka
> > > at
> > > > > all
> > > > > > :(
> > > > > >
> > > > > >
> > > > > > Any ideas on possible config? or code changes I could try to fix?
> > > > > >
> > > > > > Regards,
> > > > > >  Gerrit
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-01-02 Thread Gerrit Jansen van Vuuren
Hi,

I just double-checked my configuration and the broker has message.max.bytes
set to 1 GB, and the consumers have the same setting for the max fetch size.
I've lowered this to 600 MB and still see the same error :(

At the moment Kafka is unusable for me; the only other alternative is
writing my own client (as I'm doing with the producer), what a pain!





On Wed, Jan 1, 2014 at 6:45 PM, Jun Rao  wrote:

> In our wire protocol, we expect the first 4 bytes for a response to be its
> size. If the actual size is larger than 2GB, what's stored in the those 4
> bytes is the overflowed value. This could cause some of the buffer size to
> be smaller than it should be later on. If #partitions * fetch_size  is
> larger than 2GB in a single fetch request, you could hit this problem. You
> can try reducing the fetch size. Ideally, the sender should catch this and
> throw an exception, which we don't do currently.
>
> Thanks,
>
> Jun
>
>
> On Wed, Jan 1, 2014 at 9:27 AM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > Mm... Could be Im not sure if in a single request though. I am moving
> allot
> > of data. Any pointer at were in the code the overflow might start?
> > On 1 Jan 2014 18:13, "Jun Rao"  wrote:
> >
> > > Are you fetching more than 2GB of data in a single fetch response
> (across
> > > all partitions)? Currently, we don't handle integer overflow properly.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Jan 1, 2014 at 4:24 AM, Gerrit Jansen van Vuuren <
> > > gerrit...@gmail.com> wrote:
> > >
> > > > While consuming from the topics I get an IlegalArgumentException and
> > all
> > > > consumption stops, the error keeps on throwing.
> > > >
> > > > I've tracked it down to FectchResponse.scala line 33
> > > >
> > > > The error happens when the FetchResponsePartitionData object's
> readFrom
> > > > method calls:
> > > > messageSetBuffer.limit(messageSetSize)
> > > >
> > > > I put in some debug code the the messageSetSize is 671758648, while
> the
> > > > buffer.capacity() gives 155733313, for some reason the buffer is
> > smaller
> > > > than the required message size.
> > > >
> > > > I don't know the consumer code enough to debug this. It doesn't
> matter
> > if
> > > > compression is used or not.
> > > >
> > > > I've created a jira ticket for this:
> > > > https://issues.apache.org/jira/browse/KAFKA-1196
> > > >
> > > > this is a real pain for me because I'm unable to consume from kafka
> at
> > > all
> > > > :(
> > > >
> > > >
> > > > Any ideas on possible config? or code changes I could try to fix?
> > > >
> > > > Regards,
> > > >  Gerrit
> > > >
> > >
> >
>


Re: java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-01-01 Thread Gerrit Jansen van Vuuren
Mm... could be, I'm not sure if it's in a single request though. I am moving a lot
of data. Any pointer at where in the code the overflow might start?
On 1 Jan 2014 18:13, "Jun Rao"  wrote:

> Are you fetching more than 2GB of data in a single fetch response (across
> all partitions)? Currently, we don't handle integer overflow properly.
>
> Thanks,
>
> Jun
>
>
> On Wed, Jan 1, 2014 at 4:24 AM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > While consuming from the topics I get an IlegalArgumentException and all
> > consumption stops, the error keeps on throwing.
> >
> > I've tracked it down to FectchResponse.scala line 33
> >
> > The error happens when the FetchResponsePartitionData object's readFrom
> > method calls:
> > messageSetBuffer.limit(messageSetSize)
> >
> > I put in some debug code the the messageSetSize is 671758648, while the
> > buffer.capacity() gives 155733313, for some reason the buffer is smaller
> > than the required message size.
> >
> > I don't know the consumer code enough to debug this. It doesn't matter if
> > compression is used or not.
> >
> > I've created a jira ticket for this:
> > https://issues.apache.org/jira/browse/KAFKA-1196
> >
> > this is a real pain for me because I'm unable to consume from kafka at
> all
> > :(
> >
> >
> > Any ideas on possible config? or code changes I could try to fix?
> >
> > Regards,
> >  Gerrit
> >
>


Re: only one ProducerSendThread thread when running with multiple brokers (kafka 0.8)

2014-01-01 Thread Gerrit Jansen van Vuuren
The network is 10 Gbit so it seems unlikely. The 5 brokers were running
without much load or problems. The bottleneck is that no matter how many
threads I use for sending, the sync block in the send method will never go
faster and it's always limited to a single thread.

I also compress with snappy outside the producer, and use no compression in
the producer itself. A single producer gives me max 6-10k tps; with 10
producers I can get max 60k tps. This is on my servers and with my payload.

My end conclusion was that it's impossible to scale a single producer
instance, and more threads make no difference on the sending side.
On 1 Jan 2014 17:31, "Chris Hogue"  wrote:

> Have you found what the actual bottleneck is? Is it the network send? Of
> course this would be highly influenced by the brokers' performance. After
> removing all compression work from the brokers we were able to get enough
> throughput from them that it's not really a concern.
>
> Another rough side-effect of the single synchronous send thread is that a
> single degrading or otherwise slow broker can back up the producing for the
> whole app. I haven't heard a great solution to this but would love to if
> someone's come up with it.
>
> -Chris
>
>
>
> On Wed, Jan 1, 2014 at 9:10 AM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > I've seen this bottle neck regardless of using compression or not, bpth
> > situations give me poor performance on sending to kafka via the scala
> > producer api.
> > On 1 Jan 2014 16:42, "Chris Hogue"  wrote:
> >
> > > Hi.
> > >
> > > When writing that blog we were using Kafka 0.7 as well. Understanding
> > that
> > > it probably wasn't the primary design goal, the separate send threads
> per
> > > broker that offered a separation of compression were a convenient
> > > side-effect of that design.
> > >
> > > We've since built new systems on 0.8 that have concentrated high
> > throughput
> > > on a small number of producers and had this discovery early on as well.
> > >
> > > Instead we've taken responsibility for the compression before the
> > producer
> > > and done that on separate threads as appropriate. While helpful for
> > > compression on the producer application the main reason for this is to
> > > prevent the broker from uncompressing and re-compressing each message
> as
> > it
> > > assigns offsets. There's a significant throughput advantage in doing
> > this.
> > >
> > > Truthfully since switching to snappy the compression throughput on the
> > > producer is much less of a concern in the overall context of the
> > > application.
> > >
> > > There was some discussion of these issues in the 'Client Improvement
> > > Discussion' thread a while ago where Jay provided some insight and
> > > discussion on future directions.
> > >
> > > -Chris
> > >
> > >
> > >
> > >
> > > On Wed, Jan 1, 2014 at 5:42 AM, yosi botzer 
> > wrote:
> > >
> > > > This is very interesting, this is what I see as well. I wish someone
> > > could
> > > > explain why it is not as explained here:
> > > > http://engineering.gnip.com/kafka-async-producer/
> > > >
> > > >
> > > > On Wed, Jan 1, 2014 at 2:39 PM, Gerrit Jansen van Vuuren <
> > > > gerrit...@gmail.com> wrote:
> > > >
> > > > > I don't know the code enough to comment on that (maybe someone else
> > on
> > > > the
> > > > > user list can do that), but from what I've seen doing some heavy
> > > > profiling
> > > > > I only see one thread per producer instance, it doesn't matter how
> > many
> > > > > brokers or topics you have the number of threads is always 1 per
> > > > producer.
> > > > > If you create 2 producers 2 threads and so on.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Jan 1, 2014 at 1:27 PM, yosi botzer  >
> > > > wrote:
> > > > >
> > > > > > But shouldn't I see a separate thread per broker (I am using the
> > > async
> > > > > > mode)?  Why do I get a better performance sending a message that
> > has
> > > > > fewer
> > > > > > partitions?
> > > > > >
> > > > > >
> > > &g

Re: only one ProducerSendThread thread when running with multiple brokers (kafka 0.8)

2014-01-01 Thread Gerrit Jansen van Vuuren
I've seen this bottleneck regardless of whether compression is used or not; both
situations give me poor performance when sending to Kafka via the Scala
producer API.
On 1 Jan 2014 16:42, "Chris Hogue"  wrote:

> Hi.
>
> When writing that blog we were using Kafka 0.7 as well. Understanding that
> it probably wasn't the primary design goal, the separate send threads per
> broker that offered a separation of compression were a convenient
> side-effect of that design.
>
> We've since built new systems on 0.8 that have concentrated high throughput
> on a small number of producers and had this discovery early on as well.
>
> Instead we've taken responsibility for the compression before the producer
> and done that on separate threads as appropriate. While helpful for
> compression on the producer application the main reason for this is to
> prevent the broker from uncompressing and re-compressing each message as it
> assigns offsets. There's a significant throughput advantage in doing this.
>
> Truthfully since switching to snappy the compression throughput on the
> producer is much less of a concern in the overall context of the
> application.
>
> There was some discussion of these issues in the 'Client Improvement
> Discussion' thread a while ago where Jay provided some insight and
> discussion on future directions.
>
> -Chris
>
>
>
>
> On Wed, Jan 1, 2014 at 5:42 AM, yosi botzer  wrote:
>
> > This is very interesting, this is what I see as well. I wish someone
> could
> > explain why it is not as explained here:
> > http://engineering.gnip.com/kafka-async-producer/
> >
> >
> > On Wed, Jan 1, 2014 at 2:39 PM, Gerrit Jansen van Vuuren <
> > gerrit...@gmail.com> wrote:
> >
> > > I don't know the code enough to comment on that (maybe someone else on
> > the
> > > user list can do that), but from what I've seen doing some heavy
> > profiling
> > > I only see one thread per producer instance, it doesn't matter how many
> > > brokers or topics you have the number of threads is always 1 per
> > producer.
> > > If you create 2 producers 2 threads and so on.
> > >
> > >
> > >
> > >
> > >
> > > On Wed, Jan 1, 2014 at 1:27 PM, yosi botzer 
> > wrote:
> > >
> > > > But shouldn't I see a separate thread per broker (I am using the
> async
> > > > mode)?  Why do I get a better performance sending a message that has
> > > fewer
> > > > partitions?
> > > >
> > > >
> > > > On Wed, Jan 1, 2014 at 2:22 PM, Gerrit Jansen van Vuuren <
> > > > gerrit...@gmail.com> wrote:
> > > >
> > > > > The producer is heavily synchronized (i.e. all the code in the send
> > > > method
> > > > > is encapsulated in one huge synchronized block).
> > > > > Try creating multiple producers and round robin send over them.
> > > > >
> > > > > e.g.
> > > > >
> > > > > p = producers[ n++ % producers.length ]
> > > > >
> > > > > p.send msg
> > > > > This will give you one thread per producer instance.
> > > > >
> > > > > I'm working on an async multi threaded producer for kafka, but its
> > > > nothing
> > > > > near complete yet.
> > > > > https://github.com/gerritjvv/kafka-fast
> > > > >
> > > > >
> > > > > Regards,
> > > > >  Gerrit
> > > > >
> > > > >
> > > > > On Wed, Jan 1, 2014 at 1:17 PM, yosi botzer  >
> > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I am using kafka 0.8. I have 3 machines each running kafka
> broker.
> > > > > >
> > > > > > I am using async mode of my Producer. I expected to see 3
> different
> > > > > threads
> > > > > > with names starting with ProducerSendThread- (according to this
> > > > article:
> > > > > > http://engineering.gnip.com/kafka-async-producer/)
> > > > > >
> > > > > > However I can see only one thread with the name
> > *ProducerSendThread-*
> > > > > >
> > > > > > This is my producer configuration:
> > > > > >
> > > > > > server=1
> > > > > > topic=dat7
> > > > > > metadata.broker.li

Re: only one ProducerSendThread thread when running with multiple brokers (kafka 0.8)

2014-01-01 Thread Gerrit Jansen van Vuuren
I don't know the code enough to comment on that (maybe someone else on the
user list can do that), but from what I've seen doing some heavy profiling,
I only see one thread per producer instance; it doesn't matter how many
brokers or topics you have, the number of threads is always 1 per producer.
If you create 2 producers you get 2 threads, and so on.
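
A quick way to verify this from inside an application (a sketch; the thread-name
prefix is the one discussed in this thread) is to count the live threads whose
names start with "ProducerSendThread-":

public class SendThreadCount {
    public static int count() {
        int n = 0;
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().startsWith("ProducerSendThread-")) {
                n++;
            }
        }
        return n;
    }
}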





On Wed, Jan 1, 2014 at 1:27 PM, yosi botzer  wrote:

> But shouldn't I see a separate thread per broker (I am using the async
> mode)?  Why do I get a better performance sending a message that has fewer
> partitions?
>
>
> On Wed, Jan 1, 2014 at 2:22 PM, Gerrit Jansen van Vuuren <
> gerrit...@gmail.com> wrote:
>
> > The producer is heavily synchronized (i.e. all the code in the send
> method
> > is encapsulated in one huge synchronized block).
> > Try creating multiple producers and round robin send over them.
> >
> > e.g.
> >
> > p = producers[ n++ % producers.length ]
> >
> > p.send msg
> > This will give you one thread per producer instance.
> >
> > I'm working on an async multi threaded producer for kafka, but its
> nothing
> > near complete yet.
> > https://github.com/gerritjvv/kafka-fast
> >
> >
> > Regards,
> >  Gerrit
> >
> >
> > On Wed, Jan 1, 2014 at 1:17 PM, yosi botzer 
> wrote:
> >
> > > Hi,
> > >
> > > I am using kafka 0.8. I have 3 machines each running kafka broker.
> > >
> > > I am using async mode of my Producer. I expected to see 3 different
> > threads
> > > with names starting with ProducerSendThread- (according to this
> article:
> > > http://engineering.gnip.com/kafka-async-producer/)
> > >
> > > However I can see only one thread with the name *ProducerSendThread-*
> > >
> > > This is my producer configuration:
> > >
> > > server=1
> > > topic=dat7
> > > metadata.broker.list=
> > > ec2-54-245-111-112.us-west-2.compute.amazonaws.com:9092
> > > ,ec2-54-245-111-69.us-west-2.compute.amazonaws.com:9092,
> > > ec2-54-218-183-14.us-west-2.compute.amazonaws.com:9092
> > > serializer.class=kafka.serializer.DefaultEncoder
> > > request.required.acks=1
> > > compression.codec=snappy
> > > producer.type=async
> > > queue.buffering.max.ms=2000
> > > queue.buffering.max.messages=1000
> > > batch.num.messages=500
> > >
> > >
> > > *What am I missing here?*
> > >
> > >
> > > BTW, I have also experienced very strange behavior regrading my
> producer
> > > performance (which may or may not be related to the issue above).
> > >
> > > When I have defined a topic with 1 partition I got much better
> throughput
> > > comparing to a topic with 3 partitions. A producer sending messages to
> a
> > > topic with 3 partitions had much better throughput comparing to a topic
> > > with 12 partitions.
> > >
> > > I would expect to have best performance for the topic with 12
> partitions
> > > since I have 3 machines running a broker each of with 4 disks (the
> broker
> > > is configured to use all 4 disks)
> > >
> > > *Is there any logical explanation for this behavior?*
> > >
> >
>


java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33

2014-01-01 Thread Gerrit Jansen van Vuuren
While consuming from the topics I get an IllegalArgumentException and all
consumption stops; the error keeps being thrown.

I've tracked it down to FetchResponse.scala line 33

The error happens when the FetchResponsePartitionData object's readFrom
method calls:
messageSetBuffer.limit(messageSetSize)

I put in some debug code: the messageSetSize is 671758648, while
buffer.capacity() gives 155733313, so for some reason the buffer is smaller
than the required message size.
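
(For anyone curious why this surfaces as an IllegalArgumentException:
Buffer.limit rejects any value larger than the buffer's capacity, which is
exactly the situation above, capacity 155733313 versus a requested limit of
671758648. A scaled-down, standalone reproduction of the same failure mode,
not the Kafka code itself:)

import java.nio.ByteBuffer;

public class LimitDemo {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.limit(32); // throws java.lang.IllegalArgumentException
    }
}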

I don't know the consumer code enough to debug this. It doesn't matter if
compression is used or not.

I've created a jira ticket for this:
https://issues.apache.org/jira/browse/KAFKA-1196

this is a real pain for me because I'm unable to consume from kafka at all
:(


Any ideas on possible config? or code changes I could try to fix?

Regards,
 Gerrit


Re: only one ProducerSendThread thread when running with multiple brokers (kafka 0.8)

2014-01-01 Thread Gerrit Jansen van Vuuren
The producer is heavily synchronized (i.e. all the code in the send method
is encapsulated in one huge synchronized block).
Try creating multiple producers and round-robin sends over them.

e.g.

p = producers[ n++ % producers.length ]

p.send msg
This will give you one thread per producer instance.
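
A rough, compilable version of that idea (a sketch only, not kafka-fast; it
assumes the standard 0.8 javaapi producer with String keys and values):

import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// N independent producers, each with its own send thread, picked round-robin per send.
public class ProducerPool {
    private final Producer<String, String>[] producers;
    private final AtomicLong counter = new AtomicLong();

    @SuppressWarnings("unchecked")
    public ProducerPool(int size, Properties props) {
        producers = (Producer<String, String>[]) new Producer[size];
        for (int i = 0; i < size; i++) {
            producers[i] = new Producer<String, String>(new ProducerConfig(props));
        }
    }

    public void send(String topic, String key, String msg) {
        int i = (int) (counter.getAndIncrement() % producers.length);
        producers[i].send(new KeyedMessage<String, String>(topic, key, msg));
    }

    public void close() {
        for (Producer<String, String> p : producers) {
            p.close();
        }
    }
}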

I'm working on an async multi-threaded producer for Kafka, but it's nowhere
near complete yet.
https://github.com/gerritjvv/kafka-fast


Regards,
 Gerrit


On Wed, Jan 1, 2014 at 1:17 PM, yosi botzer  wrote:

> Hi,
>
> I am using kafka 0.8. I have 3 machines each running kafka broker.
>
> I am using async mode of my Producer. I expected to see 3 different threads
> with names starting with ProducerSendThread- (according to this article:
> http://engineering.gnip.com/kafka-async-producer/)
>
> However I can see only one thread with the name *ProducerSendThread-*
>
> This is my producer configuration:
>
> server=1
> topic=dat7
> metadata.broker.list=
> ec2-54-245-111-112.us-west-2.compute.amazonaws.com:9092
> ,ec2-54-245-111-69.us-west-2.compute.amazonaws.com:9092,
> ec2-54-218-183-14.us-west-2.compute.amazonaws.com:9092
> serializer.class=kafka.serializer.DefaultEncoder
> request.required.acks=1
> compression.codec=snappy
> producer.type=async
> queue.buffering.max.ms=2000
> queue.buffering.max.messages=1000
> batch.num.messages=500
>
>
> *What am I missing here?*
>
>
> BTW, I have also experienced very strange behavior regrading my producer
> performance (which may or may not be related to the issue above).
>
> When I have defined a topic with 1 partition I got much better throughput
> comparing to a topic with 3 partitions. A producer sending messages to a
> topic with 3 partitions had much better throughput comparing to a topic
> with 12 partitions.
>
> I would expect to have best performance for the topic with 12 partitions
> since I have 3 machines running a broker each of with 4 disks (the broker
> is configured to use all 4 disks)
>
> *Is there any logical explanation for this behavior?*
>


Re: Kafka producer behavior

2013-12-18 Thread Gerrit Jansen van Vuuren
Hi,

This is a gotcha about Kafka producer partitioning: you must send the
messages with a non-null key.
If the key is null, Kafka will not call the partitioner.

Because with this partitioner the key does not matter, you can pass in a
constant string like "1" etc.
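
A minimal sketch of the non-null-key point (the broker list and topic name are
placeholders, and the partitioner class name is taken from the path of the
linked RoundRobinPartitioner):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KeyedSendExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "pseidon.kafka.util.RoundRobinPartitioner");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        // A null key would bypass the partitioner entirely; "1" is an arbitrary constant key.
        producer.send(new KeyedMessage<String, String>("test-topic1", "1", "some payload"));
        producer.close();
    }
}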

Oh one more thing, on performance:

The producer's send method has a synchronized block on the producer
instance, which means performance goes down the drain.
I could only get 13K tps out of the producer (on a 12-core, 72 GB RAM machine).
A way to solve this is to instantiate an array/list of N
producers and then round-robin over the producers in your send code.
I got to 80K tps (for my use case) using 6 producer instances from a single
box sending to 3 Kafka servers.

e.g.


send(msg) {
  producers[ producerIndex.getAndIncrement() % producerCount ].send(msg)
}

Regards,
 Gerrit


On Wed, Dec 18, 2013 at 11:24 AM, Hanish Bansal <
hanish.bansal.agar...@gmail.com> wrote:

> Thanks for response Gerrit and Guozhang !!
>
> Hi Gerrit,
>
> I am trying to use  same round robin partitioner shared by you but hard
> luck, still round robin partitioning not working.
>
> I have successfully registered RoundRobinPartitioner in kafka producer.
>
> Code of RoundRobinPartitioner class as:
>
> public RoundRobinPartitioner(VerifiableProperties props){
>  log.info("Using Round Robin Partitioner class...");
> }
>
> @Override
> public int partition(String key, int partitions) {
> log.info("Inside partition method");
> int i = counter.getAndIncrement();
> if(i == Integer.MAX_VALUE){
> counter.set(0);
>  return 0;
> }else
>  return i % partitions;
> }
>
> When i produce the data, first log message "Using Round Robin Partitioner
> class..." is printed and second message "Inside partition method" is not
> printed.
>
> From that we can ensure that RoundRobinPartitioner has been successfully
> registered but logic of round robin is not getting called.
>
> Any help to resolve what i am missing ?
>
> Thanks in advance !!
>
>
>
> On Tue, Dec 17, 2013 at 5:59 PM, Guozhang Wang  wrote:
>
> > Hello,
> >
> > This issue is known as in this JIRA:
> >
> > https://issues.apache.org/jira/browse/KAFKA-1067
> >
> > Guozhang
> >
> >
> > On Tue, Dec 17, 2013 at 8:48 AM, Gerrit Jansen van Vuuren <
> > gerrit...@gmail.com> wrote:
> >
> > > hi,
> > >
> > > I've had the same issue with the kafka producer.
> > >
> > > you need to use a different partitioner than the default one provided
> for
> > > kafka.
> > > I've created a round robin partitioner that works well for equally
> > > distributing data across partitions.
> > >
> > >
> > >
> >
> https://github.com/gerritjvv/pseidon/blob/master/pseidon-kafka/java/pseidon/kafka/util/RoundRobinPartitioner.java
> > >
> > >
> > >
> > >
> > >
> > > On Tue, Dec 17, 2013 at 5:32 PM, Hanish Bansal <
> > > hanish.bansal.agar...@gmail.com> wrote:
> > >
> > > > Hi All,
> > > >
> > > > We are having kafka cluster of 2 nodes. (using 0.8.0 final release)
> > > > Replication Factor: 2
> > > > Number of partitions: 2
> > > >
> > > > I have created a topic "test-topic1" in kafka.
> > > >
> > > > When i am listing status of that topic using bin/kafka-list-topic.sh,
> > the
> > > > status is:
> > > >
> > > > topic: test-topic1partition: 0leader: 0   replicas: 0,1
> > > isr:
> > > > 0,1
> > > > topic: test-topic1partition: 1leader: 1   replicas: 1,0
> > > isr:
> > > > 1,0
> > > >
> > > > As both partition are on two separate nodes so when we produce the
> data
> > > it
> > > > should be go to both nodes.
> > > >
> > > > But when i insert the data, it is going to only one node.
> > > >
> > > > For example if i insert 1000 messages then all 1000 messages will go
> > > either
> > > > node1 or node2. Data is not evenly distributed on both nodes.
> > > >
> > > > Expected: 500 messages should go to node1 and 500 messages should go
> to
> > > > node2.
> > > >
> > > > Any suggestion why i am facing this behavior?
> > > >
> > > > --
> > > > *Thanks & Regards*
> > > > *Hanish Bansal*
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> *Thanks & Regards*
> *Hanish Bansal*
>


Re: Kafka producer behavior

2013-12-17 Thread Gerrit Jansen van Vuuren
Hi,

I've had the same issue with the Kafka producer.

You need to use a different partitioner than the default one provided with
Kafka.
I've created a round-robin partitioner that works well for equally
distributing data across partitions.

https://github.com/gerritjvv/pseidon/blob/master/pseidon-kafka/java/pseidon/kafka/util/RoundRobinPartitioner.java





On Tue, Dec 17, 2013 at 5:32 PM, Hanish Bansal <
hanish.bansal.agar...@gmail.com> wrote:

> Hi All,
>
> We are having kafka cluster of 2 nodes. (using 0.8.0 final release)
> Replication Factor: 2
> Number of partitions: 2
>
> I have created a topic "test-topic1" in kafka.
>
> When i am listing status of that topic using bin/kafka-list-topic.sh, the
> status is:
>
> topic: test-topic1partition: 0leader: 0   replicas: 0,1   isr:
> 0,1
> topic: test-topic1partition: 1leader: 1   replicas: 1,0   isr:
> 1,0
>
> As both partition are on two separate nodes so when we produce the data it
> should be go to both nodes.
>
> But when i insert the data, it is going to only one node.
>
> For example if i insert 1000 messages then all 1000 messages will go either
> node1 or node2. Data is not evenly distributed on both nodes.
>
> Expected: 500 messages should go to node1 and 500 messages should go to
> node2.
>
> Any suggestion why i am facing this behavior?
>
> --
> *Thanks & Regards*
> *Hanish Bansal*
>