consumer throttling based on rate quotas leads to client errors

2016-12-13 Thread Paul Mackles
Hi - We are using kafka_2.11-0.9.0.1. Using the kafka-configs.sh command, we 
set the consumer_byte_rate for a specific client.id that was misbehaving.


The new setting definitely should have led to some throttling.


What we found was that connections from that client.id started failing with the 
following error:


org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': Error reading array of size 554098, only 64 bytes available


Here is the stack trace:


org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'responses': Error reading array of size 554098, only 64 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)


Resetting the consumer_byte_rate to the default for that client.id immediately 
fixed the errors.
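
For context, 0.9 quotas are enforced per client.id, so the throttled application
is whichever one reports that id. A minimal sketch of the relevant consumer
setup, with the broker address, group, topic, and client.id as placeholders
rather than values from this report:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ThrottledClientSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("group.id", "example-group");           // placeholder
        props.put("client.id", "misbehaving-client");     // the id the consumer_byte_rate quota targets
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("example-topic"));  // placeholder topic
        try {
            while (true) {
                // with the quota in place, the broker throttles fetches for this client.id
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> r : records) {
                    System.out.println(r.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}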


I couldn't find anything in Jira that looked similar to this. Has anyone seen 
anything like this before?


Thanks,

Paul



Re: consumer client pause/resume/rebalance

2016-11-08 Thread Paul Mackles
Hi Gwen - Makes sense. The way you explain it actually reminds me a little of 
the "worse is better" philosophy: https://www.jwz.org/doc/worse-is-better.html


Perhaps a mention in the javadoc for pause() and/or ConsumerRebalanceListener 
would be sufficient.


From: Gwen Shapira <g...@confluent.io>
Sent: Monday, November 07, 2016 3:34:39 PM
To: Users
Subject: Re: consumer client pause/resume/rebalance

I think the current behavior is fairly reasonable. Following a
rebalance the entire state of the consumer changes - you may get an
entirely new set of partitions. A common use-case for pause is to
allow a consumer to keep polling and avoid getting new events while it
is retrying to process existing events - well, following a rebalance,
it is possible that another consumer owns the partition, is already
re-processing these events and the entire state needs to be reset.

I usually recommend that developers treat a rebalance as a restart (since
you are getting a whole new set of partitions) and just follow
whatever process you'd follow to set up after a restart. Since pauses
don't survive restarts, I wouldn't expect them to survive a rebalance
either.

I hope this helps explain the behavior?

On Mon, Nov 7, 2016 at 9:53 AM, Paul Mackles <pmack...@adobe.com> wrote:
> Using the v0.9.0.1 consumer API, I recently learned that paused partitions
> can unexpectedly become unpaused during a rebalance. I also found an
> old thread from the mailing list which corroborates this behavior:
>
>
> http://grokbase.com/t/kafka/users/161wgzckze/new-consumer-pause-reset-behaviour
>
>
> While I can maintain the partition state myself, it seems like it would be a lot
> easier if this were handled internally by the consumer API (i.e., pause the
> partitions that were previously paused before resuming) and/or if the partition
> state were made available to the RebalanceListener.
>
>
> I did not find any existing tickets in JIRA related to this so I am wondering 
> if this is a valid bug/enhancement or if someone found a decent workaround. 
> All of the consumer API examples that I have found do not appear to handle 
> this scenario.
>
>
> Here is the code snippet from the client I have been working on:
>
>
> consumer.pause(consumer.assignment().toArray(EMPTYTPARRAY));
>
> while (!isWritable()) {
>   // WARNING: if there is a rebalance, this call may return some records!!!
>   consumer.poll(0);
>   Uninterruptibles.sleepUninterruptibly(pauseWait, TimeUnit.MILLISECONDS);
> }
>
> consumer.resume(consumer.assignment().toArray(EMPTYTPARRAY));
>
>
> Thanks,
>
> Paul
>
>
>



--
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


consumer client pause/resume/rebalance

2016-11-07 Thread Paul Mackles
Using the v0.9.0.1 consumer API, I recently learned that paused partitions can
unexpectedly become unpaused during a rebalance. I also found an old
thread from the mailing list which corroborates this behavior:


http://grokbase.com/t/kafka/users/161wgzckze/new-consumer-pause-reset-behaviour


While I can maintain the partition state myself, it seems like it would be a lot
easier if this were handled internally by the consumer API (i.e., pause the
partitions that were previously paused before resuming) and/or if the partition
state were made available to the RebalanceListener.


I did not find any existing tickets in JIRA related to this so I am wondering 
if this is a valid bug/enhancement or if someone found a decent workaround. All 
of the consumer API examples that I have found do not appear to handle this 
scenario.


Here is the code snippet from the client I have been working on:


// EMPTYTPARRAY is an empty TopicPartition[] constant (e.g. new TopicPartition[0]),
// used only so toArray() returns the right array type for the pause()/resume() varargs.
consumer.pause(consumer.assignment().toArray(EMPTYTPARRAY));

while (!isWritable()) {
  // WARNING: if there is a rebalance, this call may return some records!!!
  consumer.poll(0);
  // Guava helper: sleep between polls without having to handle InterruptedException
  Uninterruptibles.sleepUninterruptibly(pauseWait, TimeUnit.MILLISECONDS);
}

consumer.resume(consumer.assignment().toArray(EMPTYTPARRAY));
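
For anyone landing on this thread looking for a workaround, below is a minimal
sketch (not from this thread, and only lightly considered) of the idea above:
remember what was paused and re-apply it from a ConsumerRebalanceListener. The
class, field, and method names are invented for illustration.

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Re-applies pause() after a rebalance so that a rebalance does not silently
// un-pause partitions. pausedTopics mirrors what the application believes is paused.
public class PausePreservingListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<?, ?> consumer;
    private final Set<String> pausedTopics = new HashSet<>();

    public PausePreservingListener(KafkaConsumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    public void pauseAll() {
        for (TopicPartition tp : consumer.assignment()) {
            pausedTopics.add(tp.topic());
        }
        consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
    }

    public void resumeAll() {
        pausedTopics.clear();
        consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do here: the assignment is about to change
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // re-pause newly assigned partitions for topics that were paused before the rebalance
        Set<TopicPartition> toPause = new HashSet<>();
        for (TopicPartition tp : partitions) {
            if (pausedTopics.contains(tp.topic())) {
                toPause.add(tp);
            }
        }
        if (!toPause.isEmpty()) {
            consumer.pause(toPause.toArray(new TopicPartition[0]));
        }
    }
}

The listener would be registered via consumer.subscribe(topics, listener).
Tracking pause state by topic rather than by individual partition is deliberate,
since a rebalance can hand the consumer different partitions of the same topic.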


Thanks,

Paul





Re: GC is running forever

2015-10-15 Thread Paul Mackles
We had an issue that sounds somewhat similar. It impacted many long-running 
java apps but Kafka more than most. In our case, it actually turned out to be a 
kernel bug. Here is a reference with more details:

https://groups.google.com/forum/#!topic/mechanical-sympathy/QbmpZxp6C64

Thanks,
Paul

From: Yan Wang 
Sent: Thursday, October 15, 2015 4:28 PM
To: users@kafka.apache.org
Subject: GC is running forever

Hello Kafka Experts.

We have experienced a long-running GC problem for a while. GC will
eventually eat up all the CPU cycles on the physical box. We chatted with
Gwen about this issue during the recently ended Hadoop conference in NYC;
per her recommendation we switched to G1, but we still see the same problem
happening. We have tried two different versions of the JDK (java7u51 and
java7u71) and two different garbage collectors (CMS and G1).

The fun part of this problem is that once we run jstack to take a thread dump,
it immediately terminates the long-running GC and everything goes back to
normal.


I am attaching my JVM parameters and GC logs for reference and hope some
experts can shed light on this problem.

-Xmx4G -Xms4G -server
-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
-XX:+DisableExplicitGC
-Djava.awt.headless=true
-Xloggc:/logs/kaf/kafka-broker-solr-hbase/kafkaServer-gc.log -verbose:gc
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps


GC log
2015-10-15T14:35:29.910-0500: 167428.343: [GC pause (young), 0.0087380
secs]
   [Parallel Time: 5.8 ms, GC Workers: 18]
  [GC Worker Start (ms): Min: 167428342.9, Avg: 167428343.1, Max:
167428343.3, Diff: 0.4]
  [Ext Root Scanning (ms): Min: 0.9, Avg: 1.3, Max: 1.7, Diff: 0.8,
Sum: 23.2]
  [Update RS (ms): Min: 2.3, Avg: 2.6, Max: 2.8, Diff: 0.5, Sum: 46.2]
 [Processed Buffers: Min: 4, Avg: 5.9, Max: 10, Diff: 6, Sum: 107]
  [Scan RS (ms): Min: 0.0, Avg: 0.1, Max: 0.2, Diff: 0.1, Sum: 1.9]
  [Object Copy (ms): Min: 1.4, Avg: 1.4, Max: 1.6, Diff: 0.2, Sum:
25.9]
  [Termination (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
  [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.1, Diff: 0.1, Sum:
0.8]
  [GC Worker Total (ms): Min: 5.2, Avg: 5.5, Max: 5.7, Diff: 0.4, Sum:
98.2]
  [GC Worker End (ms): Min: 167428348.6, Avg: 167428348.6, Max:
167428348.7, Diff: 0.1]
   [Code Root Fixup: 0.0 ms]
   [Clear CT: 0.8 ms]
   [Other: 2.2 ms]
  [Choose CSet: 0.0 ms]
  [Ref Proc: 0.8 ms]
  [Ref Enq: 0.0 ms]
  [Free CSet: 1.3 ms]
   [Eden: 788.0M(788.0M)->0.0B(952.0M) Survivors: 14.0M->6144.0K Heap:
1199.6M(4096.0M)->403.9M(4096.0M)]
 [Times: user=0.11 sys=0.00, real=0.01 secs]
2015-10-15T14:35:36.522-0500: 167434.955: [GC pause (young), 1227.2679990
secs]
   [Parallel Time: 1227265.2 ms, GC Workers: 18]
  [GC Worker Start (ms): Min: 167434954.8, Avg: 167434955.0, Max:
167434955.2, Diff: 0.4]
  [Ext Root Scanning (ms): Min: 1.1, Avg: 68182.6, Max: 1227264.8,
Diff: 1227263.6, Sum: 1227287.6]
  [Update RS (ms): Min: 0.0, Avg: 1.9, Max: 2.2, Diff: 2.2, Sum: 34.1]
 [Processed Buffers: Min: 0, Avg: 4.8, Max: 11, Diff: 11, Sum: 87]
  [Scan RS (ms): Min: 0.1, Avg: 0.1, Max: 0.2, Diff: 0.1, Sum: 2.4]
  [Object Copy (ms): Min: 0.1, Avg: 1.4, Max: 1.6, Diff: 1.5, Sum:
25.2]
  [Termination (ms): Min: 0.0, Avg: 1159078.8, Max: 1227259.9, Diff:
1227259.9, Sum: 20863418.3]
  [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum:
0.3]
  [GC Worker Total (ms): Min: 1227264.6, Avg: 1227264.9, Max:
1227265.1, Diff: 0.4, Sum: 22090767.9]
  [GC Worker End (ms): Min: 168662219.8, Avg: 168662219.9, Max:
168662219.9, Diff: 0.1]
   [Code Root Fixup: 0.0 ms]
   [Clear CT: 0.6 ms]
   [Other: 2.2 ms]
  [Choose CSet: 0.0 ms]
  [Ref Proc: 0.6 ms]
  [Ref Enq: 0.0 ms]
  [Free CSet: 1.5 ms]
   [Eden: 952.0M(952.0M)->0.0B(198.0M) Survivors: 6144.0K->6144.0K Heap:
1366.0M(4096.0M)->414.0M(4096.0M)]
 [Times: user=20341.80 sys=0.00, real=1227.08 secs]
2015-10-15T14:56:03.852-0500: 168662.285: [GC pause (young), 0.0050930
secs]
   [Parallel Time: 3.3 ms, GC Workers: 18]
  [GC Worker Start (ms): Min: 168662284.6, Avg: 168662284.8, Max:
168662285.0, Diff: 0.4]
  [Ext Root Scanning (ms): Min: 1.0, Avg: 1.4, Max: 1.8, Diff: 0.8,
Sum: 24.3]
  [Update RS (ms): Min: 0.0, Avg: 0.1, Max: 0.4, Diff: 0.4, Sum: 2.4]
 [Processed Buffers: Min: 0, Avg: 2.4, Max: 8, Diff: 8, Sum: 44]
  [Scan RS (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
  [Object Copy (ms): Min: 1.0, Avg: 1.2, Max: 1.3, Diff: 0.3, Sum:
22.5]
  [Termination (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
  [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum:
0.3]
  [GC Worker Total (ms): Min: 2.6, Avg: 2.8, Max: 3.0, Diff: 0.4, Sum:
49.7]
  [GC Worker End (ms): Min: 168662287.6, Avg: 168662287.6, Max:
168662287.6, Diff: 0.0]
   [Code Root Fixup: 0.0 

Re: How does number of partitions affect sequential disk IO

2014-06-24 Thread Paul Mackles
You'll want to account for the number of disks per node. Normally,
partitions are spread across multiple disks. Even more important, the OS
file cache reduces the amount of seeking provided that you are reading
mostly sequentially and your consumers are keeping up.

On 6/24/14 3:58 AM, Daniel Compton d...@danielcompton.net wrote:

I've been reading the Kafka docs and one thing that I'm having trouble
understanding is how partitions affect sequential disk IO. One of the
reasons Kafka is so fast is that you can do lots of sequential IO with
read-ahead cache and all of that goodness. However, if your broker is
responsible for say 20 partitions, then won't the disk be seeking to 20
different spots for its writes and reads? I thought that maybe letting
the OS handle fsync would make this less of an issue but it still seems
like it could be a problem.

In our particular situation, we are going to have 6 brokers, 3 in each
DC, with mirror maker replication from the secondary DC to the primary
DC. We aren't likely to need to add more nodes for a while so would it be
faster to have 1 partition/node than say 3-4/node to minimise the seek
times on disk?

Are my assumptions correct or is this not an issue in practice? There are
some nice things about having more partitions like rebalancing more
evenly if we lose a broker but we don't want to make things significantly
slower to get this.

Thanks, Daniel.



Re: How does number of partitions affect sequential disk IO

2014-06-24 Thread Paul Mackles
It's probably best to run some tests that simulate your usage patterns. I
think a lot of it will be determined by how effectively you are able to
utilize the OS file cache, in which case you could have many more
partitions. It's a delicate balance, but you definitely want to err on the
side of having more partitions. Keep in mind that you are only able to
parallelize down to the partition level, so if you only have 2
partitions, you can only have 2 consumers. Depending on your volume, that
might not be enough.
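
To make that last point concrete, here is a rough sketch against the 0.8-era
high-level consumer (topic name, group id, ZK address, and the stream/partition
counts are all made up): if the topic has only 2 partitions, asking for 4
streams just leaves 2 of them permanently idle.

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ParallelismSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");   // placeholder
        props.put("group.id", "example-group");       // placeholder

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for 4 streams on a topic that (for this example) has only 2 partitions.
        List<KafkaStream<byte[], byte[]>> topicStreams =
            connector.createMessageStreams(Collections.singletonMap("example-topic", 4))
                     .get("example-topic");

        // One thread per stream; at most 2 of these threads will ever receive
        // messages -- the rest just block, because parallelism is capped at the
        // partition count.
        for (final KafkaStream<byte[], byte[]> stream : topicStreams) {
            new Thread(new Runnable() {
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> msg : stream) {
                        System.out.println("got message from partition " + msg.partition());
                    }
                }
            }).start();
        }
    }
}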

On 6/24/14 6:44 AM, Daniel Compton d...@danielcompton.net wrote:

Good point. We've only got two disks per node and two topics, so I was
planning to have one partition per disk.

Our workload is very write-heavy, so I'm mostly concerned about write
throughput. Will we get write speed improvements by sticking to 1
partition/disk, or will the difference between 1 and 3 partitions/node be
negligible?

 On 24/06/2014, at 9:42 pm, Paul Mackles pmack...@adobe.com wrote:
 
 You'll want to account for the number of disks per node. Normally,
 partitions are spread across multiple disks. Even more important, the OS
 file cache reduces the amount of seeking provided that you are reading
 mostly sequentially and your consumers are keeping up.
 
 On 6/24/14 3:58 AM, Daniel Compton d...@danielcompton.net wrote:
 
 I've been reading the Kafka docs and one thing that I'm having trouble
 understanding is how partitions affect sequential disk IO. One of the
 reasons Kafka is so fast is that you can do lots of sequential IO with
 read-ahead cache and all of that goodness. However, if your broker is
 responsible for say 20 partitions, then won't the disk be seeking to 20
 different spots for its writes and reads? I thought that maybe letting
 the OS handle fsync would make this less of an issue but it still seems
 like it could be a problem.
 
 In our particular situation, we are going to have 6 brokers, 3 in each
 DC, with mirror maker replication from the secondary DC to the primary
 DC. We aren't likely to need to add more nodes for a while so would it be
 faster to have 1 partition/node than say 3-4/node to minimise the seek
 times on disk?
 
 Are my assumptions correct or is this not an issue in practice? There are
 some nice things about having more partitions like rebalancing more
 evenly if we lose a broker but we don't want to make things significantly
 slower to get this.
 
 Thanks, Daniel.
 



Re: ISR not updating

2014-05-17 Thread Paul Mackles
Today we did a rolling restart of ZK. We also restarted the kafka
controller and ISRs are still not being updated in ZK. Again, the cluster
seems fine and the replicas in question do appear to be getting updated. I
am guessing there must be some bad state persisted in ZK.

On 5/17/14 7:50 PM, Shone Sadler shone.sad...@gmail.com wrote:

Hi Jun,

I work with Paul and am monitoring the cluster as well.   The status has
not changed.

When we execute kafka-list-topic we are seeing the following (showing one
of two partitions having the problem)

topic: t1 partition: 33 leader: 1 replicas: 1,2,3 isr: 1

When inspecting the logs of the leader, I do see a spurt of ISR
shrinkage/expansion around the time that the brokers were partitioned from
ZK, but nothing past the last message "Cached zkVersion [17] not equal to
that in zookeeper" from yesterday. There are no constant changes to the
ISR list.

Is there a way to force the leader to update ZK with the latest ISR list?

Thanks,
Shone

Logs:

cat server.log | grep \[t1,33\]

[2014-04-18 10:16:32,814] INFO [ReplicaFetcherManager on broker 1]
Removing
fetcher for partition [t1,33] (kafka.server.ReplicaFetcherManager)
[2014-05-13 19:42:10,784] ERROR [KafkaApi-1] Error when processing fetch
request for partition [t1,33] offset 330118156 from consumer with
correlation id 0 (kafka.server.KafkaApis)
[2014-05-14 11:02:25,255] ERROR [KafkaApi-1] Error when processing fetch
request for partition [t1,33] offset 332896470 from consumer with
correlation id 0 (kafka.server.KafkaApis)
[2014-05-16 12:00:11,344] INFO Partition [t1,33] on broker 1: Shrinking
ISR
for partition [t1,33] from 3,1,2 to 1 (kafka.cluster.Partition)
[2014-05-16 12:00:18,009] INFO Partition [t1,33] on broker 1: Cached
zkVersion [17] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
[2014-05-16 13:33:11,344] INFO Partition [t1,33] on broker 1: Shrinking
ISR
for partition [t1,33] from 3,1,2 to 1 (kafka.cluster.Partition)
[2014-05-16 13:33:12,403] INFO Partition [t1,33] on broker 1: Cached
zkVersion [17] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)


On Sat, May 17, 2014 at 11:44 AM, Jun Rao jun...@gmail.com wrote:

 Do you see constant ISR shrinking/expansion of those two partitions in
the
 leader broker's log ?

 Thanks,

 Jun


 On Fri, May 16, 2014 at 4:25 PM, Paul Mackles pmack...@adobe.com
wrote:

  Hi - We are running kafka_2.8.0-0.8.0-beta1 (we are a little behind in
  upgrading).
 
  From what I can tell, connectivity to ZK was lost for a brief period.
The
  cluster seemed to recover OK except that we now have 2 (out of 125)
  partitions where the ISR appears to be out of date. In other words,
  kafka-list-topic is showing only one replica in the ISR for the 2
  partitions in question (there should be 3).
 
  What's odd is that in looking at the log segments for those
partitions on
  the file system, I can see that they are in fact getting updated and
by
 all
  measures look to be in sync. I can also see that the brokers where the
  out-of-sync replicas reside are doing fine and leading other
partitions
  like nothing ever happened. Based on that, it seems like the ISR in
ZK is
  just out-of-date due to a botched recovery from the brief ZK outage.
 
  Has anyone seen anything like this before? I saw this ticket which
 sounded
  similar:
 
  https://issues.apache.org/jira/browse/KAFKA-948
 
  Anyone have any suggestions for recovering from this state? I was
 thinking
  of running the preferred-replica-election tool next to see if that
gets
 the
  ISRs in ZK back in sync.
 
  After that, I guess the next step would be to bounce the kafka
servers in
  question.
 
  Thanks,
  Paul
 
 




ISR not updating

2014-05-16 Thread Paul Mackles
Hi - We are running kafka_2.8.0-0.8.0-beta1 (we are a little behind in 
upgrading).

From what I can tell, connectivity to ZK was lost for a brief period. The 
cluster seemed to recover OK except that we now have 2 (out of 125) partitions 
where the ISR appears to be out of date. In other words, kafka-list-topic is 
showing only one replica in the ISR for the 2 partitions in question (there 
should be 3).

What's odd is that in looking at the log segments for those partitions on the 
file system, I can see that they are in fact getting updated and by all 
measures look to be in sync. I can also see that the brokers where the 
out-of-sync replicas reside are doing fine and leading other partitions like 
nothing ever happened. Based on that, it seems like the ISR in ZK is just 
out-of-date due to a botched recovery from the brief ZK outage.

Has anyone seen anything like this before? I saw this ticket which sounded 
similar:

https://issues.apache.org/jira/browse/KAFKA-948

Anyone have any suggestions for recovering from this state? I was thinking of 
running the preferred-replica-election tool next to see if that gets the ISRs 
in ZK back in sync.

After that, I guess the next step would be to bounce the kafka servers in 
question.

Thanks,
Paul



Re: 0.8 high-level consumer error handling

2014-01-08 Thread Paul Mackles
Hi Joel - The kind of error I am thinking about is when there is a
networking issue where the consumer is completely cut off from the
cluster. In that scenario, the consuming application has no way of knowing
whether there is an actual problem or there are just no messages to
consume. In the case of a networking issue, the application might want to
shut down and/or send a notification upstream.
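
One partial mitigation with the 0.8 high-level consumer is to set
consumer.timeout.ms so the iterator stops blocking forever and the application
can decide what prolonged silence means. A rough sketch, with placeholder
property values and the health check left as a comment:

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerWithTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");   // placeholder
        props.put("group.id", "example-group");       // placeholder
        props.put("consumer.timeout.ms", "30000");    // stop blocking after 30s of silence

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        List<KafkaStream<byte[], byte[]>> streams =
            connector.createMessageStreams(Collections.singletonMap("example-topic", 1))
                     .get("example-topic");
        ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();

        try {
            while (it.hasNext()) {
                byte[] message = it.next().message();
                // process message...
            }
        } catch (ConsumerTimeoutException e) {
            // No messages for consumer.timeout.ms: could be an idle topic or a consumer
            // cut off from the cluster. Check an external signal here (broker reachability,
            // lag metrics, etc.) and alert or shut down if warranted.
            connector.shutdown();
        }
    }
}

This still cannot, on its own, distinguish an idle topic from a consumer that is
cut off from the cluster, which is why the catch block defers to an external check.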

On 1/7/14 8:13 PM, Joel Koshy jjkosh...@gmail.com wrote:

Paul,

I don't think there is currently a way to detect this condition
apart from alerting off consumer metrics or logs.

However, I'm not sure it can be called a fatal condition in that
the brokers could re-register in zookeeper and consumption would then
resume; unless someone decides to move a Kafka cluster to some
other zookeeper namespace without telling anyone.

What would be a suitable action on the application-side if such a
condition were propagated back to the application as an exception?

Thanks,

Joel

On Tue, Jan 07, 2014 at 06:00:29PM +, Paul Mackles wrote:
 Hi - I noticed that if a kafka cluster goes away entirely, the
high-level consumer will endlessly try to fetch metadata until the
cluster comes back up, never bubbling the error condition up to the
application. While I see a setting to control the interval at which it
reconnects, I don't see anything to tell it when to just give up. I
think it would be useful if there were a way for the application to
detect this condition and possibly take some sort of action. Either a
max-retries setting and/or some sort of flag that can be tested after a
timeout. Is that capability already there? Is there a known workaround
for this?
 
 Thanks,
 Paul




0.8 high-level consumer error handling

2014-01-07 Thread Paul Mackles
Hi - I noticed that if a kafka cluster goes away entirely, the high-level 
consumer will endlessly try to fetch metadata until the cluster comes back up, 
never bubbling the error condition up to the application. While I see a setting 
to control the interval at which it reconnects, I don't see anything to tell it 
when to just give up. I think it would be useful if there were a way for the 
application to detect this condition and possibly take some sort of action. 
Either a max-retries setting and/or some sort of flag that can be tested after 
a timeout. Is that capability already there? Is there a known workaround for 
this?

Thanks,
Paul


Re: JMXTrans not sending kafka 0.8 metrics to Ganglia

2013-11-04 Thread Paul Mackles
It looks like you are missing quotes in the object name. Here is a snippet
from our jmxtrans configs:

  "resultAlias": "ReplicaManager",
  "obj": "\"kafka.server\":type=\"ReplicaManager\",name=\"*\"",
  "attr": [
    "Count",
    "OneMinuteRate",
    "MeanRate",
    "Value"
  ]


Unless more recent versions of kafka get rid of the quotes (we are on an
older 0.8 version still).

Paul

On 11/4/13 8:02 PM, Priya Matpadi priya.matp...@ecofactor.com wrote:

I am trying to send Kafka metrics for display to a Ganglia server using the
latest download from https://github.com/adambarthelson/kafka-ganglia.

Here's my kafka_metrics.json
{
  "servers" : [ {
    "port" : ,
    "host" : "ecokaf1",
    "queries" : [ {
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GangliaWriter",
        "settings" : {
          "groupName" : "kafka stats 2",
          "port" : 8649,
          "host" : "ecokaf1"
        }
      } ],
      "obj" : "kafka.server:type=ReplicaManager,name=PartitionCount",
      "resultAlias": "Kafka.ReplicaManager",
      "attr" : [ "Value" ]
    } ],
    "numQueryThreads" : 2
  } ]
}

I start jmxtrans as follows:
sudo ./jmxtrans.sh start kafka_metrics.json


JMXTrans starts without any issues. Here are the logs:

[05 Nov 2013 00:49:48] [main] 0  INFO
(com.googlecode.jmxtrans.JmxTransformer:134) - Starting Jmxtrans on :
kafka_metrics.json
[05 Nov 2013 00:49:48] [main] 386DEBUG
(com.googlecode.jmxtrans.JmxTransformer:354) - Loaded file:
/usr/share/jmxtrans/kafka_metrics.json
[05 Nov 2013 00:49:48] [main] 392DEBUG
(com.googlecode.jmxtrans.model.output.GangliaWriter:119) - Validated
Ganglia metric [host: ecokaf1, port: 8649, addressingMode: UNICAST, ttl:
5,
v3.1: true, units: '', slope: BOTH, tmax: 60, dmax: 0, spoofedHostName:
192.168.3.1:ecokaf1, groupName: 'kafka stats 2']
[05 Nov 2013 00:49:48] [main] 406DEBUG
(com.googlecode.jmxtrans.JmxTransformer:429) - Scheduled job:
ecokaf1:-1383612588479-3580845919 for server: Server [host=ecokaf1,
port=, url=null, cronExpression=null, numQueryThreads=2]
[05 Nov 2013 00:49:48] [ServerScheduler_Worker-1] 414DEBUG
(com.googlecode.jmxtrans.jobs.ServerJob:31) - + Started server job:
Server [host=ecokaf1, port=, url=null, cronExpression=null,
numQueryThreads=2]
[05 Nov 2013 00:49:48] [ServerScheduler_Worker-1] 544DEBUG
(com.googlecode.jmxtrans.util.JmxUtils:102) - - Creating 1 query
threads
[05 Nov 2013 00:49:48] [ServerScheduler_Worker-1] 553DEBUG
(com.googlecode.jmxtrans.jobs.ServerJob:50) - + Finished server job:
Server [host=ecokaf1, port=,
url=service:jmx:rmi:///jndi/rmi://ecokaf1:/jmxrmi,
cronExpression=null,
numQueryThreads=2]
[05 Nov 2013 00:50:48] [ServerScheduler_Worker-2] 60401  DEBUG
(com.googlecode.jmxtrans.jobs.ServerJob:31) - + Started server job:
Server [host=ecokaf1, port=,
url=service:jmx:rmi:///jndi/rmi://ecokaf1:/jmxrmi,
cronExpression=null,
numQueryThreads=2]
[05 Nov 2013 00:50:48] [ServerScheduler_Worker-2] 60404  DEBUG
(com.googlecode.jmxtrans.util.JmxUtils:102) - - Creating 1 query
threads
[05 Nov 2013 00:50:48] [ServerScheduler_Worker-2] 60410  DEBUG
(com.googlecode.jmxtrans.jobs.ServerJob:50) - + Finished server job:
Server [host=ecokaf1, port=,
url=service:jmx:rmi:///jndi/rmi://ecokaf1:/jmxrmi,
cronExpression=null,
numQueryThreads=2]

Does anyone spot an issue with the JSON?

When I add the jvmheapmemory and jvmGC examples to my json, jmxtrans sends
those metrics to Ganglia. Corresponding rrds files are created in ganglia
location and I can see the following in jmxtrans.log:
[05 Nov 2013 00:45:44] [pool-30-thread-1] 1740436 DEBUG
(com.googlecode.jmxtrans.model.output.GangliaWriter:141) - Sending Ganglia
metric heap.HeapMemoryUsage_init=263441792
[05 Nov 2013 00:45:44] [pool-30-thread-2] 1740449 DEBUG
(com.googlecode.jmxtrans.model.output.GangliaWriter:141) - Sending Ganglia
metric GC.PSScavenge.CollectionCount=334

And I know that Kafka is publishing JMX metrics because I can see them
using the jmxsh-R5 command-line tool.

Appreciate any pointers you can provide with regards to kafka.
Thanks,
Priya



increasing number of replicas for an existing topic

2013-10-02 Thread Paul Mackles
In 0.8, is there an undocumented way to increase the number of replicas for an 
existing topic? We created a number of topics with the wrong number of 
replicas. We could just delete and recreate but delete is flaky in 0.8. Was 
hoping someone figured out a way to do this w/out deleting the topics.

Thanks,
Paul



Re: full disk

2013-09-23 Thread Paul Mackles
Done:

https://issues.apache.org/jira/browse/KAFKA-1063

Out of curiosity, is manually removing the older log files the only option at
this point?

From: Paul Mackles pmack...@adobe.com
To: users@kafka.apache.org
Subject: full disk

Hi -

We ran into a situation on our dev cluster (3 nodes, v0.8) where we ran out of
disk on one of the nodes. As expected, the broker shut itself down and all of
the clients switched over to the other nodes. So far so good.

To free up disk space, I reduced log.retention.hours to something more 
manageable (from 172 to 12). I did this on all 3 nodes. Since the other 2 nodes 
were running OK, I first tried to restart the node which ran out of disk. 
Unfortunately, it kept shutting itself down due to the full disk. From the 
logs, I think this was because it was trying to sync-up the replicas it was 
responsible for and of course couldn't due to the lack of disk space. My hope 
was that upon restart, it would see the new retention settings and free up a 
bunch of disk space before trying to do any syncs.

I then went and restarted the other 2 nodes. They both picked up the new 
retention settings and freed up a bunch of storage as a result. I then went 
back and tried to restart the 3rd node but to no avail. It still had problems 
with the full disks.

I thought about trying to reassign partitions so that the node in question had 
less to manage but that turned out to be a hassle so I wound up manually 
deleting some of the old log/segment files. The broker seemed to come back fine 
after that but that's not something I would want to do on a production server.

We obviously need better monitoring/alerting to avoid this situation 
altogether, but I am wondering if the order of operations at startup 
could/should be changed to better account for scenarios like this. Or maybe a 
utility to remove old logs after changing ttl? Did I miss a better way to 
handle this?

Thanks,
Paul





full disk

2013-09-21 Thread Paul Mackles
Hi -

We ran into a situation on our dev cluster (3 nodes, v0.8) where we ran out of
disk on one of the nodes. As expected, the broker shut itself down and all of
the clients switched over to the other nodes. So far so good.

To free up disk space, I reduced log.retention.hours to something more 
manageable (from 172 to 12). I did this on all 3 nodes. Since the other 2 nodes 
were running OK, I first tried to restart the node which ran out of disk. 
Unfortunately, it kept shutting itself down due to the full disk. From the 
logs, I think this was because it was trying to sync-up the replicas it was 
responsible for and of course couldn't due to the lack of disk space. My hope 
was that upon restart, it would see the new retention settings and free up a 
bunch of disk space before trying to do any syncs.

I then went and restarted the other 2 nodes. They both picked up the new 
retention settings and freed up a bunch of storage as a result. I then went 
back and tried to restart the 3rd node but to no avail. It still had problems 
with the full disks.

I thought about trying to reassign partitions so that the node in question had 
less to manage but that turned out to be a hassle so I wound up manually 
deleting some of the old log/segment files. The broker seemed to come back fine 
after that but that's not something I would want to do on a production server.

We obviously need better monitoring/alerting to avoid this situation 
altogether, but I am wondering if the order of operations at startup 
could/should be changed to better account for scenarios like this. Or maybe a 
utility to remove old logs after changing ttl? Did I miss a better way to 
handle this?

Thanks,
Paul





Re: Unable to send and consume compressed events.

2013-08-29 Thread Paul Mackles
I assume this is kafka 0.8, right? Are there any corresponding errors in
the broker logs? With the configuration below, I don't think any errors
will be reported back to the producer.

You could also try setting request.required.acks=1 to see if errors are
reported back to the client.
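
For example, a minimal change to the quoted config below (an assumption about
the setup, not a verified fix):

// with request.required.acks=0 the old producer is fire-and-forget, so broker-side
// failures never reach the client; requiring a leader ack (and optionally a sync
// producer) makes them visible
props.put("request.required.acks", "1");
props.put("producer.type", "sync");   // optional: send() will then block and surface errors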

On 8/29/13 4:40 AM, Lu Xuechao lux...@gmail.com wrote:

Hi,

I am trying to enable gzip compression for my events. But after I switched
compression.codec to 1, I found the produced events were not even being
persisted to the disk log file. Of course, the consumer could not receive any
compressed events. I sent 10,000 or more events but the broker's log file
did not change. It seems no events were actually sent to the broker? Below is my
producer's code:

Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "127.0.0.1:9092");
props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
props.put("queue.enqueue.timeout.ms", "-1");
props.put("request.required.acks", "0");
props.put("producer.type", "async");

props.put("batch.num.messages", "100");

props.put("compression.codec", "1");

ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, String>(config);

KeyedMessage<String, String> data =
    new KeyedMessage<String, String>(topic1, messageStr, msg);
producer.send(data);


If I comment out this line of code: props.put("compression.codec", "1");
then everything works fine. Did I miss something?

thanks,
xlu



slf4j bindings

2013-08-19 Thread Paul Mackles
Hi - Has anyone figured out a clean way to ignore/exclude the simple slf4j
bindings that get included in the kafka-assembly jar for 0.8? I would like all
of the libraries in my app to log through log4j, but for those libraries using
slf4j, the simple bindings in the kafka-assembly jar are getting in the way.
Ironically, kafka uses log4j directly so that's working fine ;)

Thanks,
Paul